Skip to main content

a3s_flow/
engine.rs

1//! [`FlowEngine`] — the central API for managing workflow executions.
2//!
3//! `FlowEngine` wraps a [`NodeRegistry`] and an in-memory execution map.
4//! It is the recommended entry point for applications that need lifecycle
5//! control over running workflows.
6//!
7//! # Lifecycle
8//!
9//! ```text
10//!                    ┌──────────┐
11//!             start  │ Running  │ ←──────────── resume
12//!           ────────►│          │
13//!                    └────┬─────┘
14//!               pause │   │ node error / all nodes done / terminate
15//!                     ▼   ▼
16//!                 ┌────────────┐   ┌───────────┐   ┌────────────┐
17//!                 │   Paused   │   │ Completed │   │   Failed   │
18//!                 └────────────┘   └───────────┘   └────────────┘
19//!                                                  ┌────────────┐
20//!                                                  │ Terminated │
21//!                                                  └────────────┘
22//! ```
23//!
24//! Pause takes effect at the **next wave boundary** — the current wave of
25//! concurrently executing nodes runs to completion first. Terminate interrupts
26//! the engine between or within waves and is reflected as soon as the runner
27//! task observes the cancellation signal.
28
29use std::collections::HashMap;
30use std::sync::{Arc, RwLock as SyncRwLock};
31
32use serde_json::Value;
33use tokio::sync::{watch, RwLock};
34use tokio_util::sync::CancellationToken;
35use tracing::warn;
36use uuid::Uuid;
37
38use tokio::sync::broadcast;
39
40use crate::capabilities::FlowCapabilities;
41use crate::error::{FlowError, Result};
42use crate::event::{ChannelEmitter, EventEmitter, FlowEvent, MulticastEmitter, NoopEventEmitter};
43use crate::execution::{ExecutionHandle, ExecutionState};
44use crate::flow_store::FlowStore;
45use crate::graph::DagGraph;
46use crate::node::Node;
47use crate::registry::{NodeDescriptor, NodeRegistry};
48use crate::runner::{FlowRunner, FlowSignal};
49use crate::store::ExecutionStore;
50use crate::validation::ValidationIssue;
51
52/// Central entry point for managing workflow executions.
53///
54/// # Example
55///
56/// ```rust,no_run
57/// use a3s_flow::{FlowEngine, NodeRegistry};
58/// use serde_json::json;
59/// use std::collections::HashMap;
60///
61/// #[tokio::main]
62/// async fn main() -> a3s_flow::Result<()> {
63///     let engine = FlowEngine::new(NodeRegistry::with_defaults());
64///
65///     // Query available node types.
66///     println!("node types: {:?}", engine.node_types());
67///
68///     // Start a workflow and get its execution ID.
69///     let definition = json!({
70///         "nodes": [
71///             { "id": "a", "type": "noop" },
72///             { "id": "b", "type": "noop" }
73///         ],
74///         "edges": [{ "source": "a", "target": "b" }]
75///     });
76///     let id = engine.start(&definition, HashMap::new()).await?;
77///
78///     // Inspect state, pause, resume, or terminate.
79///     println!("state: {:?}", engine.state(id).await?);
80///     Ok(())
81/// }
82/// ```
83pub struct FlowEngine {
84    registry: Arc<SyncRwLock<NodeRegistry>>,
85    executions: Arc<RwLock<HashMap<Uuid, ExecutionHandle>>>,
86    /// Optional store — when set, completed results are automatically persisted.
87    execution_store: Option<Arc<dyn ExecutionStore>>,
88    /// Optional store — enables `start_named` to look up definitions by name.
89    flow_store: Option<Arc<dyn FlowStore>>,
90    /// Emitter passed to each runner; receives all node and flow lifecycle events.
91    emitter: Arc<dyn EventEmitter>,
92    /// When set, passed to every runner to cap per-wave node concurrency.
93    max_concurrency: Option<usize>,
94}
95
96impl FlowEngine {
97    /// Create a new engine with the given node registry.
98    ///
99    /// Uses [`NoopEventEmitter`] and no execution store by default. Use the
100    /// builder methods [`with_execution_store`](Self::with_execution_store) and
101    /// [`with_event_emitter`](Self::with_event_emitter) to customise behaviour.
102    pub fn new(registry: NodeRegistry) -> Self {
103        Self {
104            registry: Arc::new(SyncRwLock::new(registry)),
105            executions: Arc::new(RwLock::new(HashMap::new())),
106            execution_store: None,
107            flow_store: None,
108            emitter: Arc::new(NoopEventEmitter),
109            max_concurrency: None,
110        }
111    }
112
113    /// Attach an execution store.
114    ///
115    /// When set, every successfully completed execution result is saved to the
116    /// store automatically. Returns `self` for method chaining.
117    pub fn with_execution_store(mut self, store: Arc<dyn ExecutionStore>) -> Self {
118        self.execution_store = Some(store);
119        self
120    }
121
122    /// Attach a flow definition store.
123    ///
124    /// Required for [`start_named`](Self::start_named). Allows any backend
125    /// (in-memory, SQLite, remote API, …) by implementing [`FlowStore`].
126    /// Returns `self` for method chaining.
127    pub fn with_flow_store(mut self, store: Arc<dyn FlowStore>) -> Self {
128        self.flow_store = Some(store);
129        self
130    }
131
132    /// Attach a custom event emitter.
133    ///
134    /// The emitter is passed to every runner created by this engine and
135    /// receives all node and flow lifecycle events. Returns `self` for chaining.
136    pub fn with_event_emitter(mut self, emitter: Arc<dyn EventEmitter>) -> Self {
137        self.emitter = emitter;
138        self
139    }
140
141    /// Limit the number of nodes that may execute concurrently within a single
142    /// wave across all executions started by this engine.
143    ///
144    /// Delegates to [`FlowRunner::with_max_concurrency`]. Returns `self` for chaining.
145    pub fn with_max_concurrency(mut self, n: usize) -> Self {
146        self.max_concurrency = Some(n);
147        self
148    }
149
150    // ── Node type discovery ────────────────────────────────────────────────
151
152    /// Return all registered node type strings, sorted alphabetically.
153    ///
154    /// Includes built-in types (e.g. `"noop"`) and any types registered via
155    /// [`NodeRegistry::register`].
156    pub fn node_types(&self) -> Vec<String> {
157        self.registry.read().unwrap().list_types()
158    }
159
160    /// Return structured descriptors for all registered node types.
161    ///
162    /// This is the preferred discovery API for building UIs, skill pickers,
163    /// and progressive capability endpoints on top of `a3s-flow`.
164    pub fn node_descriptors(&self) -> Vec<NodeDescriptor> {
165        self.registry.read().unwrap().list_descriptors()
166    }
167
168    /// Return a transport-friendly capabilities document for this engine.
169    ///
170    /// Higher layers can serialize this value directly as JSON for progressive
171    /// discovery APIs.
172    pub fn capabilities(&self) -> FlowCapabilities {
173        FlowCapabilities::from_nodes(self.node_descriptors())
174    }
175
176    // ── Node type management ───────────────────────────────────────────────
177
178    /// Register or replace a node type for future executions started by this engine.
179    pub fn register_node_type(&self, node: Arc<dyn Node>) {
180        self.registry.write().unwrap().register(node);
181    }
182
183    /// Register or replace a node type with explicit discovery metadata.
184    pub fn register_node_type_with_descriptor(
185        &self,
186        node: Arc<dyn Node>,
187        descriptor: NodeDescriptor,
188    ) {
189        self.registry
190            .write()
191            .unwrap()
192            .register_with_descriptor(node, descriptor);
193    }
194
195    /// Remove a node type from this engine's registry.
196    ///
197    /// Returns `Ok(true)` if the node type existed and was removed, `Ok(false)`
198    /// if it was not registered, or an error if the type is protected.
199    ///
200    /// Removal only affects future validations and executions. Already running
201    /// executions keep using the registry snapshot captured when they were started.
202    pub fn unregister_node_type(&self, node_type: &str) -> Result<bool> {
203        self.registry.write().unwrap().unregister(node_type)
204    }
205
206    // ── Pre-flight validation ──────────────────────────────────────────────
207
208    /// Validate a flow definition without executing it.
209    ///
210    /// Returns a list of [`ValidationIssue`]s describing structural problems.
211    /// An empty list means the definition is valid and ready to run.
212    ///
213    /// The following checks are performed:
214    /// - DAG structural validity: no cycles, no unknown edge references,
215    ///   no duplicate node IDs, at least one node.
216    /// - All node types are registered in the engine's [`NodeRegistry`].
217    /// - Every `run_if.from` field references an existing node ID.
218    ///
219    /// ```rust
220    /// use a3s_flow::{FlowEngine, NodeRegistry};
221    /// use serde_json::json;
222    ///
223    /// let engine = FlowEngine::new(NodeRegistry::with_defaults());
224    /// let def = json!({
225    ///     "nodes": [
226    ///         { "id": "a", "type": "noop" },
227    ///         { "id": "b", "type": "unknown-type" }
228    ///     ],
229    ///     "edges": []
230    /// });
231    /// let issues = engine.validate(&def);
232    /// assert_eq!(issues.len(), 1);
233    /// assert!(issues[0].message.contains("unknown node type"));
234    /// ```
235    pub fn validate(&self, definition: &Value) -> Vec<ValidationIssue> {
236        let mut issues = Vec::new();
237
238        // Parse the DAG — catches cycle, unknown refs, duplicate IDs, empty flow.
239        let dag = match DagGraph::from_json(definition) {
240            Ok(dag) => dag,
241            Err(e) => {
242                issues.push(ValidationIssue {
243                    node_id: None,
244                    message: e.to_string(),
245                });
246                return issues;
247            }
248        };
249
250        for node_def in dag.nodes_in_order() {
251            // Check node type is registered.
252            if self
253                .registry
254                .read()
255                .unwrap()
256                .get(&node_def.node_type)
257                .is_err()
258            {
259                issues.push(ValidationIssue {
260                    node_id: Some(node_def.id.clone()),
261                    message: format!("unknown node type '{}'", node_def.node_type),
262                });
263            }
264
265            // Check run_if.from references an existing node.
266            if let Some(ref cond) = node_def.run_if {
267                if !dag.nodes.contains_key(&cond.from) {
268                    issues.push(ValidationIssue {
269                        node_id: Some(node_def.id.clone()),
270                        message: format!("run_if references unknown node '{}'", cond.from),
271                    });
272                }
273            }
274        }
275
276        issues
277    }
278
279    // ── Execution lifecycle ────────────────────────────────────────────────
280
281    /// Start a new workflow execution from a JSON DAG definition.
282    ///
283    /// The definition is parsed and validated synchronously. If valid, the
284    /// execution is launched in a background Tokio task and the execution ID
285    /// is returned immediately — the flow runs concurrently with the caller.
286    ///
287    /// # Errors
288    ///
289    /// Returns an error if the definition is invalid (cycle, unknown node ID,
290    /// bad JSON, unregistered node type).
291    pub async fn start(
292        &self,
293        definition: &Value,
294        variables: HashMap<String, Value>,
295    ) -> Result<Uuid> {
296        let (id, _rx) = self.start_inner(definition, variables).await?;
297        Ok(id)
298    }
299
300    /// Start a workflow and return a live event stream alongside the execution ID.
301    ///
302    /// The returned [`broadcast::Receiver<FlowEvent>`] is created **before** the
303    /// execution task is spawned, guaranteeing that no events are missed —
304    /// including `FlowStarted`. Multiple subscribers can be created by calling
305    /// [`broadcast::Receiver::resubscribe`].
306    ///
307    /// The stream closes (returns `Err(RecvError::Closed)`) when the execution
308    /// reaches a terminal state (`Completed`, `Failed`, or `Terminated`).
309    ///
310    /// If the engine also has a custom [`EventEmitter`] configured via
311    /// [`with_event_emitter`](Self::with_event_emitter), both the emitter and
312    /// the broadcast channel receive every event.
313    ///
314    /// # Example
315    ///
316    /// ```rust,no_run
317    /// use a3s_flow::{FlowEngine, FlowEvent, NodeRegistry};
318    /// use serde_json::json;
319    /// use std::collections::HashMap;
320    ///
321    /// #[tokio::main]
322    /// async fn main() -> a3s_flow::Result<()> {
323    ///     let engine = FlowEngine::new(NodeRegistry::with_defaults());
324    ///     let def = json!({
325    ///         "nodes": [{ "id": "a", "type": "noop" }],
326    ///         "edges": []
327    ///     });
328    ///
329    ///     let (id, mut rx) = engine.start_streaming(&def, HashMap::new()).await?;
330    ///
331    ///     while let Ok(event) = rx.recv().await {
332    ///         match event {
333    ///             FlowEvent::NodeCompleted { node_id, .. } => println!("done: {node_id}"),
334    ///             FlowEvent::FlowCompleted { .. } => break,
335    ///             _ => {}
336    ///         }
337    ///     }
338    ///     Ok(())
339    /// }
340    /// ```
341    pub async fn start_streaming(
342        &self,
343        definition: &Value,
344        variables: HashMap<String, Value>,
345    ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
346        let (id, rx) = self.start_inner(definition, variables).await?;
347        Ok((id, rx))
348    }
349
350    /// Subscribe to live events for an existing execution.
351    ///
352    /// The returned receiver attaches to the execution's dedicated broadcast
353    /// channel. Events emitted before subscription are not replayed.
354    pub async fn subscribe(&self, id: Uuid) -> Result<broadcast::Receiver<FlowEvent>> {
355        let executions = self.executions.read().await;
356        let handle = executions
357            .get(&id)
358            .ok_or(FlowError::ExecutionNotFound(id))?;
359        let receiver = handle
360            .event_tx
361            .read()
362            .unwrap()
363            .as_ref()
364            .map(|tx| tx.subscribe())
365            .ok_or_else(|| FlowError::InvalidTransition {
366                action: "subscribe".into(),
367                from: "finished".into(),
368            })?;
369        Ok(receiver)
370    }
371
372    /// Internal start implementation shared by both `start` and
373    /// `start_streaming`.
374    async fn start_inner(
375        &self,
376        definition: &Value,
377        variables: HashMap<String, Value>,
378    ) -> Result<(Uuid, broadcast::Receiver<FlowEvent>)> {
379        let dag = DagGraph::from_json(definition)?;
380        let registry = self.registry.read().unwrap().clone();
381        let (event_tx, event_rx) = broadcast::channel(256);
382        let channel_emitter = Arc::new(ChannelEmitter::new(event_tx.clone()));
383        let emitter: Arc<dyn EventEmitter> = Arc::new(MulticastEmitter {
384            a: Arc::clone(&self.emitter),
385            b: channel_emitter,
386        });
387        let mut runner = FlowRunner::new(dag, registry).with_event_emitter(emitter);
388        if let Some(ref fs) = self.flow_store {
389            runner = runner.with_flow_store(Arc::clone(fs));
390        }
391        if let Some(n) = self.max_concurrency {
392            runner = runner.with_max_concurrency(n);
393        }
394
395        let execution_id = Uuid::new_v4();
396        let cancel = CancellationToken::new();
397        let (signal_tx, signal_rx) = watch::channel(FlowSignal::Run);
398        let state = Arc::new(RwLock::new(ExecutionState::Running));
399        let context: Arc<SyncRwLock<HashMap<String, Value>>> =
400            Arc::new(SyncRwLock::new(HashMap::new()));
401        let event_tx_handle = Arc::new(SyncRwLock::new(Some(event_tx)));
402
403        let handle = ExecutionHandle {
404            state: Arc::clone(&state),
405            signal_tx,
406            cancel: cancel.clone(),
407            context: Arc::clone(&context),
408            event_tx: Arc::clone(&event_tx_handle),
409        };
410
411        self.executions.write().await.insert(execution_id, handle);
412
413        // Spawn the execution task; it updates state on terminal transitions.
414        // Logging and event emission are handled by FlowRunner::run_seeded.
415        let state_for_task = Arc::clone(&state);
416        let event_tx_for_task = Arc::clone(&event_tx_handle);
417        let execution_store = self.execution_store.clone();
418        tokio::spawn(async move {
419            match runner
420                .run_controlled(execution_id, variables, signal_rx, cancel, context)
421                .await
422            {
423                Ok(result) => {
424                    // Persist the result if a store is configured.
425                    if let Some(ref store) = execution_store {
426                        if let Err(e) = store.save(&result).await {
427                            warn!(%execution_id, error = %e, "failed to persist execution result");
428                        }
429                    }
430                    *state_for_task.write().await = ExecutionState::Completed(result);
431                }
432                Err(FlowError::Terminated) => {
433                    *state_for_task.write().await = ExecutionState::Terminated;
434                }
435                Err(e) => {
436                    *state_for_task.write().await = ExecutionState::Failed(e.to_string());
437                }
438            }
439            let _ = event_tx_for_task.write().unwrap().take();
440        });
441
442        Ok((execution_id, event_rx))
443    }
444
445    /// Start a workflow by loading its definition from the configured
446    /// [`FlowStore`] by name.
447    ///
448    /// Equivalent to:
449    /// ```rust,ignore
450    /// let def = flow_store.load(name).await?.ok_or(...)?;
451    /// engine.start(&def, variables).await
452    /// ```
453    ///
454    /// # Errors
455    ///
456    /// - [`FlowError::Internal`] if no `FlowStore` was configured via
457    ///   [`with_flow_store`](Self::with_flow_store).
458    /// - [`FlowError::FlowNotFound`] if no definition exists under `name`.
459    /// - Any error returned by [`start`](Self::start) (invalid definition, etc.).
460    pub async fn start_named(&self, name: &str, variables: HashMap<String, Value>) -> Result<Uuid> {
461        let store = self.flow_store.as_ref().ok_or_else(|| {
462            FlowError::Internal("no FlowStore configured; call with_flow_store first".into())
463        })?;
464
465        let definition = store
466            .load(name)
467            .await?
468            .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
469
470        self.start(&definition, variables).await
471    }
472
473    /// Pause a running execution at the next wave boundary.
474    ///
475    /// Nodes in the **current wave** continue until they finish. No new wave
476    /// starts until [`resume`](Self::resume) is called.
477    ///
478    /// # Errors
479    ///
480    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
481    /// - [`FlowError::InvalidTransition`] if the execution is not `Running`.
482    pub async fn pause(&self, id: Uuid) -> Result<()> {
483        let executions = self.executions.read().await;
484        let handle = executions
485            .get(&id)
486            .ok_or(FlowError::ExecutionNotFound(id))?;
487
488        let mut state = handle.state.write().await;
489        match *state {
490            ExecutionState::Running => {
491                handle.signal_tx.send(FlowSignal::Pause).ok();
492                *state = ExecutionState::Paused;
493                Ok(())
494            }
495            ref s => Err(FlowError::InvalidTransition {
496                action: "pause".into(),
497                from: s.as_str().into(),
498            }),
499        }
500    }
501
502    /// Resume a paused execution.
503    ///
504    /// # Errors
505    ///
506    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
507    /// - [`FlowError::InvalidTransition`] if the execution is not `Paused`.
508    pub async fn resume(&self, id: Uuid) -> Result<()> {
509        let executions = self.executions.read().await;
510        let handle = executions
511            .get(&id)
512            .ok_or(FlowError::ExecutionNotFound(id))?;
513
514        let mut state = handle.state.write().await;
515        match *state {
516            ExecutionState::Paused => {
517                handle.signal_tx.send(FlowSignal::Run).ok();
518                *state = ExecutionState::Running;
519                Ok(())
520            }
521            ref s => Err(FlowError::InvalidTransition {
522                action: "resume".into(),
523                from: s.as_str().into(),
524            }),
525        }
526    }
527
528    /// Terminate an execution immediately.
529    ///
530    /// Sends a cancellation signal. The execution task stops at the next
531    /// cancellation checkpoint (between waves, or within a wave's result
532    /// collection). If the execution is currently paused it is unblocked so
533    /// it can observe the cancellation.
534    ///
535    /// # Errors
536    ///
537    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
538    /// - [`FlowError::InvalidTransition`] if the execution is already in a
539    ///   terminal state (`Completed`, `Failed`, `Terminated`).
540    pub async fn terminate(&self, id: Uuid) -> Result<()> {
541        let executions = self.executions.read().await;
542        let handle = executions
543            .get(&id)
544            .ok_or(FlowError::ExecutionNotFound(id))?;
545
546        let state = handle.state.read().await;
547        if state.is_terminal() {
548            return Err(FlowError::InvalidTransition {
549                action: "terminate".into(),
550                from: state.as_str().into(),
551            });
552        }
553        drop(state);
554
555        handle.cancel.cancel();
556        // Unblock a paused runner so it can observe the cancellation.
557        handle.signal_tx.send(FlowSignal::Run).ok();
558        Ok(())
559    }
560
561    /// Return a snapshot of the current state of an execution.
562    ///
563    /// # Errors
564    ///
565    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
566    pub async fn state(&self, id: Uuid) -> Result<ExecutionState> {
567        let executions = self.executions.read().await;
568        let handle = executions
569            .get(&id)
570            .ok_or(FlowError::ExecutionNotFound(id))?;
571        // Clone while the guard is still held, then drop the guard before returning.
572        let snapshot = handle.state.read().await.clone();
573        Ok(snapshot)
574    }
575
576    // ── Context CRUD ───────────────────────────────────────────────────────
577
578    /// Return a snapshot of the shared mutable context for a running execution.
579    ///
580    /// The context is a `HashMap<String, Value>` that nodes may read and write
581    /// via [`ExecContext::context`](crate::node::ExecContext::context) during
582    /// execution. This method lets the caller inspect (or react to) the
583    /// accumulated state from outside the runner.
584    ///
585    /// # Errors
586    ///
587    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
588    pub async fn get_context(&self, id: Uuid) -> Result<HashMap<String, Value>> {
589        let executions = self.executions.read().await;
590        let handle = executions
591            .get(&id)
592            .ok_or(FlowError::ExecutionNotFound(id))?;
593        let snapshot = handle.context.read().unwrap().clone();
594        Ok(snapshot)
595    }
596
597    /// Insert or overwrite a single entry in the shared context of a running
598    /// execution.
599    ///
600    /// The change is immediately visible to any node that reads the context
601    /// after this call returns.
602    ///
603    /// # Errors
604    ///
605    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
606    pub async fn set_context_entry(&self, id: Uuid, key: String, value: Value) -> Result<()> {
607        let executions = self.executions.read().await;
608        let handle = executions
609            .get(&id)
610            .ok_or(FlowError::ExecutionNotFound(id))?;
611        handle.context.write().unwrap().insert(key, value);
612        Ok(())
613    }
614
615    /// Remove a single entry from the shared context of a running execution.
616    ///
617    /// Returns `true` if the key existed and was removed, `false` if it was
618    /// not present.
619    ///
620    /// # Errors
621    ///
622    /// - [`FlowError::ExecutionNotFound`] if the ID is unknown.
623    pub async fn delete_context_entry(&self, id: Uuid, key: &str) -> Result<bool> {
624        let executions = self.executions.read().await;
625        let handle = executions
626            .get(&id)
627            .ok_or(FlowError::ExecutionNotFound(id))?;
628        let removed = handle.context.write().unwrap().remove(key).is_some();
629        Ok(removed)
630    }
631}
632
633#[cfg(test)]
634mod tests {
635    use super::*;
636    use crate::node::{ExecContext, Node};
637    use async_trait::async_trait;
638    use serde_json::{json, Value};
639    use std::time::Duration;
640
641    // ── Helpers ────────────────────────────────────────────────────────────
642
643    /// A node that sleeps for the given duration before returning.
644    struct SlowNode(Duration);
645
646    #[async_trait]
647    impl Node for SlowNode {
648        fn node_type(&self) -> &str {
649            "slow"
650        }
651
652        async fn execute(&self, _ctx: ExecContext) -> crate::error::Result<Value> {
653            tokio::time::sleep(self.0).await;
654            Ok(json!({}))
655        }
656    }
657
658    fn slow_engine(delay: Duration) -> FlowEngine {
659        let mut registry = NodeRegistry::with_defaults();
660        registry.register(Arc::new(SlowNode(delay)));
661        FlowEngine::new(registry)
662    }
663
664    fn simple_def() -> Value {
665        json!({
666            "nodes": [
667                { "id": "a", "type": "noop" },
668                { "id": "b", "type": "noop" }
669            ],
670            "edges": [{ "source": "a", "target": "b" }]
671        })
672    }
673
674    fn slow_def() -> Value {
675        json!({
676            "nodes": [
677                { "id": "a", "type": "slow" },
678                { "id": "b", "type": "slow" }
679            ],
680            "edges": [{ "source": "a", "target": "b" }]
681        })
682    }
683
684    // ── node_types ─────────────────────────────────────────────────────────
685
686    #[test]
687    fn node_types_includes_builtins() {
688        let engine = FlowEngine::new(NodeRegistry::with_defaults());
689        let types = engine.node_types();
690        assert!(types.contains(&"noop".to_string()));
691    }
692
693    #[test]
694    fn node_types_includes_custom_nodes() {
695        let mut registry = NodeRegistry::with_defaults();
696        registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
697        let engine = FlowEngine::new(registry);
698
699        let types = engine.node_types();
700        assert!(types.contains(&"noop".to_string()));
701        assert!(types.contains(&"slow".to_string()));
702    }
703
704    #[test]
705    fn node_types_is_sorted() {
706        let engine = FlowEngine::new(NodeRegistry::with_defaults());
707        let types = engine.node_types();
708        let mut sorted = types.clone();
709        sorted.sort();
710        assert_eq!(types, sorted);
711    }
712
713    #[test]
714    fn node_descriptors_include_builtin_metadata() {
715        let engine = FlowEngine::new(NodeRegistry::with_defaults());
716        let descriptors = engine.node_descriptors();
717        let llm = descriptors
718            .iter()
719            .find(|descriptor| descriptor.node_type == "llm")
720            .unwrap();
721        assert_eq!(llm.display_name, "LLM");
722        assert_eq!(llm.category, "ai");
723        assert!(llm.summary.contains("OpenAI-compatible"));
724        assert!(llm.default_data.is_object());
725        assert!(!llm.fields.is_empty());
726    }
727
728    #[test]
729    fn node_descriptors_include_custom_nodes() {
730        let mut registry = NodeRegistry::with_defaults();
731        registry.register(Arc::new(SlowNode(Duration::from_millis(1))));
732        let engine = FlowEngine::new(registry);
733
734        let descriptors = engine.node_descriptors();
735        let slow = descriptors
736            .iter()
737            .find(|descriptor| descriptor.node_type == "slow")
738            .unwrap();
739        assert_eq!(slow.display_name, "slow");
740        assert_eq!(slow.category, "custom");
741    }
742
743    #[test]
744    fn register_node_type_adds_custom_type_at_runtime() {
745        let engine = FlowEngine::new(NodeRegistry::with_defaults());
746        engine.register_node_type(Arc::new(SlowNode(Duration::from_millis(1))));
747
748        let types = engine.node_types();
749        assert!(types.contains(&"slow".to_string()));
750    }
751
752    #[test]
753    fn register_node_type_with_descriptor_updates_catalog() {
754        let engine = FlowEngine::new(NodeRegistry::with_defaults());
755        engine.register_node_type_with_descriptor(
756            Arc::new(SlowNode(Duration::from_millis(1))),
757            NodeDescriptor {
758                node_type: "ignored".to_string(),
759                display_name: "Slow Node".to_string(),
760                category: "testing".to_string(),
761                summary: "Sleeps briefly during tests.".to_string(),
762                default_data: json!({ "delay_ms": 1 }),
763                fields: vec![],
764            },
765        );
766
767        let slow = engine
768            .node_descriptors()
769            .into_iter()
770            .find(|descriptor| descriptor.node_type == "slow")
771            .unwrap();
772        assert_eq!(slow.display_name, "Slow Node");
773        assert_eq!(slow.category, "testing");
774        assert_eq!(slow.default_data["delay_ms"], 1);
775    }
776
777    #[test]
778    fn unregister_node_type_removes_runtime_type() {
779        let engine = FlowEngine::new(NodeRegistry::with_defaults());
780        engine.register_node_type(Arc::new(SlowNode(Duration::from_millis(1))));
781
782        assert!(engine.unregister_node_type("slow").unwrap());
783        assert!(!engine.node_types().contains(&"slow".to_string()));
784
785        let def = json!({
786            "nodes": [{ "id": "a", "type": "slow" }],
787            "edges": []
788        });
789        let issues = engine.validate(&def);
790        assert_eq!(issues.len(), 1);
791        assert!(issues[0].message.contains("unknown node type"));
792    }
793
794    #[test]
795    fn unregister_node_type_rejects_builtin_types() {
796        let engine = FlowEngine::new(NodeRegistry::with_defaults());
797
798        let err = engine.unregister_node_type("noop").unwrap_err();
799        assert!(matches!(err, FlowError::ProtectedNodeType(ref ty) if ty == "noop"));
800        assert!(engine.node_types().contains(&"noop".to_string()));
801    }
802
803    #[test]
804    fn capabilities_include_node_catalog() {
805        let engine = FlowEngine::new(NodeRegistry::with_defaults());
806        let capabilities = engine.capabilities();
807        assert_eq!(capabilities.version, "2026-03-22");
808        assert!(capabilities.progressive_disclosure);
809        assert!(capabilities
810            .nodes
811            .iter()
812            .any(|node| node.node_type == "llm"));
813    }
814
815    #[test]
816    fn http_request_descriptor_carries_editor_metadata() {
817        let engine = FlowEngine::new(NodeRegistry::with_defaults());
818        let descriptors = engine.node_descriptors();
819        let http = descriptors
820            .iter()
821            .find(|descriptor| descriptor.node_type == "http-request")
822            .unwrap();
823        assert_eq!(http.default_data["method"], "GET");
824        assert!(http.fields.iter().any(|field| field.key == "url"));
825    }
826
827    // ── start ──────────────────────────────────────────────────────────────
828
829    #[tokio::test]
830    async fn start_returns_execution_id() {
831        let engine = FlowEngine::new(NodeRegistry::with_defaults());
832        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
833        // ID is non-nil.
834        assert!(!id.is_nil());
835    }
836
837    #[tokio::test]
838    async fn start_rejects_invalid_definition() {
839        let engine = FlowEngine::new(NodeRegistry::with_defaults());
840        let bad = json!({
841            "nodes": [{ "id": "a", "type": "noop" }],
842            "edges": [{ "source": "ghost", "target": "a" }]
843        });
844        assert!(matches!(
845            engine.start(&bad, HashMap::new()).await,
846            Err(FlowError::UnknownNode(_))
847        ));
848    }
849
850    #[tokio::test]
851    async fn completed_flow_has_outputs() {
852        let engine = FlowEngine::new(NodeRegistry::with_defaults());
853        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
854
855        // Wait for the background task to finish.
856        tokio::time::sleep(Duration::from_millis(50)).await;
857
858        let state = engine.state(id).await.unwrap();
859        if let ExecutionState::Completed(result) = state {
860            assert!(result.outputs.contains_key("a"));
861            assert!(result.outputs.contains_key("b"));
862        } else {
863            panic!("expected Completed, got {}", state.as_str());
864        }
865    }
866
867    // ── state ──────────────────────────────────────────────────────────────
868
869    #[tokio::test]
870    async fn state_returns_not_found_for_unknown_id() {
871        let engine = FlowEngine::new(NodeRegistry::with_defaults());
872        let err = engine.state(Uuid::new_v4()).await.unwrap_err();
873        assert!(matches!(err, FlowError::ExecutionNotFound(_)));
874    }
875
876    // ── pause / resume ─────────────────────────────────────────────────────
877
878    #[tokio::test]
879    async fn pause_transitions_to_paused() {
880        let engine = slow_engine(Duration::from_millis(200));
881        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
882
883        // Give the runner a moment to start wave 1.
884        tokio::time::sleep(Duration::from_millis(10)).await;
885        engine.pause(id).await.unwrap();
886
887        assert!(matches!(
888            engine.state(id).await.unwrap(),
889            ExecutionState::Paused
890        ));
891    }
892
893    #[tokio::test]
894    async fn resume_transitions_to_running() {
895        let engine = slow_engine(Duration::from_millis(200));
896        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
897
898        tokio::time::sleep(Duration::from_millis(10)).await;
899        engine.pause(id).await.unwrap();
900        engine.resume(id).await.unwrap();
901
902        assert!(matches!(
903            engine.state(id).await.unwrap(),
904            ExecutionState::Running
905        ));
906    }
907
908    #[tokio::test]
909    async fn pause_on_completed_flow_returns_invalid_transition() {
910        let engine = FlowEngine::new(NodeRegistry::with_defaults());
911        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
912
913        tokio::time::sleep(Duration::from_millis(50)).await;
914        // Flow should be completed by now.
915        let err = engine.pause(id).await.unwrap_err();
916        assert!(matches!(err, FlowError::InvalidTransition { .. }));
917    }
918
919    #[tokio::test]
920    async fn resume_on_running_flow_returns_invalid_transition() {
921        let engine = slow_engine(Duration::from_millis(200));
922        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
923
924        tokio::time::sleep(Duration::from_millis(10)).await;
925        // Still running — not paused.
926        let err = engine.resume(id).await.unwrap_err();
927        assert!(matches!(err, FlowError::InvalidTransition { .. }));
928
929        engine.terminate(id).await.unwrap();
930    }
931
932    // ── terminate ──────────────────────────────────────────────────────────
933
934    #[tokio::test]
935    async fn terminate_stops_slow_execution() {
936        let engine = slow_engine(Duration::from_millis(500));
937        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
938
939        tokio::time::sleep(Duration::from_millis(10)).await;
940        engine.terminate(id).await.unwrap();
941
942        // The runner task should observe the cancellation quickly.
943        tokio::time::sleep(Duration::from_millis(50)).await;
944
945        assert!(matches!(
946            engine.state(id).await.unwrap(),
947            ExecutionState::Terminated
948        ));
949    }
950
951    #[tokio::test]
952    async fn terminate_unblocks_paused_execution() {
953        let engine = slow_engine(Duration::from_millis(500));
954        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
955
956        tokio::time::sleep(Duration::from_millis(10)).await;
957        engine.pause(id).await.unwrap();
958
959        // Flow is paused — now terminate it.
960        engine.terminate(id).await.unwrap();
961
962        tokio::time::sleep(Duration::from_millis(600)).await;
963
964        assert!(matches!(
965            engine.state(id).await.unwrap(),
966            ExecutionState::Terminated
967        ));
968    }
969
970    #[tokio::test]
971    async fn terminate_on_completed_flow_returns_invalid_transition() {
972        let engine = FlowEngine::new(NodeRegistry::with_defaults());
973        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
974
975        tokio::time::sleep(Duration::from_millis(50)).await;
976        let err = engine.terminate(id).await.unwrap_err();
977        assert!(matches!(err, FlowError::InvalidTransition { .. }));
978    }
979
980    #[tokio::test]
981    async fn unknown_execution_id_returns_not_found() {
982        let engine = FlowEngine::new(NodeRegistry::with_defaults());
983        let id = Uuid::new_v4();
984        assert!(matches!(
985            engine.pause(id).await,
986            Err(FlowError::ExecutionNotFound(_))
987        ));
988        assert!(matches!(
989            engine.resume(id).await,
990            Err(FlowError::ExecutionNotFound(_))
991        ));
992        assert!(matches!(
993            engine.terminate(id).await,
994            Err(FlowError::ExecutionNotFound(_))
995        ));
996    }
997
998    // ── ExecutionStore integration ──────────────────────────────────────────
999
1000    #[tokio::test]
1001    async fn execution_store_saves_completed_result() {
1002        use crate::store::MemoryExecutionStore;
1003
1004        let store = Arc::new(MemoryExecutionStore::new());
1005        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1006            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
1007
1008        let id = engine.start(&simple_def(), HashMap::new()).await.unwrap();
1009        tokio::time::sleep(Duration::from_millis(50)).await;
1010
1011        // Engine should have saved the result.
1012        let ids = store.list().await.unwrap();
1013        assert!(ids.contains(&id), "stored execution id not found");
1014
1015        let saved = store.load(id).await.unwrap().unwrap();
1016        assert_eq!(saved.execution_id, id);
1017        assert!(saved.outputs.contains_key("a"));
1018        assert!(saved.outputs.contains_key("b"));
1019    }
1020
1021    #[tokio::test]
1022    async fn execution_store_not_used_on_terminated_execution() {
1023        use crate::store::MemoryExecutionStore;
1024
1025        let store = Arc::new(MemoryExecutionStore::new());
1026        let engine = slow_engine(Duration::from_millis(500))
1027            .with_execution_store(Arc::clone(&store) as Arc<dyn crate::store::ExecutionStore>);
1028
1029        let id = engine.start(&slow_def(), HashMap::new()).await.unwrap();
1030        tokio::time::sleep(Duration::from_millis(10)).await;
1031        engine.terminate(id).await.unwrap();
1032        tokio::time::sleep(Duration::from_millis(50)).await;
1033
1034        // Terminated executions are not saved.
1035        assert!(
1036            store.list().await.unwrap().is_empty(),
1037            "terminated result should not be stored"
1038        );
1039    }
1040
1041    // ── EventEmitter integration (via engine) ───────────────────────────────
1042
1043    #[tokio::test]
1044    async fn engine_emitter_receives_flow_and_node_events() {
1045        use crate::event::EventEmitter;
1046        use std::sync::atomic::{AtomicU32, Ordering};
1047
1048        struct CountEmitter {
1049            flow_started: Arc<AtomicU32>,
1050            flow_completed: Arc<AtomicU32>,
1051            node_started: Arc<AtomicU32>,
1052            node_completed: Arc<AtomicU32>,
1053            node_skipped: Arc<AtomicU32>,
1054            node_failed: Arc<AtomicU32>,
1055            node_completed_full: Arc<AtomicU32>,
1056            iteration_started: Arc<AtomicU32>,
1057            iteration_next: Arc<AtomicU32>,
1058            iteration_completed: Arc<AtomicU32>,
1059            loop_started: Arc<AtomicU32>,
1060            loop_completed: Arc<AtomicU32>,
1061            parallel_branch_started: Arc<AtomicU32>,
1062            parallel_branch_completed: Arc<AtomicU32>,
1063            node_retry: Arc<AtomicU32>,
1064        }
1065
1066        #[async_trait::async_trait]
1067        impl EventEmitter for CountEmitter {
1068            async fn on_flow_started(&self, _: Uuid) {
1069                self.flow_started.fetch_add(1, Ordering::SeqCst);
1070            }
1071            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {
1072                self.flow_completed.fetch_add(1, Ordering::SeqCst);
1073            }
1074            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1075            async fn on_flow_terminated(&self, _: Uuid) {}
1076            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
1077                self.node_started.fetch_add(1, Ordering::SeqCst);
1078            }
1079            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {
1080                self.node_completed.fetch_add(1, Ordering::SeqCst);
1081            }
1082            async fn on_node_skipped(&self, _: Uuid, _: &str) {
1083                self.node_skipped.fetch_add(1, Ordering::SeqCst);
1084            }
1085            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {
1086                self.node_failed.fetch_add(1, Ordering::SeqCst);
1087            }
1088            async fn on_node_completed_full(
1089                &self,
1090                _: Uuid,
1091                _: &str,
1092                _: &str,
1093                _: &serde_json::Value,
1094                _: Option<&serde_json::Value>,
1095                _: &serde_json::Value,
1096                _: u64,
1097            ) {
1098                self.node_completed_full.fetch_add(1, Ordering::SeqCst);
1099            }
1100            async fn on_iteration_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1101            async fn on_iteration_next(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1102            async fn on_iteration_completed(&self, _: Uuid, _: &str, _: &str) {}
1103            async fn on_loop_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1104            async fn on_loop_completed(&self, _: Uuid, _: &str, _: &str) {}
1105            async fn on_parallel_branch_started(&self, _: Uuid, _: &str, _: &str, _: &str) {}
1106            async fn on_parallel_branch_completed(
1107                &self,
1108                _: Uuid,
1109                _: &str,
1110                _: &str,
1111                _: &str,
1112                _: &serde_json::Value,
1113            ) {
1114            }
1115            async fn on_node_retry(&self, _: Uuid, _: &str, _: u32, _: u32) {}
1116        }
1117
1118        let flow_started = Arc::new(AtomicU32::new(0));
1119        let flow_completed = Arc::new(AtomicU32::new(0));
1120        let node_started = Arc::new(AtomicU32::new(0));
1121        let node_completed = Arc::new(AtomicU32::new(0));
1122        let node_skipped = Arc::new(AtomicU32::new(0));
1123        let node_failed = Arc::new(AtomicU32::new(0));
1124        let node_completed_full = Arc::new(AtomicU32::new(0));
1125        let iteration_started = Arc::new(AtomicU32::new(0));
1126        let iteration_next = Arc::new(AtomicU32::new(0));
1127        let iteration_completed = Arc::new(AtomicU32::new(0));
1128        let loop_started = Arc::new(AtomicU32::new(0));
1129        let loop_completed = Arc::new(AtomicU32::new(0));
1130        let parallel_branch_started = Arc::new(AtomicU32::new(0));
1131        let parallel_branch_completed = Arc::new(AtomicU32::new(0));
1132        let node_retry = Arc::new(AtomicU32::new(0));
1133
1134        let emitter = Arc::new(CountEmitter {
1135            flow_started: Arc::clone(&flow_started),
1136            flow_completed: Arc::clone(&flow_completed),
1137            node_started: Arc::clone(&node_started),
1138            node_completed: Arc::clone(&node_completed),
1139            node_skipped: Arc::clone(&node_skipped),
1140            node_failed: Arc::clone(&node_failed),
1141            node_completed_full: Arc::clone(&node_completed_full),
1142            iteration_started: Arc::clone(&iteration_started),
1143            iteration_next: Arc::clone(&iteration_next),
1144            iteration_completed: Arc::clone(&iteration_completed),
1145            loop_started: Arc::clone(&loop_started),
1146            loop_completed: Arc::clone(&loop_completed),
1147            parallel_branch_started: Arc::clone(&parallel_branch_started),
1148            parallel_branch_completed: Arc::clone(&parallel_branch_completed),
1149            node_retry: Arc::clone(&node_retry),
1150        });
1151
1152        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1153            .with_event_emitter(emitter as Arc<dyn EventEmitter>);
1154
1155        // simple_def has nodes a and b.
1156        engine.start(&simple_def(), HashMap::new()).await.unwrap();
1157        tokio::time::sleep(Duration::from_millis(50)).await;
1158
1159        assert_eq!(flow_started.load(Ordering::SeqCst), 1, "flow_started");
1160        assert_eq!(flow_completed.load(Ordering::SeqCst), 1, "flow_completed");
1161        assert_eq!(node_started.load(Ordering::SeqCst), 2, "node_started (a+b)");
1162        assert_eq!(
1163            node_completed.load(Ordering::SeqCst),
1164            2,
1165            "node_completed (a+b)"
1166        );
1167    }
1168
1169    // ── start_named ────────────────────────────────────────────────────────
1170
1171    #[tokio::test]
1172    async fn start_named_loads_and_runs_from_flow_store() {
1173        use crate::flow_store::MemoryFlowStore;
1174
1175        let flow_store = Arc::new(MemoryFlowStore::new());
1176        flow_store.save("greet", &simple_def()).await.unwrap();
1177
1178        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1179            .with_flow_store(Arc::clone(&flow_store) as Arc<dyn crate::flow_store::FlowStore>);
1180
1181        let id = engine.start_named("greet", HashMap::new()).await.unwrap();
1182        assert!(!id.is_nil());
1183
1184        tokio::time::sleep(Duration::from_millis(50)).await;
1185        assert!(matches!(
1186            engine.state(id).await.unwrap(),
1187            ExecutionState::Completed(_)
1188        ));
1189    }
1190
1191    #[tokio::test]
1192    async fn start_named_returns_flow_not_found_for_unknown_name() {
1193        use crate::flow_store::MemoryFlowStore;
1194
1195        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1196            .with_flow_store(
1197                Arc::new(MemoryFlowStore::new()) as Arc<dyn crate::flow_store::FlowStore>
1198            );
1199
1200        let err = engine
1201            .start_named("nonexistent", HashMap::new())
1202            .await
1203            .unwrap_err();
1204
1205        assert!(
1206            matches!(err, FlowError::FlowNotFound(ref n) if n == "nonexistent"),
1207            "expected FlowNotFound, got: {err}"
1208        );
1209    }
1210
1211    #[tokio::test]
1212    async fn start_named_returns_internal_when_no_store_configured() {
1213        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1214
1215        let err = engine
1216            .start_named("anything", HashMap::new())
1217            .await
1218            .unwrap_err();
1219
1220        assert!(
1221            matches!(err, FlowError::Internal(_)),
1222            "expected Internal, got: {err}"
1223        );
1224    }
1225
1226    // ── start_streaming ────────────────────────────────────────────────────
1227
1228    #[tokio::test]
1229    async fn start_streaming_delivers_flow_started_and_completed_events() {
1230        use crate::event::FlowEvent;
1231
1232        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1233        let (_, mut rx) = engine
1234            .start_streaming(&simple_def(), HashMap::new())
1235            .await
1236            .unwrap();
1237
1238        let mut saw_started = false;
1239        let mut saw_completed = false;
1240
1241        loop {
1242            match rx.recv().await {
1243                Ok(FlowEvent::FlowStarted { .. }) => saw_started = true,
1244                Ok(FlowEvent::FlowCompleted { .. }) => {
1245                    saw_completed = true;
1246                    break;
1247                }
1248                Ok(_) => {}
1249                Err(_) => break,
1250            }
1251        }
1252
1253        assert!(saw_started, "FlowStarted not received");
1254        assert!(saw_completed, "FlowCompleted not received");
1255    }
1256
1257    #[tokio::test]
1258    async fn start_streaming_delivers_node_events_for_each_node() {
1259        use crate::event::FlowEvent;
1260        use std::collections::HashSet;
1261
1262        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1263        let (_, mut rx) = engine
1264            .start_streaming(&simple_def(), HashMap::new())
1265            .await
1266            .unwrap();
1267
1268        let mut completed_nodes: HashSet<String> = HashSet::new();
1269
1270        loop {
1271            match rx.recv().await {
1272                Ok(FlowEvent::NodeCompleted { node_id, .. }) => {
1273                    completed_nodes.insert(node_id);
1274                }
1275                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1276                Ok(_) => {}
1277            }
1278        }
1279
1280        assert!(completed_nodes.contains("a"), "node 'a' not in stream");
1281        assert!(completed_nodes.contains("b"), "node 'b' not in stream");
1282    }
1283
1284    #[tokio::test]
1285    async fn start_streaming_zero_events_lost_on_fast_flow() {
1286        // Sanity check: even on an instantaneously completing flow the
1287        // receiver created before spawn misses no events.
1288        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1289        let def = json!({ "nodes": [{ "id": "x", "type": "noop" }], "edges": [] });
1290
1291        let (_id, mut rx) = engine.start_streaming(&def, HashMap::new()).await.unwrap();
1292
1293        let mut event_count = 0u32;
1294        loop {
1295            match rx.recv().await {
1296                Ok(_) => event_count += 1,
1297                Err(_) => break,
1298            }
1299        }
1300        // FlowStarted + NodeStarted + NodeCompleted + FlowCompleted = 4 minimum
1301        assert!(event_count >= 4, "expected ≥4 events, got {event_count}");
1302    }
1303
1304    #[tokio::test]
1305    async fn start_streaming_existing_emitter_also_fires() {
1306        use crate::event::{EventEmitter, FlowEvent};
1307        use std::sync::atomic::{AtomicU32, Ordering};
1308
1309        struct CountEmitter(Arc<AtomicU32>);
1310
1311        #[async_trait::async_trait]
1312        impl EventEmitter for CountEmitter {
1313            async fn on_flow_started(&self, _: Uuid) {
1314                self.0.fetch_add(1, Ordering::SeqCst);
1315            }
1316            async fn on_flow_completed(&self, _: Uuid, _: &crate::result::FlowResult) {}
1317            async fn on_flow_failed(&self, _: Uuid, _: &str) {}
1318            async fn on_flow_terminated(&self, _: Uuid) {}
1319            async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
1320            async fn on_node_completed(&self, _: Uuid, _: &str, _: &serde_json::Value) {}
1321            async fn on_node_skipped(&self, _: Uuid, _: &str) {}
1322            async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
1323            async fn on_node_completed_full(
1324                &self,
1325                _: Uuid,
1326                _: &str,
1327                _: &str,
1328                _: &serde_json::Value,
1329                _: Option<&serde_json::Value>,
1330                _: &serde_json::Value,
1331                _: u64,
1332            ) {
1333            }
1334            async fn on_iteration_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1335            async fn on_iteration_next(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1336            async fn on_iteration_completed(&self, _: Uuid, _: &str, _: &str) {}
1337            async fn on_loop_started(&self, _: Uuid, _: &str, _: &str, _: u32) {}
1338            async fn on_loop_completed(&self, _: Uuid, _: &str, _: &str) {}
1339            async fn on_parallel_branch_started(&self, _: Uuid, _: &str, _: &str, _: &str) {}
1340            async fn on_parallel_branch_completed(
1341                &self,
1342                _: Uuid,
1343                _: &str,
1344                _: &str,
1345                _: &str,
1346                _: &serde_json::Value,
1347            ) {
1348            }
1349            async fn on_node_retry(&self, _: Uuid, _: &str, _: u32, _: u32) {}
1350        }
1351
1352        let counter = Arc::new(AtomicU32::new(0));
1353        let engine = FlowEngine::new(NodeRegistry::with_defaults())
1354            .with_event_emitter(
1355                Arc::new(CountEmitter(Arc::clone(&counter))) as Arc<dyn EventEmitter>
1356            );
1357
1358        let (_id, mut rx) = engine
1359            .start_streaming(&simple_def(), HashMap::new())
1360            .await
1361            .unwrap();
1362
1363        // Drain the stream.
1364        loop {
1365            match rx.recv().await {
1366                Ok(FlowEvent::FlowCompleted { .. }) | Err(_) => break,
1367                Ok(_) => {}
1368            }
1369        }
1370
1371        // The existing CountEmitter should also have received FlowStarted.
1372        assert_eq!(
1373            counter.load(Ordering::SeqCst),
1374            1,
1375            "existing emitter did not fire"
1376        );
1377    }
1378
1379    // ── validate ───────────────────────────────────────────────────────────
1380
1381    #[test]
1382    fn validate_returns_empty_for_valid_flow() {
1383        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1384        let def = json!({
1385            "nodes": [
1386                { "id": "a", "type": "noop" },
1387                { "id": "b", "type": "noop" }
1388            ],
1389            "edges": [{ "source": "a", "target": "b" }]
1390        });
1391        assert!(engine.validate(&def).is_empty());
1392    }
1393
1394    #[test]
1395    fn validate_catches_unknown_node_type() {
1396        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1397        let def = json!({
1398            "nodes": [
1399                { "id": "a", "type": "noop" },
1400                { "id": "b", "type": "does-not-exist" }
1401            ],
1402            "edges": []
1403        });
1404        let issues = engine.validate(&def);
1405        assert_eq!(issues.len(), 1);
1406        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1407        assert!(issues[0].message.contains("unknown node type"));
1408    }
1409
1410    #[test]
1411    fn validate_catches_cyclic_graph() {
1412        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1413        let def = json!({
1414            "nodes": [
1415                { "id": "a", "type": "noop" },
1416                { "id": "b", "type": "noop" }
1417            ],
1418            "edges": [
1419                { "source": "a", "target": "b" },
1420                { "source": "b", "target": "a" }
1421            ]
1422        });
1423        let issues = engine.validate(&def);
1424        assert_eq!(issues.len(), 1);
1425        assert!(issues[0].node_id.is_none());
1426    }
1427
1428    #[test]
1429    fn validate_catches_run_if_referencing_unknown_node() {
1430        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1431        let def = json!({
1432            "nodes": [
1433                { "id": "a", "type": "noop" },
1434                {
1435                    "id": "b",
1436                    "type": "noop",
1437                    "data": {
1438                        "run_if": { "from": "ghost", "path": "", "op": "eq", "value": true }
1439                    }
1440                }
1441            ],
1442            "edges": [{ "source": "a", "target": "b" }]
1443        });
1444        let issues = engine.validate(&def);
1445        assert_eq!(issues.len(), 1);
1446        assert_eq!(issues[0].node_id.as_deref(), Some("b"));
1447        assert!(issues[0].message.contains("ghost"));
1448    }
1449
1450    #[test]
1451    fn validate_reports_multiple_issues() {
1452        let engine = FlowEngine::new(NodeRegistry::with_defaults());
1453        let def = json!({
1454            "nodes": [
1455                { "id": "a", "type": "bad-type-1" },
1456                { "id": "b", "type": "bad-type-2" }
1457            ],
1458            "edges": []
1459        });
1460        assert_eq!(engine.validate(&def).len(), 2);
1461    }
1462}