// runkon_flow/flow_engine.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use crate::cancellation::CancellationToken;
7use crate::cancellation_reason::CancellationReason;
8use crate::dsl::{
9    detect_workflow_cycles, ValidationError, WorkflowDef, WorkflowNode, QUALITY_GATE_TYPE,
10};
11use crate::engine::{
12    run_workflow_engine, ChildWorkflowContext, ChildWorkflowRunner, ExecutionState,
13};
14use crate::engine_error::EngineError;
15use crate::events::EventSink;
16use crate::output_schema::OutputSchema;
17use crate::status::WorkflowRunStatus;
18use crate::traits::action_executor::{ActionExecutor, ActionRegistry};
19use crate::traits::gate_resolver::{GateResolver, GateResolverRegistry};
20use crate::traits::item_provider::{ItemProvider, ItemProviderRegistry};
21use crate::traits::persistence::WorkflowPersistence;
22use crate::traits::run_context::RunContext;
23use crate::traits::script_env_provider::{NoOpScriptEnvProvider, ScriptEnvProvider};
24use crate::traits::workflow_resolver::WorkflowResolver;
25use crate::types::{WorkflowExecConfig, WorkflowResult, WorkflowRunStep};
26use crate::workflow_resolver_directory::DirectoryWorkflowResolver;
27
28// ---------------------------------------------------------------------------
29// FlowEngine
30// ---------------------------------------------------------------------------
31
/// All per-run state needed by `cancel_run()` and `Drop`. Stored atomically in
/// a single `Mutex<HashMap>` so register/deregister/drain are each one lock.
struct ActiveRunEntry {
    /// In-memory cancellation token for the run; `cancel_run()` signals it so
    /// the engine halts at the next step boundary.
    token: CancellationToken,
    /// Executor shutdown flag (the `exec_config.shutdown` arc); set by
    /// `cancel_run()` so the in-flight step stops promptly.
    shutdown: Arc<AtomicBool>,
    /// Persistence handle used by `cancel_run()` to mark the DB run `Cancelling`.
    persistence: Arc<dyn WorkflowPersistence>,
    /// Registry used to fire `executor.cancel()` for the in-flight step.
    registry: Arc<ActionRegistry>,
    /// (executor_label, step_id) of the step currently in flight, if any.
    exec_info: Arc<Mutex<Option<(String, String)>>>,
    /// Stop flag for the lease refresh thread. Set to `true` to ask it to exit.
    refresh_stop: Arc<AtomicBool>,
    /// Thread handle used to `unpark()` the refresh thread for fast teardown.
    refresh_thread: Option<std::thread::Thread>,
    /// Join handle for the refresh thread.
    refresh_handle: Option<std::thread::JoinHandle<()>>,
}
48
49// ---------------------------------------------------------------------------
50// Lease refresh thread
51// ---------------------------------------------------------------------------
52
/// Everything the background lease-refresh thread needs, moved into the
/// thread at spawn time (see `run_inner`).
struct RefreshContext {
    /// Persistence handle used to renew the lease.
    persistence: Arc<dyn WorkflowPersistence>,
    /// Run whose lease is being refreshed.
    run_id: String,
    /// Owner token passed to `acquire_lease` on every renewal.
    token: String,
    /// Lease TTL handed to `acquire_lease`.
    ttl_seconds: i64,
    /// How long to park between renewal attempts.
    refresh_interval: Duration,
    /// When `true`, the loop exits on its next wakeup.
    stop: Arc<AtomicBool>,
    /// Cancelled with `LeaseLost` if the lease cannot be renewed.
    cancellation: CancellationToken,
    /// Forwarded to `signal_lease_abort` to stop the in-flight step.
    shutdown: Arc<AtomicBool>,
    /// Forwarded to `signal_lease_abort` for `executor.cancel()`.
    registry: Arc<ActionRegistry>,
    /// Forwarded to `signal_lease_abort`; identifies the in-flight step.
    exec_info: Arc<Mutex<Option<(String, String)>>>,
}
65
66fn refresh_lease_loop(ctx: RefreshContext) {
67    loop {
68        std::thread::park_timeout(ctx.refresh_interval);
69        if ctx.stop.load(Ordering::Relaxed) {
70            return;
71        }
72        match ctx
73            .persistence
74            .acquire_lease(&ctx.run_id, &ctx.token, ctx.ttl_seconds)
75        {
76            Ok(Some(_)) => {} // renewed successfully
77            Ok(None) => {
78                tracing::warn!(
79                    "run {}: lease reclaimed by another engine, aborting",
80                    ctx.run_id
81                );
82                signal_lease_abort(ctx.shutdown, ctx.cancellation, ctx.registry, ctx.exec_info);
83                return;
84            }
85            Err(e) => {
86                tracing::warn!("run {}: lease refresh DB error: {e}, aborting", ctx.run_id);
87                signal_lease_abort(ctx.shutdown, ctx.cancellation, ctx.registry, ctx.exec_info);
88                return;
89            }
90        }
91    }
92}
93
94fn signal_lease_abort(
95    shutdown: Arc<AtomicBool>,
96    cancellation: CancellationToken,
97    registry: Arc<ActionRegistry>,
98    exec_info: Arc<Mutex<Option<(String, String)>>>,
99) {
100    shutdown.store(true, Ordering::SeqCst);
101    cancellation.cancel(CancellationReason::LeaseLost);
102    let snap = exec_info.lock().unwrap_or_else(|e| e.into_inner()).clone();
103    if let Some((exec_label, step_id)) = snap {
104        std::thread::spawn(move || {
105            if let Err(e) = registry.cancel(&exec_label, &step_id) {
106                tracing::warn!("lease abort: cancel step '{step_id}' failed: {e}");
107            }
108        });
109    }
110}
111
/// Ask the refresh thread to exit: raise the stop flag, then wake the
/// (possibly parked) thread so it observes the flag without waiting out its
/// full park timeout. A `None` thread handle means there is nothing to wake.
fn stop_refresh_thread(stop: &AtomicBool, thread: Option<&std::thread::Thread>) {
    stop.store(true, Ordering::SeqCst);
    match thread {
        Some(handle) => handle.unpark(),
        None => {}
    }
}
119
/// The primary harness for running and validating workflows.
///
/// Produced by [`FlowEngineBuilder::build()`].
pub struct FlowEngine {
    /// Action executors used by `validate()`. Note: `run()` validates against
    /// the `ExecutionState`'s own registry instead — see `validate()` docs.
    pub(crate) action_registry: ActionRegistry,
    /// Item providers used by `validate()`.
    pub(crate) item_provider_registry: ItemProviderRegistry,
    /// Gate resolvers; validated against in both `validate()` and `run()`.
    pub(crate) gate_resolver_registry: GateResolverRegistry,
    /// Held for future use when FlowEngine constructs ExecutionState directly.
    #[allow(dead_code)]
    pub(crate) script_env_provider: Arc<dyn ScriptEnvProvider>,
    /// Optional sub-workflow resolver; when `None`, cycle detection is skipped.
    pub(crate) workflow_resolver: Option<Arc<dyn WorkflowResolver>>,
    /// Engine-wide event sinks, prepended to per-run sinks by `build_event_sinks`.
    pub(crate) event_sinks: Vec<Arc<dyn EventSink>>,
    /// All per-run cancellation state in a single map so register/deregister
    /// are atomic (one lock covers token + shutdown + persistence + registry).
    active_runs: Mutex<HashMap<String, ActiveRunEntry>>,
}
136
/// All inputs required to start a top-level workflow execution via [`FlowEngine::run_workflow`].
///
/// When adding new fields to [`crate::engine::ExecutionState`], add the corresponding slot here
/// so production callers can supply them without reaching into `test_helpers`.
#[non_exhaustive]
pub struct RunInput {
    pub persistence: Arc<dyn WorkflowPersistence>,
    pub workflow_run_id: String,
    pub workflow_name: String,
    pub action_registry: Arc<ActionRegistry>,
    pub item_provider_registry: Arc<ItemProviderRegistry>,
    pub script_env_provider: Arc<dyn ScriptEnvProvider>,
    pub run_ctx: Arc<dyn RunContext>,
    pub extra_plugin_dirs: Vec<String>,
    pub model: Option<String>,
    pub exec_config: WorkflowExecConfig,
    /// Workflow input key/value pairs.
    pub inputs: HashMap<String, String>,
    /// Empty string for top-level runs.
    pub parent_run_id: String,
    /// Nesting depth; `RunInput::new` defaults this to 0 (top level).
    pub depth: u32,
    pub target_label: Option<String>,
    pub default_as_identity: Option<String>,
    pub triggered_by_hook: bool,
    /// Maps a schema name to an [`OutputSchema`].
    #[allow(clippy::type_complexity)]
    pub schema_resolver:
        Option<Arc<dyn Fn(&str) -> crate::engine_error::Result<OutputSchema> + Send + Sync>>,
    /// Enables child-workflow fan-out; `None` disables nesting.
    pub child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    pub cancellation: CancellationToken,
    /// Per-run sinks appended after the engine's own event sinks.
    pub event_sinks: Vec<Arc<dyn EventSink>>,
}
168
/// Inputs for running a child workflow from a [`ChildWorkflowRunner`] implementation.
///
/// Pairs with [`ChildWorkflowContext`] (projected from the parent run) to supply fields
/// that the parent context does not carry. Pass to [`FlowEngine::run_child`].
///
/// **Note on `child_runner`:** pass `Some(Arc::clone(&self_runner))` from a
/// `ChildWorkflowRunner::execute_child` implementation so grandchild workflow calls work.
/// A `None` value produces a child run that cannot fan out further.
#[non_exhaustive]
pub struct ChildRunInput {
    /// The pre-created child run ID (from persistence).
    pub workflow_run_id: String,
    pub persistence: Arc<dyn WorkflowPersistence>,
    pub action_registry: Arc<ActionRegistry>,
    pub item_provider_registry: Arc<ItemProviderRegistry>,
    pub script_env_provider: Arc<dyn ScriptEnvProvider>,
    /// Re-inject the same runner for grandchild support; `None` disables further nesting.
    pub child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    /// Maps a schema name to an [`OutputSchema`].
    #[allow(clippy::type_complexity)]
    pub schema_resolver:
        Option<Arc<dyn Fn(&str) -> crate::engine_error::Result<OutputSchema> + Send + Sync>>,
    /// Maps to `ExecutionState::default_as_identity`.
    pub as_identity: Option<String>,
    // NOTE(review): presumably parent depth + 1, supplied by the caller — confirm convention.
    pub depth: u32,
    pub cancellation: CancellationToken,
    pub target_label: Option<String>,
    pub triggered_by_hook: bool,
    /// When `Some`, replaces the parent context's `inputs`; when `None`, parent inputs flow through.
    pub inputs_override: Option<HashMap<String, String>>,
}
199
200impl RunInput {
201    /// Construct a `RunInput` with required fields; all optional/defaultable fields are zeroed.
202    ///
203    /// Use direct field assignment (`input.model = Some(...)`) to set any optional fields.
204    #[allow(clippy::too_many_arguments)]
205    pub fn new(
206        persistence: Arc<dyn WorkflowPersistence>,
207        workflow_run_id: String,
208        workflow_name: String,
209        action_registry: Arc<ActionRegistry>,
210        item_provider_registry: Arc<ItemProviderRegistry>,
211        script_env_provider: Arc<dyn ScriptEnvProvider>,
212        run_ctx: Arc<dyn RunContext>,
213        cancellation: CancellationToken,
214    ) -> Self {
215        Self {
216            persistence,
217            workflow_run_id,
218            workflow_name,
219            action_registry,
220            item_provider_registry,
221            script_env_provider,
222            run_ctx,
223            extra_plugin_dirs: vec![],
224            model: None,
225            exec_config: WorkflowExecConfig::default(),
226            inputs: HashMap::new(),
227            parent_run_id: String::new(),
228            depth: 0,
229            target_label: None,
230            default_as_identity: None,
231            triggered_by_hook: false,
232            schema_resolver: None,
233            child_runner: None,
234            cancellation,
235            event_sinks: vec![],
236        }
237    }
238}
239
240impl ChildRunInput {
241    /// Construct a `ChildRunInput` with required fields; all optional/defaultable fields are zeroed.
242    ///
243    /// Use direct field assignment (`input.as_identity = Some(...)`) to set any optional fields.
244    pub fn new(
245        workflow_run_id: String,
246        persistence: Arc<dyn WorkflowPersistence>,
247        action_registry: Arc<ActionRegistry>,
248        item_provider_registry: Arc<ItemProviderRegistry>,
249        script_env_provider: Arc<dyn ScriptEnvProvider>,
250        depth: u32,
251        cancellation: CancellationToken,
252    ) -> Self {
253        Self {
254            workflow_run_id,
255            persistence,
256            action_registry,
257            item_provider_registry,
258            script_env_provider,
259            child_runner: None,
260            schema_resolver: None,
261            as_identity: None,
262            depth,
263            cancellation,
264            target_label: None,
265            triggered_by_hook: false,
266            inputs_override: None,
267        }
268    }
269}
270
/// Build an `ExecutionState` from caller-supplied config fields with all runtime accumulators
/// zeroed. Centralises the accumulator defaults so `run_workflow` and `run_child` stay in sync.
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
fn make_fresh_execution_state(
    persistence: Arc<dyn WorkflowPersistence>,
    action_registry: Arc<ActionRegistry>,
    item_provider_registry: Arc<ItemProviderRegistry>,
    script_env_provider: Arc<dyn ScriptEnvProvider>,
    workflow_run_id: String,
    workflow_name: String,
    run_ctx: Arc<dyn RunContext>,
    extra_plugin_dirs: Vec<String>,
    model: Option<String>,
    exec_config: WorkflowExecConfig,
    inputs: HashMap<String, String>,
    parent_run_id: String,
    depth: u32,
    target_label: Option<String>,
    default_as_identity: Option<String>,
    triggered_by_hook: bool,
    #[allow(clippy::type_complexity)] schema_resolver: Option<
        Arc<dyn Fn(&str) -> crate::engine_error::Result<OutputSchema> + Send + Sync>,
    >,
    child_runner: Option<Arc<dyn ChildWorkflowRunner>>,
    cancellation: CancellationToken,
    event_sinks: Arc<[Arc<dyn EventSink>]>,
) -> ExecutionState {
    ExecutionState {
        // Caller-supplied configuration, passed through unchanged.
        persistence,
        action_registry,
        script_env_provider,
        workflow_run_id,
        workflow_name,
        run_ctx,
        extra_plugin_dirs,
        model,
        exec_config,
        inputs,
        parent_run_id,
        depth,
        target_label,
        default_as_identity,
        triggered_by_hook,
        schema_resolver,
        child_runner,
        cancellation,
        event_sinks,
        // `ExecutionState` names its item-provider slot `registry`.
        registry: item_provider_registry,
        // Runtime accumulators — zeroed for a fresh run.
        step_results: HashMap::new(),
        contexts: vec![],
        position: 0,
        all_succeeded: true,
        total_cost: 0.0,
        total_turns: 0,
        total_duration_ms: 0,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_cache_read_input_tokens: 0,
        total_cache_creation_input_tokens: 0,
        has_llm_metrics: false,
        last_gate_feedback: None,
        block_output: None,
        block_with: vec![],
        resume_ctx: None,
        last_heartbeat_at: ExecutionState::new_heartbeat(),
        current_execution_id: Arc::new(Mutex::new(None)),
        // Lease ownership — populated by `run_inner` / `resume`.
        owner_token: None,
        lease_generation: None,
    }
}
342
impl FlowEngine {
    /// Validate a workflow definition against the registered executors, providers,
    /// and gate resolvers.
    ///
    /// Collects all errors before returning. Returns `Ok(())` when valid, or
    /// `Err(errors)` with one entry per problem found. Public so CI lint tools
    /// can call it without actually running the workflow.
    ///
    /// # Registry asymmetry with `run()`
    ///
    /// This method validates against the **`FlowEngine`'s own registries** (those
    /// supplied to `FlowEngineBuilder`). `run()`, however, validates against the
    /// **`ExecutionState`'s registries** at call time. Because the two registry
    /// sets are independent, a workflow that passes `validate()` may still be
    /// rejected by `run()` if the `ExecutionState` was built with a different
    /// set of action executors or item providers. Use `validate()` for static
    /// analysis (CI lint, pre-flight checks) when you control both the engine
    /// and execution state; rely on `run()`'s own validation when working with
    /// externally-supplied `ExecutionState` values.
    pub fn validate(&self, def: &WorkflowDef) -> Result<(), Vec<ValidationError>> {
        self.validate_with_registries(
            &self.action_registry,
            &self.item_provider_registry,
            &self.gate_resolver_registry,
            def,
        )
    }

    /// Build the merged event-sink slice: engine-wide sinks followed by per-run `extra` sinks.
    fn build_event_sinks(&self, extra: &[Arc<dyn EventSink>]) -> Arc<[Arc<dyn EventSink>]> {
        let mut sinks = self.event_sinks.clone();
        sinks.extend_from_slice(extra);
        Arc::from(sinks)
    }

    /// Core execution path. Validates, acquires a lease, runs the workflow, and tears down.
    /// Does **not** overwrite `state.event_sinks` — callers set it before entering.
    fn run_inner(
        &self,
        def: &WorkflowDef,
        state: &mut ExecutionState,
    ) -> crate::engine_error::Result<WorkflowResult> {
        // Validate against the state's own registries so validation matches
        // dispatch-time lookup (see `run()` docs).
        if let Err(validation_errors) = self.validate_with_registries(
            &state.action_registry,
            &state.registry,
            &self.gate_resolver_registry,
            def,
        ) {
            let joined = validation_errors
                .iter()
                .map(|e| e.to_string())
                .collect::<Vec<_>>()
                .join("\n");
            return Err(EngineError::Workflow(format!(
                "workflow '{}' failed validation:\n{}",
                def.name, joined
            )));
        }

        let lease_ttl_secs = state.exec_config.lease_ttl_secs;
        let refresh_interval = state.exec_config.lease_refresh_interval;

        // Acquire or re-claim lease (idempotent when token already set by resume()).
        let token = state
            .owner_token
            .get_or_insert_with(|| ulid::Ulid::new().to_string())
            .as_str();
        match state
            .persistence
            .acquire_lease(&state.workflow_run_id, token, lease_ttl_secs)
        {
            Ok(Some(gen)) => {
                state.lease_generation = Some(gen);
            }
            Ok(None) => return Err(EngineError::AlreadyOwned(state.workflow_run_id.clone())),
            Err(e) => return Err(e),
        }

        // Ensure the exec_config.shutdown arc exists so cancel_run() can set it.
        let shutdown_arc = state
            .exec_config
            .shutdown
            .get_or_insert_with(|| Arc::new(AtomicBool::new(false)))
            .clone();

        let run_id = state.workflow_run_id.clone();

        // Spawn the background refresh thread.
        let refresh_stop = Arc::new(AtomicBool::new(false));
        let refresh_handle = {
            let ctx = RefreshContext {
                persistence: Arc::clone(&state.persistence),
                run_id: run_id.clone(),
                token: state
                    .owner_token
                    .clone()
                    .expect("owner_token was just set by get_or_insert_with"),
                ttl_seconds: lease_ttl_secs,
                refresh_interval,
                stop: Arc::clone(&refresh_stop),
                cancellation: state.cancellation.clone(),
                shutdown: Arc::clone(&shutdown_arc),
                registry: Arc::clone(&state.action_registry),
                exec_info: Arc::clone(&state.current_execution_id),
            };
            std::thread::spawn(move || refresh_lease_loop(ctx))
        };
        let refresh_thread = refresh_handle.thread().clone();

        // Register all per-run cancellation state in a single lock so cancel_run()
        // and Drop each see a consistent snapshot.
        {
            let mut runs = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            runs.insert(
                run_id.clone(),
                ActiveRunEntry {
                    token: state.cancellation.clone(),
                    shutdown: shutdown_arc,
                    persistence: Arc::clone(&state.persistence),
                    registry: Arc::clone(&state.action_registry),
                    exec_info: Arc::clone(&state.current_execution_id),
                    refresh_stop,
                    refresh_thread: Some(refresh_thread),
                    refresh_handle: Some(refresh_handle),
                },
            );
        }

        // Execute the workflow; the teardown below runs for both Ok and Err results.
        let result = run_workflow_engine(state, def);

        // Capture LeaseLost BEFORE stopping the refresh thread to avoid a teardown
        // race where the thread sets LeaseLost after the run has already succeeded.
        let lease_lost_during_run = matches!(
            state.cancellation.reason(),
            Some(CancellationReason::LeaseLost)
        );

        // Stop the refresh thread and join it before deregistering.
        let join_handle = {
            let mut runs = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            runs.remove(&run_id).and_then(|entry| {
                stop_refresh_thread(&entry.refresh_stop, entry.refresh_thread.as_ref());
                entry.refresh_handle
            })
        };
        // Join outside the lock to avoid blocking cancel_run callers.
        if let Some(h) = join_handle {
            let _ = h.join();
        }

        if lease_lost_during_run {
            return Err(EngineError::Cancelled(CancellationReason::LeaseLost));
        }

        result
    }

    /// Run a workflow definition with a pre-built execution state.
    ///
    /// Validates against the execution state's own registries (action,
    /// item-provider) so the validation check uses the same source of truth
    /// as dispatch-time lookup.  Gate resolvers are validated against the
    /// FlowEngine's registry because `ExecutionState` carries none — gates
    /// are resolved via persistence callbacks, not the executor pipeline.
    ///
    /// Event sinks registered on the engine are injected into the state for
    /// this run; any sinks already set on `state.event_sinks` are replaced.
    pub fn run(
        &self,
        def: &WorkflowDef,
        state: &mut ExecutionState,
    ) -> crate::engine_error::Result<WorkflowResult> {
        state.event_sinks = self.build_event_sinks(&[]);
        self.run_inner(def, state)
    }

    /// Run a top-level workflow, constructing `ExecutionState` internally from `input`.
    ///
    /// Acquires the lease inside this call so callers never observe an uninitialized
    /// `lease_generation`. Engine-wide event sinks are merged with `input.event_sinks`
    /// (engine sinks first).
    pub fn run_workflow(
        &self,
        def: &WorkflowDef,
        input: RunInput,
    ) -> crate::engine_error::Result<WorkflowResult> {
        let event_sinks = self.build_event_sinks(&input.event_sinks);
        let mut state = make_fresh_execution_state(
            input.persistence,
            input.action_registry,
            input.item_provider_registry,
            input.script_env_provider,
            input.workflow_run_id,
            input.workflow_name,
            input.run_ctx,
            input.extra_plugin_dirs,
            input.model,
            input.exec_config,
            input.inputs,
            input.parent_run_id,
            input.depth,
            input.target_label,
            input.default_as_identity,
            input.triggered_by_hook,
            input.schema_resolver,
            input.child_runner,
            input.cancellation,
            event_sinks,
        );
        self.run_inner(def, &mut state)
    }

    /// Run a child workflow as part of a [`ChildWorkflowRunner`] implementation.
    ///
    /// Inherits `run_ctx`, `extra_plugin_dirs`, `model`, `exec_config`, and event sinks
    /// from `parent_ctx`. Remaining harness-side fields come from `input`. The child's
    /// `parent_run_id` is set to `parent_ctx.workflow_run_id`.
    ///
    /// `inputs` is sourced from `input.inputs_override` when `Some`, otherwise from
    /// `parent_ctx.inputs`.
    ///
    /// Engine-wide event sinks are merged with `parent_ctx.event_sinks` (engine first).
    pub fn run_child(
        &self,
        def: &WorkflowDef,
        input: ChildRunInput,
        parent_ctx: &ChildWorkflowContext,
    ) -> crate::engine_error::Result<WorkflowResult> {
        let event_sinks = self.build_event_sinks(&parent_ctx.event_sinks);
        let inputs = input
            .inputs_override
            .unwrap_or_else(|| parent_ctx.inputs.clone());
        let mut state = make_fresh_execution_state(
            input.persistence,
            input.action_registry,
            input.item_provider_registry,
            input.script_env_provider,
            input.workflow_run_id,
            def.name.clone(),
            Arc::clone(&parent_ctx.run_ctx),
            parent_ctx.extra_plugin_dirs.clone(),
            parent_ctx.model.clone(),
            parent_ctx.exec_config.clone(),
            inputs,
            parent_ctx.workflow_run_id.clone(),
            input.depth,
            input.target_label,
            input.as_identity,
            input.triggered_by_hook,
            input.schema_resolver,
            input.child_runner,
            input.cancellation,
            event_sinks,
        );
        self.run_inner(def, &mut state)
    }

    /// Resume a workflow from the post-reset DB state.
    ///
    /// Reads completed steps from persistence, builds the skip set internally, and
    /// delegates to `run()`. The `state.resume_ctx` must be `None` on entry — this
    /// method owns skip-set construction so that it reads the *post-reset* DB state.
    pub fn resume(
        &self,
        def: &WorkflowDef,
        state: &mut ExecutionState,
    ) -> crate::engine_error::Result<WorkflowResult> {
        if state.resume_ctx.is_some() {
            return Err(EngineError::Workflow(
                "resume() requires resume_ctx to be None on entry".to_string(),
            ));
        }

        // Acquire before any DB reads so concurrent resume() calls race exactly once.
        let token = ulid::Ulid::new().to_string();
        let lease_ttl_secs = state.exec_config.lease_ttl_secs;
        match state
            .persistence
            .acquire_lease(&state.workflow_run_id, &token, lease_ttl_secs)
        {
            Ok(Some(gen)) => {
                state.owner_token = Some(token);
                state.lease_generation = Some(gen);
            }
            Ok(None) => return Err(EngineError::AlreadyOwned(state.workflow_run_id.clone())),
            Err(e) => return Err(e),
        }

        let steps = state
            .persistence
            .get_steps(&state.workflow_run_id)
            .map_err(|e| {
                EngineError::Workflow(format!(
                    "resume: failed to load steps for run '{}': {e}",
                    state.workflow_run_id
                ))
            })?;
        // Group completed steps by step name, then by iteration, for the skip set.
        let mut step_map: HashMap<String, HashMap<u32, WorkflowRunStep>> = HashMap::new();
        for s in steps
            .into_iter()
            .filter(|s| s.status == crate::status::WorkflowStepStatus::Completed)
        {
            step_map
                .entry(s.step_name.clone())
                .or_default()
                .insert(s.iteration as u32, s);
        }
        if !step_map.is_empty() {
            state.resume_ctx = Some(crate::engine::ResumeContext { step_map });
        }
        self.run(def, state)
    }

    /// Cancel a running workflow by run ID.
    ///
    /// Marks the DB run as `Cancelling`, signals the in-memory token so the engine
    /// halts at the next step boundary, and fire-and-forgets `executor.cancel()`
    /// for the step currently in flight.
    ///
    /// Returns `Err` if the run is not currently active in this engine instance.
    pub fn cancel_run(
        &self,
        run_id: &str,
        reason: CancellationReason,
    ) -> crate::engine_error::Result<()> {
        // Pull all per-run state out in a single lock.
        let entry = {
            let runs = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
            runs.get(run_id).map(|e| {
                (
                    e.token.clone(),
                    Arc::clone(&e.shutdown),
                    Arc::clone(&e.persistence),
                    Arc::clone(&e.registry),
                    Arc::clone(&e.exec_info),
                    Arc::clone(&e.refresh_stop),
                    e.refresh_thread.clone(),
                )
            })
        };

        let (token, shutdown, persistence, registry, exec_info, refresh_stop, refresh_thread) =
            match entry {
                Some(e) => e,
                None => {
                    return Err(EngineError::Workflow(format!(
                        "cancel_run: run '{run_id}' is not active in this engine instance"
                    )))
                }
            };

        // Mark DB as Cancelling so cross-process engines also observe the signal.
        if let Err(e) =
            persistence.update_run_status(run_id, WorkflowRunStatus::Cancelling, None, None)
        {
            tracing::warn!("cancel_run: failed to mark run {run_id} as Cancelling in DB: {e}");
        }

        // Set the executor shutdown flag so the in-flight step stops promptly.
        shutdown.store(true, Ordering::SeqCst);

        // Signal the cancellation token so the engine halts at the next step boundary.
        token.cancel(reason);

        // Fire-and-forget: call executor.cancel() on the currently running step, if any.
        let exec_snap = exec_info.lock().unwrap_or_else(|e| e.into_inner()).clone();
        if let Some((exec_label, step_id)) = exec_snap {
            std::thread::spawn(move || {
                if let Err(e) = registry.cancel(&exec_label, &step_id) {
                    tracing::warn!(
                        "cancel_run: executor.cancel() for '{exec_label}' step '{step_id}' failed: {e}"
                    );
                }
            });
        }

        // Stop the refresh thread (does not join — run() teardown handles the join).
        stop_refresh_thread(&refresh_stop, refresh_thread.as_ref());

        Ok(())
    }

    /// Inner validation implementation. Accepts explicit registry references so
    /// both `validate()` (uses builder registries) and `run()` (uses execution
    /// state registries) can call the same logic without risk of divergence.
    fn validate_with_registries(
        &self,
        action_registry: &ActionRegistry,
        item_provider_registry: &ItemProviderRegistry,
        gate_resolver_registry: &GateResolverRegistry,
        def: &WorkflowDef,
    ) -> Result<(), Vec<ValidationError>> {
        let mut errors = Vec::new();

        // Cycle / depth detection — only when a workflow resolver is configured.
        // Without a resolver we cannot traverse sub-workflows, so we degrade gracefully.
        if let Some(resolver) = &self.workflow_resolver {
            let r = Arc::clone(resolver);
            let root_name = def.name.clone();
            // Inject the root def so detect_workflow_cycles can resolve it by name.
            let cycle_loader = |name: &str| -> std::result::Result<WorkflowDef, String> {
                if name == root_name.as_str() {
                    Ok(def.clone())
                } else {
                    r.resolve(name)
                        .map(|arc_def| (*arc_def).clone())
                        .map_err(|e| e.to_string())
                }
            };
            if let Err(cycle_msg) = detect_workflow_cycles(&def.name, &cycle_loader) {
                errors.push(ValidationError {
                    message: cycle_msg,
                    hint: None,
                });
            }
        }

        let ctx = ValidateCtx {
            action_registry,
            item_provider_registry,
            gate_resolver_registry,
            workflow_resolver: &self.workflow_resolver,
        };
        let mut visited: HashSet<String> = HashSet::new();
        validate_workflow_sections(&ctx, &def.body, &def.always, &mut errors, &mut visited);

        if errors.is_empty() {
            Ok(())
        } else {
            Err(errors)
        }
    }
}
776
/// Borrowed view of everything node validation needs: the three registries
/// plus the optional sub-workflow resolver, bundled so the recursive
/// validation helpers take one argument instead of four.
struct ValidateCtx<'a> {
    action_registry: &'a ActionRegistry,
    item_provider_registry: &'a ItemProviderRegistry,
    gate_resolver_registry: &'a GateResolverRegistry,
    // `None` when no resolver is configured; sub-workflow descent is then skipped.
    workflow_resolver: &'a Option<Arc<dyn WorkflowResolver>>,
}
783
784fn validate_workflow_sections(
785    ctx: &ValidateCtx<'_>,
786    body: &[WorkflowNode],
787    always: &[WorkflowNode],
788    errors: &mut Vec<ValidationError>,
789    visited: &mut HashSet<String>,
790) {
791    validate_nodes_impl(ctx, body, errors, visited);
792    validate_nodes_impl(ctx, always, errors, visited);
793}
794
795fn validate_nodes_impl(
796    ctx: &ValidateCtx<'_>,
797    nodes: &[WorkflowNode],
798    errors: &mut Vec<ValidationError>,
799    visited: &mut HashSet<String>,
800) {
801    for node in nodes {
802        match node {
803            WorkflowNode::Call(n) => {
804                let name = n.agent.label();
805                if !ctx.action_registry.has_action(name) {
806                    errors.push(ValidationError {
807                        message: format!(
808                            "call '{}': no registered ActionExecutor for '{}'",
809                            n.agent.step_key(),
810                            name
811                        ),
812                        hint: Some(format!(
813                            "register an executor named '{}' or add a fallback executor",
814                            name
815                        )),
816                    });
817                }
818            }
819            WorkflowNode::Parallel(n) => {
820                for agent_ref in &n.calls {
821                    let name = agent_ref.label();
822                    if !ctx.action_registry.has_action(name) {
823                        errors.push(ValidationError {
824                            message: format!(
825                                "parallel call '{}': no registered ActionExecutor for '{}'",
826                                agent_ref.step_key(),
827                                name
828                            ),
829                            hint: Some(format!(
830                                "register an executor named '{}' or add a fallback executor",
831                                name
832                            )),
833                        });
834                    }
835                }
836            }
837            WorkflowNode::ForEach(n) => {
838                if ctx.item_provider_registry.get(&n.over).is_none() {
839                    errors.push(ValidationError {
840                        message: format!(
841                            "foreach '{}': no registered ItemProvider for '{}'",
842                            n.name, n.over
843                        ),
844                        hint: Some(format!(
845                            "register a provider with name '{}' via FlowEngineBuilder::item_provider()",
846                            n.over
847                        )),
848                    });
849                }
850            }
851            WorkflowNode::Gate(n) => {
852                // QualityGate is evaluated inline and never goes through a GateResolver.
853                if n.gate_type != QUALITY_GATE_TYPE {
854                    let type_str = n.gate_type.as_str();
855                    if !ctx.gate_resolver_registry.has_type(type_str) {
856                        errors.push(ValidationError {
857                            message: format!(
858                                "gate '{}': no registered GateResolver for type '{}'",
859                                n.name, type_str
860                            ),
861                            hint: Some(format!(
862                                "register a resolver with gate_type() == '{}' via FlowEngineBuilder::gate_resolver()",
863                                type_str
864                            )),
865                        });
866                    }
867                }
868            }
869            WorkflowNode::CallWorkflow(n) => {
870                if !visited.contains(&n.workflow) {
871                    visited.insert(n.workflow.clone());
872                    if let Some(resolver) = ctx.workflow_resolver {
873                        match resolver.resolve(&n.workflow).map(|d| (*d).clone()) {
874                            Ok(sub_def) => {
875                                let mut sub_errors = Vec::new();
876                                validate_workflow_sections(
877                                    ctx,
878                                    &sub_def.body,
879                                    &sub_def.always,
880                                    &mut sub_errors,
881                                    visited,
882                                );
883                                for sub_err in sub_errors {
884                                    errors.push(ValidationError {
885                                        message: format!(
886                                            "in sub-workflow '{}': {}",
887                                            n.workflow, sub_err.message
888                                        ),
889                                        hint: sub_err.hint,
890                                    });
891                                }
892                            }
893                            Err(e) => {
894                                // Report every load failure so all missing sub-workflows
895                                // surface in one pass, not just the first one.
896                                errors.push(ValidationError {
897                                    message: format!(
898                                        "call workflow '{}': sub-workflow could not be loaded: {}",
899                                        n.workflow, e
900                                    ),
901                                    hint: None,
902                                });
903                            }
904                        }
905                    }
906                }
907            }
908            _ => {
909                if let Some(body) = node.body() {
910                    validate_nodes_impl(ctx, body, errors, visited);
911                }
912            }
913        }
914    }
915}
916
917// ---------------------------------------------------------------------------
918// FlowEngineBuilder
919// ---------------------------------------------------------------------------
920
/// Builder for constructing a [`FlowEngine`].
///
/// Register action executors with `.action()` / `.action_fallback()`,
/// foreach item sources with `.item_provider()`, gate resolvers with
/// `.gate_resolver()`, and environment injection with `.script_env_provider()`.
pub struct FlowEngineBuilder {
    // Executors keyed by their own `name()`; consulted before the fallback.
    named: HashMap<String, Box<dyn ActionExecutor>>,
    // Catch-all executor; at most one may be registered.
    fallback: Option<Box<dyn ActionExecutor>>,
    // Script environment injection; defaults to `NoOpScriptEnvProvider`.
    script_env_provider: Box<dyn ScriptEnvProvider>,
    // Item sources for foreach fan-outs.
    item_providers: ItemProviderRegistry,
    // Gate resolvers, keyed internally by gate type.
    gate_resolvers: GateResolverRegistry,
    // Optional resolver for sub-workflow validation and cycle detection.
    workflow_resolver: Option<Box<dyn WorkflowResolver>>,
    // Event sinks; events are emitted to all sinks in registration order.
    event_sinks: Vec<Arc<dyn EventSink>>,
}
935
936impl FlowEngineBuilder {
937    pub fn new() -> Self {
938        Self {
939            named: HashMap::new(),
940            fallback: None,
941            script_env_provider: Box::new(NoOpScriptEnvProvider),
942            item_providers: ItemProviderRegistry::new(),
943            gate_resolvers: GateResolverRegistry::new(),
944            workflow_resolver: None,
945            event_sinks: Vec::new(),
946        }
947    }
948
949    /// Register a named executor. The executor's `name()` is used as the lookup key.
950    #[allow(dead_code)]
951    pub fn action(mut self, executor: Box<dyn ActionExecutor>) -> Self {
952        self.named.insert(executor.name().to_string(), executor);
953        self
954    }
955
956    /// Register the fallback (catch-all) executor.
957    ///
958    /// Returns `Err` if called more than once — only one fallback is allowed.
959    pub fn action_fallback(
960        mut self,
961        executor: Box<dyn ActionExecutor>,
962    ) -> Result<Self, EngineError> {
963        if self.fallback.is_some() {
964            return Err(EngineError::Workflow(
965                "action_fallback already set — only one fallback executor is allowed".to_string(),
966            ));
967        }
968        self.fallback = Some(executor);
969        Ok(self)
970    }
971
972    /// Register an item provider for foreach fan-outs.
973    pub fn item_provider<P: ItemProvider + 'static>(mut self, provider: P) -> Self {
974        self.item_providers.register(provider);
975        self
976    }
977
978    /// Register a gate resolver for a specific gate type.
979    pub fn gate_resolver<R: GateResolver + 'static>(mut self, resolver: R) -> Self {
980        self.gate_resolvers.register(resolver);
981        self
982    }
983
984    /// Set the script env provider. Defaults to `NoOpScriptEnvProvider`.
985    pub fn script_env_provider(mut self, provider: Box<dyn ScriptEnvProvider>) -> Self {
986        self.script_env_provider = provider;
987        self
988    }
989
990    /// Convenience: register a `DirectoryWorkflowResolver` rooted at `path`.
991    ///
992    /// `FlowEngine::validate()` will read `<path>/<name>.wf` on each `call workflow`
993    /// node it encounters. Re-reads on every call so hot-reload is preserved.
994    pub fn workflow_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
995        self.workflow_resolver = Some(Box::new(DirectoryWorkflowResolver::new(path)));
996        self
997    }
998
999    /// Set a custom `WorkflowResolver` for sub-workflow validation and cycle detection.
1000    ///
1001    /// Overrides any previous `.workflow_dir()` call.
1002    pub fn workflow_resolver(mut self, resolver: Box<dyn WorkflowResolver>) -> Self {
1003        self.workflow_resolver = Some(resolver);
1004        self
1005    }
1006
1007    /// Register an event sink. Multiple calls register multiple sinks; events are
1008    /// emitted to all sinks in registration order.
1009    pub fn event_sink(mut self, sink: Box<dyn EventSink>) -> Self {
1010        self.event_sinks.push(Arc::from(sink));
1011        self
1012    }
1013
1014    /// Register multiple event sinks from an existing `Arc<[Arc<dyn EventSink>]>`.
1015    /// Sinks are appended in slice order after any already registered.
1016    pub fn with_event_sinks(mut self, sinks: &Arc<[Arc<dyn EventSink>]>) -> Self {
1017        self.event_sinks.extend(sinks.iter().cloned());
1018        self
1019    }
1020
1021    /// Consume the builder and produce a [`FlowEngine`].
1022    pub fn build(self) -> Result<FlowEngine, EngineError> {
1023        Ok(FlowEngine {
1024            action_registry: ActionRegistry::new(self.named, self.fallback),
1025            item_provider_registry: self.item_providers,
1026            gate_resolver_registry: self.gate_resolvers,
1027            script_env_provider: Arc::from(self.script_env_provider),
1028            workflow_resolver: self.workflow_resolver.map(Arc::from),
1029            event_sinks: self.event_sinks,
1030            active_runs: Mutex::new(HashMap::new()),
1031        })
1032    }
1033}
1034
1035impl Default for FlowEngineBuilder {
1036    fn default() -> Self {
1037        Self::new()
1038    }
1039}
1040
1041impl Drop for FlowEngine {
1042    fn drop(&mut self) {
1043        let entries: Vec<ActiveRunEntry> = {
1044            let mut guard = self.active_runs.lock().unwrap_or_else(|e| e.into_inner());
1045            guard.drain().map(|(_, e)| e).collect()
1046        };
1047        for entry in entries {
1048            // Stop the refresh thread first so it exits before we cancel the token.
1049            stop_refresh_thread(&entry.refresh_stop, entry.refresh_thread.as_ref());
1050            // Dropping refresh_handle detaches the thread; no join in Drop to avoid deadlock.
1051            entry.shutdown.store(true, Ordering::SeqCst);
1052            entry.token.cancel(CancellationReason::EngineShutdown);
1053        }
1054    }
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059    use super::*;
1060    use crate::dsl::{
1061        ApprovalMode, CallWorkflowNode, ForEachNode, GateNode, OnChildFail, OnCycle, OnTimeout,
1062        QUALITY_GATE_TYPE,
1063    };
1064    use crate::engine_error::EngineError;
1065    use crate::test_helpers::{
1066        call_node, make_def, make_params, make_run_ctx, make_step_info, ForwardSink, VecSink,
1067    };
1068    use crate::traits::action_executor::{ActionOutput, ActionParams, StepInfo};
1069    use crate::traits::gate_resolver::{GateParams, GatePoll};
1070    use crate::traits::item_provider::{FanOutItem, ProviderInfo};
1071    use crate::traits::run_context::RunContext;
1072    use crate::workflow_resolver_memory::InMemoryWorkflowResolver;
1073    use std::collections::HashMap;
1074
1075    // --- test executors / providers / resolvers ---
1076
1077    struct AlphaExecutor;
1078    impl ActionExecutor for AlphaExecutor {
1079        fn name(&self) -> &str {
1080            "alpha"
1081        }
1082        fn execute(
1083            &self,
1084            _ctx: &dyn RunContext,
1085            _info: &StepInfo,
1086            _params: &ActionParams,
1087        ) -> Result<ActionOutput, EngineError> {
1088            Ok(ActionOutput {
1089                markers: vec!["alpha".to_string()],
1090                ..Default::default()
1091            })
1092        }
1093    }
1094
1095    struct BetaExecutor;
1096    impl ActionExecutor for BetaExecutor {
1097        fn name(&self) -> &str {
1098            "beta"
1099        }
1100        fn execute(
1101            &self,
1102            _ctx: &dyn RunContext,
1103            _info: &StepInfo,
1104            _params: &ActionParams,
1105        ) -> Result<ActionOutput, EngineError> {
1106            Ok(ActionOutput {
1107                markers: vec!["beta".to_string()],
1108                ..Default::default()
1109            })
1110        }
1111    }
1112
1113    struct CountingExecutor {
1114        name: &'static str,
1115        count: Arc<std::sync::atomic::AtomicUsize>,
1116    }
1117    impl ActionExecutor for CountingExecutor {
1118        fn name(&self) -> &str {
1119            self.name
1120        }
1121        fn execute(
1122            &self,
1123            _: &dyn RunContext,
1124            _: &StepInfo,
1125            _: &ActionParams,
1126        ) -> Result<ActionOutput, EngineError> {
1127            self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1128            Ok(ActionOutput::default())
1129        }
1130    }
1131
1132    fn make_test_run(
1133        p: &Arc<crate::persistence_memory::InMemoryWorkflowPersistence>,
1134    ) -> crate::types::WorkflowRun {
1135        use crate::traits::persistence::{NewRun, WorkflowPersistence};
1136        p.create_run(NewRun {
1137            workflow_name: "wf".to_string(),
1138            parent_run_id: String::new(),
1139            dry_run: false,
1140            trigger: "manual".to_string(),
1141            definition_snapshot: None,
1142            parent_workflow_run_id: None,
1143        })
1144        .unwrap()
1145    }
1146
1147    /// Build an `ExecutionState` wired with two `CountingExecutor`s (alpha, beta)
1148    /// and return the counters alongside the state.
1149    fn make_counting_state(
1150        persistence: Arc<crate::persistence_memory::InMemoryWorkflowPersistence>,
1151        run_id: String,
1152    ) -> (
1153        Arc<std::sync::atomic::AtomicUsize>,
1154        Arc<std::sync::atomic::AtomicUsize>,
1155        crate::engine::ExecutionState,
1156    ) {
1157        let alpha_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1158        let beta_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1159        let mut m = HashMap::new();
1160        m.insert(
1161            "alpha".to_string(),
1162            Box::new(CountingExecutor {
1163                name: "alpha",
1164                count: Arc::clone(&alpha_count),
1165            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
1166        );
1167        m.insert(
1168            "beta".to_string(),
1169            Box::new(CountingExecutor {
1170                name: "beta",
1171                count: Arc::clone(&beta_count),
1172            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
1173        );
1174        let mut state = make_bare_state("wf");
1175        state.persistence = persistence;
1176        state.action_registry = Arc::new(ActionRegistry::new(m, None));
1177        state.workflow_run_id = run_id;
1178        (alpha_count, beta_count, state)
1179    }
1180
1181    struct TicketsProvider;
1182    impl crate::traits::item_provider::ItemProvider for TicketsProvider {
1183        fn name(&self) -> &str {
1184            "tickets"
1185        }
1186        fn items(
1187            &self,
1188            _ctx: &dyn RunContext,
1189            _info: &ProviderInfo,
1190            _scope: Option<&dyn std::any::Any>,
1191            _filter: &HashMap<String, String>,
1192        ) -> Result<Vec<FanOutItem>, EngineError> {
1193            Ok(vec![])
1194        }
1195    }
1196
1197    struct HumanApprovalResolver;
1198    impl crate::traits::gate_resolver::GateResolver for HumanApprovalResolver {
1199        fn gate_type(&self) -> &str {
1200            "human_approval"
1201        }
1202        fn poll(
1203            &self,
1204            _run_id: &str,
1205            _params: &GateParams,
1206            _ctx: &dyn RunContext,
1207        ) -> Result<GatePoll, EngineError> {
1208            Ok(GatePoll::Approved(None))
1209        }
1210    }
1211
1212    // --- helpers ---
1213
1214    fn foreach_node(step: &str, over: &str) -> WorkflowNode {
1215        WorkflowNode::ForEach(ForEachNode {
1216            name: step.to_string(),
1217            over: over.to_string(),
1218            scope: None,
1219            filter: HashMap::new(),
1220            ordered: false,
1221            on_cycle: OnCycle::Fail,
1222            max_parallel: 1,
1223            workflow: "child_wf".to_string(),
1224            inputs: HashMap::new(),
1225            on_child_fail: OnChildFail::Halt,
1226        })
1227    }
1228
1229    fn gate_node(name: &str, gate_type: &str) -> WorkflowNode {
1230        WorkflowNode::Gate(GateNode {
1231            name: name.to_string(),
1232            gate_type: gate_type.to_string(),
1233            prompt: None,
1234            min_approvals: 1,
1235            approval_mode: ApprovalMode::default(),
1236            timeout_secs: 0,
1237            on_timeout: OnTimeout::Fail,
1238            as_identity: None,
1239            quality_gate: None,
1240            options: None,
1241        })
1242    }
1243
1244    // --- existing FlowEngineBuilder tests (now produce FlowEngine) ---
1245
1246    #[test]
1247    fn build_with_named_executor() {
1248        let engine = FlowEngineBuilder::new()
1249            .action(Box::new(AlphaExecutor))
1250            .build()
1251            .unwrap();
1252        let ctx = make_run_ctx();
1253        let info = make_step_info();
1254        let output = engine
1255            .action_registry
1256            .dispatch("alpha", ctx.as_ref(), &info, &make_params("alpha"))
1257            .unwrap();
1258        assert_eq!(output.markers, vec!["alpha"]);
1259    }
1260
1261    #[test]
1262    fn build_with_fallback() {
1263        let engine = FlowEngineBuilder::new()
1264            .action_fallback(Box::new(BetaExecutor))
1265            .unwrap()
1266            .build()
1267            .unwrap();
1268        let ctx = make_run_ctx();
1269        let info = make_step_info();
1270        let output = engine
1271            .action_registry
1272            .dispatch("anything", ctx.as_ref(), &info, &make_params("anything"))
1273            .unwrap();
1274        assert_eq!(output.markers, vec!["beta"]);
1275    }
1276
1277    #[test]
1278    fn named_takes_precedence_over_fallback() {
1279        let engine = FlowEngineBuilder::new()
1280            .action(Box::new(AlphaExecutor))
1281            .action_fallback(Box::new(BetaExecutor))
1282            .unwrap()
1283            .build()
1284            .unwrap();
1285        let ctx = make_run_ctx();
1286        let info = make_step_info();
1287        let output = engine
1288            .action_registry
1289            .dispatch("alpha", ctx.as_ref(), &info, &make_params("alpha"))
1290            .unwrap();
1291        assert_eq!(output.markers, vec!["alpha"]);
1292    }
1293
1294    #[test]
1295    fn second_action_fallback_returns_err() {
1296        let result = FlowEngineBuilder::new()
1297            .action_fallback(Box::new(AlphaExecutor))
1298            .unwrap()
1299            .action_fallback(Box::new(BetaExecutor));
1300        assert!(result.is_err(), "second action_fallback should return Err");
1301    }
1302
1303    #[test]
1304    fn custom_script_env_provider_is_stored_in_bundle() {
1305        struct FixedEnvProvider;
1306        impl ScriptEnvProvider for FixedEnvProvider {
1307            fn env(
1308                &self,
1309                _ctx: &dyn RunContext,
1310                _as_identity: Option<&str>,
1311            ) -> HashMap<String, String> {
1312                let mut m = HashMap::new();
1313                m.insert("CUSTOM_VAR".to_string(), "42".to_string());
1314                m
1315            }
1316        }
1317
1318        let engine = FlowEngineBuilder::new()
1319            .script_env_provider(Box::new(FixedEnvProvider))
1320            .build()
1321            .unwrap();
1322
1323        let env = engine
1324            .script_env_provider
1325            .env(&crate::traits::run_context::NoopRunContext::default(), None);
1326        assert_eq!(env.get("CUSTOM_VAR").map(String::as_str), Some("42"));
1327    }
1328
1329    // --- validate() acceptance criteria ---
1330
1331    // AC1: missing action name produces error
1332    #[test]
1333    fn validate_missing_action_name_produces_error() {
1334        let def = make_def("wf", vec![call_node("missing_agent")]);
1335        let engine = FlowEngineBuilder::new().build().unwrap();
1336
1337        let errors = engine.validate(&def).unwrap_err();
1338        assert!(
1339            !errors.is_empty(),
1340            "expected at least one error for missing action"
1341        );
1342        assert!(
1343            errors.iter().any(|e| e.message.contains("missing_agent")),
1344            "error should name the missing executor; got: {:?}",
1345            errors
1346        );
1347    }
1348
1349    // AC2: missing item provider produces error
1350    #[test]
1351    fn validate_missing_item_provider_produces_error() {
1352        let def = make_def("wf", vec![foreach_node("items", "tickets")]);
1353        let engine = FlowEngineBuilder::new().build().unwrap();
1354
1355        let errors = engine.validate(&def).unwrap_err();
1356        assert!(
1357            errors.iter().any(|e| e.message.contains("tickets")),
1358            "error should mention the missing provider name; got: {:?}",
1359            errors
1360        );
1361    }
1362
1363    // AC3: missing gate type produces error
1364    #[test]
1365    fn validate_missing_gate_type_produces_error() {
1366        let def = make_def("wf", vec![gate_node("approval", "human_approval")]);
1367        let engine = FlowEngineBuilder::new().build().unwrap();
1368
1369        let errors = engine.validate(&def).unwrap_err();
1370        assert!(
1371            errors.iter().any(|e| e.message.contains("human_approval")),
1372            "error should mention the missing gate type; got: {:?}",
1373            errors
1374        );
1375    }
1376
1377    // AC3b: QualityGate is excluded from resolver checks
1378    #[test]
1379    fn validate_quality_gate_does_not_require_resolver() {
1380        use crate::dsl::{GateNode, OnFailAction, OnTimeout, QualityGateConfig};
1381        let gate = WorkflowNode::Gate(GateNode {
1382            name: "qg".to_string(),
1383            gate_type: QUALITY_GATE_TYPE.to_string(),
1384            prompt: None,
1385            min_approvals: 1,
1386            approval_mode: ApprovalMode::default(),
1387            timeout_secs: 0,
1388            on_timeout: OnTimeout::Fail,
1389            as_identity: None,
1390            quality_gate: Some(QualityGateConfig {
1391                source: "step1".to_string(),
1392                threshold: 80,
1393                on_fail_action: OnFailAction::Fail,
1394            }),
1395            options: None,
1396        });
1397        // Also need call step1 to be produced, but validate() only checks harness
1398        // registrations, not semantic step ordering — so just test the gate alone.
1399        let def = make_def("wf", vec![gate]);
1400        let engine = FlowEngineBuilder::new().build().unwrap();
1401        // No gate resolver registered — but QualityGate must not trigger an error.
1402        let result = engine.validate(&def);
1403        // QualityGate check should not produce a gate-resolver error.
1404        // (There may be no errors at all if no actions are referenced.)
1405        if let Err(errors) = result {
1406            assert!(
1407                !errors.iter().any(|e| e.message.contains("quality_gate")),
1408                "QualityGate should not produce a resolver error; got: {:?}",
1409                errors
1410            );
1411        }
1412    }
1413
1414    // AC4: valid workflow with all registrations passes
1415    #[test]
1416    fn validate_valid_workflow_passes() {
1417        let def = make_def(
1418            "wf",
1419            vec![
1420                call_node("alpha"),
1421                foreach_node("items", "tickets"),
1422                gate_node("approval", "human_approval"),
1423            ],
1424        );
1425        let engine = FlowEngineBuilder::new()
1426            .action(Box::new(AlphaExecutor))
1427            .item_provider(TicketsProvider)
1428            .gate_resolver(HumanApprovalResolver)
1429            .build()
1430            .unwrap();
1431
1432        assert!(
1433            engine.validate(&def).is_ok(),
1434            "all registrations present — validation should pass"
1435        );
1436    }
1437
1438    // Open-registry positive case: a custom gate type with a registered resolver passes validation
1439    #[test]
1440    fn validate_open_registry_custom_gate_type_accepted() {
1441        struct SlackReactionResolver;
1442        impl crate::traits::gate_resolver::GateResolver for SlackReactionResolver {
1443            fn gate_type(&self) -> &str {
1444                "slack_reaction"
1445            }
1446            fn poll(
1447                &self,
1448                _run_id: &str,
1449                _params: &GateParams,
1450                _ctx: &dyn RunContext,
1451            ) -> Result<GatePoll, EngineError> {
1452                Ok(GatePoll::Approved(None))
1453            }
1454        }
1455
1456        let def = make_def("wf", vec![gate_node("notify", "slack_reaction")]);
1457        let engine = FlowEngineBuilder::new()
1458            .gate_resolver(SlackReactionResolver)
1459            .build()
1460            .unwrap();
1461
1462        assert!(
1463            engine.validate(&def).is_ok(),
1464            "registered slack_reaction resolver should satisfy validation"
1465        );
1466    }
1467
1468    // Unregistered type still fails validation
1469    #[test]
1470    fn validate_unregistered_gate_type_produces_error() {
1471        let def = make_def("wf", vec![gate_node("g", "fictional_type")]);
1472        let engine = FlowEngineBuilder::new().build().unwrap();
1473
1474        let errors = engine.validate(&def).unwrap_err();
1475        assert!(
1476            errors.iter().any(|e| e.message.contains("fictional_type")),
1477            "error should mention the unregistered gate type; got: {:?}",
1478            errors
1479        );
1480    }
1481
1482    // AC5: sub-workflow validation errors surface with path prefix
1483    #[test]
1484    fn validate_sub_workflow_errors_have_path_prefix() {
1485        let sub_def = make_def("sub_wf", vec![call_node("missing_in_sub")]);
1486        let engine = FlowEngineBuilder::new()
1487            .workflow_resolver(Box::new(InMemoryWorkflowResolver::new([(
1488                "sub_wf", sub_def,
1489            )])))
1490            .build()
1491            .unwrap();
1492
1493        let root_def = make_def(
1494            "root",
1495            vec![WorkflowNode::CallWorkflow(CallWorkflowNode {
1496                workflow: "sub_wf".to_string(),
1497                inputs: HashMap::new(),
1498                retries: 0,
1499                on_fail: None,
1500                as_identity: None,
1501            })],
1502        );
1503
1504        let errors = engine.validate(&root_def).unwrap_err();
1505        assert!(
1506            errors
1507                .iter()
1508                .any(|e| e.message.contains("in sub-workflow") && e.message.contains("sub_wf")),
1509            "sub-workflow errors should be prefixed with the sub-workflow name; got: {:?}",
1510            errors
1511        );
1512        assert!(
1513            errors.iter().any(|e| e.message.contains("missing_in_sub")),
1514            "error should mention the missing executor from the sub-workflow; got: {:?}",
1515            errors
1516        );
1517    }
1518
1519    // AC6: cycle detection triggers ValidationError
1520    #[test]
1521    fn validate_cycle_detection_triggers_error() {
1522        // A workflow that calls itself creates a cycle.
1523        let cycle_def = make_def(
1524            "cycle_wf",
1525            vec![WorkflowNode::CallWorkflow(CallWorkflowNode {
1526                workflow: "cycle_wf".to_string(),
1527                inputs: HashMap::new(),
1528                retries: 0,
1529                on_fail: None,
1530                as_identity: None,
1531            })],
1532        );
1533        let engine = FlowEngineBuilder::new()
1534            .workflow_resolver(Box::new(InMemoryWorkflowResolver::new([(
1535                "cycle_wf",
1536                cycle_def.clone(),
1537            )])))
1538            .build()
1539            .unwrap();
1540
1541        let errors = engine.validate(&cycle_def).unwrap_err();
1542        assert!(
1543            errors
1544                .iter()
1545                .any(|e| e.message.contains("Circular") || e.message.contains("cycle")),
1546            "cycle detection should produce an error; got: {:?}",
1547            errors
1548        );
1549    }
1550
1551    // AC-new-1: WorkflowNotFound error when resolver misses a sub-workflow
1552    #[test]
1553    fn resolver_returns_not_found_error_for_missing_sub_workflow() {
1554        let engine = FlowEngineBuilder::new()
1555            .workflow_resolver(Box::new(InMemoryWorkflowResolver::new(
1556                [] as [(String, WorkflowDef); 0]
1557            )))
1558            .build()
1559            .unwrap();
1560
1561        let root_def = make_def(
1562            "root",
1563            vec![WorkflowNode::CallWorkflow(CallWorkflowNode {
1564                workflow: "missing_sub".to_string(),
1565                inputs: HashMap::new(),
1566                retries: 0,
1567                on_fail: None,
1568                as_identity: None,
1569            })],
1570        );
1571
1572        let errors = engine.validate(&root_def).unwrap_err();
1573        assert!(
1574            errors.iter().any(|e| e.message.contains("missing_sub")),
1575            "error should mention the missing sub-workflow name; got: {:?}",
1576            errors
1577        );
1578    }
1579
1580    // AC-new-2: InMemoryWorkflowResolver alone (no filesystem) is sufficient
1581    #[test]
1582    fn inmemory_resolver_sufficient_for_full_validation() {
1583        let sub_def = make_def("sub_wf", vec![call_node("alpha")]);
1584        let engine = FlowEngineBuilder::new()
1585            .action(Box::new(AlphaExecutor))
1586            .workflow_resolver(Box::new(InMemoryWorkflowResolver::new([(
1587                "sub_wf", sub_def,
1588            )])))
1589            .build()
1590            .unwrap();
1591
1592        let root_def = make_def(
1593            "root",
1594            vec![WorkflowNode::CallWorkflow(CallWorkflowNode {
1595                workflow: "sub_wf".to_string(),
1596                inputs: HashMap::new(),
1597                retries: 0,
1598                on_fail: None,
1599                as_identity: None,
1600            })],
1601        );
1602
1603        assert!(
1604            engine.validate(&root_def).is_ok(),
1605            "InMemoryWorkflowResolver alone should be sufficient for full validation"
1606        );
1607    }
1608
    // Builds a minimal ExecutionState with empty registries for run() tests.
    //
    // The returned state points at a fresh in-memory persistence layer seeded
    // with run id "test-run", has no action executors or item providers
    // registered, and all counters/metrics zeroed. Callers that need more
    // (e.g. `make_state_with_persistence`) overwrite `persistence`,
    // `action_registry`, and `workflow_run_id` after construction.
    fn make_bare_state(wf_name: &str) -> crate::engine::ExecutionState {
        use crate::cancellation::CancellationToken;
        use crate::engine::ExecutionState;
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::run_context::NoopRunContext;
        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
        use crate::types::WorkflowExecConfig;
        // Seed a run record so lookups for "test-run" succeed.
        let persistence = InMemoryWorkflowPersistence::new();
        persistence.seed_run("test-run");
        ExecutionState {
            persistence: Arc::new(persistence),
            // Empty executor registry: tests that need actions replace this.
            action_registry: Arc::new(ActionRegistry::new(HashMap::new(), None)),
            script_env_provider: Arc::new(NoOpScriptEnvProvider),
            workflow_run_id: "test-run".to_string(),
            workflow_name: wf_name.to_string(),
            run_ctx: Arc::new(NoopRunContext::default())
                as Arc<dyn crate::traits::run_context::RunContext>,
            extra_plugin_dirs: vec![],
            model: None,
            exec_config: WorkflowExecConfig::default(),
            inputs: HashMap::new(),
            // Top-level run: no parent, depth 0.
            parent_run_id: String::new(),
            depth: 0,
            target_label: None,
            // Execution bookkeeping starts empty at the first position.
            step_results: HashMap::new(),
            contexts: vec![],
            position: 0,
            all_succeeded: true,
            // All aggregate metrics start at zero.
            total_cost: 0.0,
            total_turns: 0,
            total_duration_ms: 0,
            total_input_tokens: 0,
            total_output_tokens: 0,
            total_cache_read_input_tokens: 0,
            total_cache_creation_input_tokens: 0,
            has_llm_metrics: false,
            last_gate_feedback: None,
            block_output: None,
            block_with: vec![],
            resume_ctx: None,
            default_as_identity: None,
            triggered_by_hook: false,
            schema_resolver: None,
            child_runner: None,
            last_heartbeat_at: ExecutionState::new_heartbeat(),
            // Fresh item-provider registry; tests that need providers replace it.
            registry: Arc::new(ItemProviderRegistry::new()),
            // No event sinks — state-level sinks are empty for bare runs.
            event_sinks: Arc::from(vec![]),
            cancellation: CancellationToken::new(),
            current_execution_id: Arc::new(std::sync::Mutex::new(None)),
            owner_token: None,
            lease_generation: None,
        }
    }
1663
1664    // AC7a: run() validates against state action registry, not engine registry
1665    // Engine has "alpha" but ExecutionState doesn't — run() must reject.
1666    #[test]
1667    fn run_validates_against_state_registries_not_engine() {
1668        let def = make_def("wf", vec![call_node("alpha")]);
1669        // Engine has "alpha" registered — validate() on the engine itself would pass.
1670        let engine = FlowEngineBuilder::new()
1671            .action(Box::new(AlphaExecutor))
1672            .build()
1673            .unwrap();
1674        assert!(
1675            engine.validate(&def).is_ok(),
1676            "engine validate() should pass"
1677        );
1678
1679        // But ExecutionState has no actions — run() must catch the divergence.
1680        let mut state = make_bare_state("wf");
1681
1682        let result = engine.run(&def, &mut state);
1683        assert!(
1684            result.is_err(),
1685            "run() must reject when state action registry lacks 'alpha'"
1686        );
1687        assert_eq!(state.position, 0, "no side effects on rejection");
1688    }
1689
1690    // AC7b: run() validates against state item-provider registry, not engine registry
1691    // Engine has "tickets" provider but ExecutionState doesn't — run() must reject.
1692    #[test]
1693    fn run_validates_item_provider_against_state_registry_not_engine() {
1694        let def = make_def("wf", vec![foreach_node("items", "tickets")]);
1695        // Engine has "tickets" registered — validate() on the engine itself would pass.
1696        let engine = FlowEngineBuilder::new()
1697            .item_provider(TicketsProvider)
1698            .build()
1699            .unwrap();
1700        assert!(
1701            engine.validate(&def).is_ok(),
1702            "engine validate() should pass for tickets provider"
1703        );
1704
1705        // ExecutionState has no item providers — run() must catch the divergence.
1706        let mut state = make_bare_state("wf");
1707
1708        let result = engine.run(&def, &mut state);
1709        assert!(
1710            result.is_err(),
1711            "run() must reject when state item-provider registry lacks 'tickets'"
1712        );
1713        assert_eq!(state.position, 0, "no side effects on rejection");
1714    }
1715
1716    // AC7: run() rejects invalid workflows before any side effects
1717    #[test]
1718    fn run_rejects_invalid_workflow_before_side_effects() {
1719        let def = make_def("wf", vec![call_node("unregistered_agent")]);
1720        let engine = FlowEngineBuilder::new().build().unwrap();
1721
1722        let mut state = make_bare_state("wf");
1723
1724        let result = engine.run(&def, &mut state);
1725        assert!(result.is_err(), "run() must reject an invalid workflow");
1726        let err = result.unwrap_err().to_string();
1727        assert!(
1728            err.contains("validation"),
1729            "error should mention validation; got: {err}"
1730        );
1731        assert_eq!(
1732            state.position, 0,
1733            "no side effects: position must be unchanged when validation fails"
1734        );
1735        assert!(
1736            state.step_results.is_empty(),
1737            "no side effects: step_results must be empty when validation fails"
1738        );
1739    }
1740
1741    // ---------------------------------------------------------------------------
1742    // EventSink tests
1743    // ---------------------------------------------------------------------------
1744
1745    use crate::events::{EngineEvent, EngineEventData, EventSink};
1746    use crate::persistence_memory::InMemoryWorkflowPersistence;
1747
    /// A sink that always panics — used to test panic isolation.
    ///
    /// Registered alongside a well-behaved sink in `event_sinks_panic_safety`
    /// to prove that a panicking sink neither aborts the run nor starves the
    /// other sinks of events.
    struct PanicSink;

    impl EventSink for PanicSink {
        // Unconditionally panics on every emitted event.
        fn emit(&self, _event: &EngineEventData) {
            panic!("intentional sink panic");
        }
    }
1756
1757    /// Build a simple 1-step workflow that uses the NoopAlpha executor.
1758    fn make_single_step_def() -> WorkflowDef {
1759        make_def("wf", vec![call_node("alpha")])
1760    }
1761
1762    /// Build an ExecutionState with a fresh InMemoryWorkflowPersistence.
1763    fn make_state_with_persistence(wf_name: &str) -> crate::engine::ExecutionState {
1764        use crate::traits::persistence::{NewRun, WorkflowPersistence};
1765
1766        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
1767        // Create a run record so update_run_status doesn't fail; use the returned ID.
1768        let run = persistence
1769            .create_run(NewRun {
1770                workflow_name: wf_name.to_string(),
1771                parent_run_id: String::new(),
1772                dry_run: false,
1773                trigger: "manual".to_string(),
1774                definition_snapshot: None,
1775                parent_workflow_run_id: None,
1776            })
1777            .unwrap();
1778
1779        let mut state = make_bare_state(wf_name);
1780        state.persistence = persistence;
1781        state.action_registry = Arc::new(ActionRegistry::new(
1782            {
1783                let mut m = HashMap::new();
1784                m.insert(
1785                    "alpha".to_string(),
1786                    Box::new(AlphaExecutor)
1787                        as Box<dyn crate::traits::action_executor::ActionExecutor>,
1788                );
1789                m
1790            },
1791            None,
1792        ));
1793        state.workflow_run_id = run.id;
1794        state
1795    }
1796
1797    // Test: two sinks both receive all events in registration order
1798    #[test]
1799    fn event_sinks_multi_sink_ordering() {
1800        let sink_a = VecSink::new();
1801        let sink_b = VecSink::new();
1802
1803        let sink_a_clone = Arc::clone(&sink_a);
1804        let sink_b_clone = Arc::clone(&sink_b);
1805
1806        let engine = FlowEngineBuilder::new()
1807            .action(Box::new(AlphaExecutor))
1808            .event_sink(Box::new(ForwardSink(sink_a_clone)))
1809            .event_sink(Box::new(ForwardSink(sink_b_clone)))
1810            .build()
1811            .unwrap();
1812
1813        let def = make_single_step_def();
1814        let mut state = make_state_with_persistence("wf");
1815        let result = engine.run(&def, &mut state);
1816        assert!(result.is_ok(), "run should succeed: {:?}", result);
1817
1818        let events_a = sink_a.collected();
1819        let events_b = sink_b.collected();
1820        assert!(!events_a.is_empty(), "sink_a should have received events");
1821        assert_eq!(
1822            events_a.len(),
1823            events_b.len(),
1824            "both sinks should receive the same number of events"
1825        );
1826        // Verify at least RunStarted and RunCompleted were received
1827        let has_run_started = events_a
1828            .iter()
1829            .any(|e| matches!(e.event, EngineEvent::RunStarted { .. }));
1830        let has_run_completed = events_a
1831            .iter()
1832            .any(|e| matches!(e.event, EngineEvent::RunCompleted { .. }));
1833        assert!(has_run_started, "should have RunStarted event");
1834        assert!(has_run_completed, "should have RunCompleted event");
1835    }
1836
1837    // Test: with_event_sinks appends pre-built sinks and they all receive events
1838    #[test]
1839    fn with_event_sinks_accumulates_sinks() {
1840        let sink_a = VecSink::new();
1841        let sink_b = VecSink::new();
1842
1843        let pre_built: Arc<[Arc<dyn EventSink>]> = Arc::from(vec![
1844            Arc::clone(&sink_a) as Arc<dyn EventSink>,
1845            Arc::clone(&sink_b) as Arc<dyn EventSink>,
1846        ]);
1847
1848        let engine = FlowEngineBuilder::new()
1849            .action(Box::new(AlphaExecutor))
1850            .with_event_sinks(&pre_built)
1851            .build()
1852            .unwrap();
1853
1854        let def = make_single_step_def();
1855        let mut state = make_state_with_persistence("wf");
1856        let result = engine.run(&def, &mut state);
1857        assert!(result.is_ok(), "run should succeed: {:?}", result);
1858
1859        let events_a = sink_a.collected();
1860        let events_b = sink_b.collected();
1861        assert!(
1862            !events_a.is_empty(),
1863            "sink_a registered via with_event_sinks should receive events"
1864        );
1865        assert_eq!(
1866            events_a.len(),
1867            events_b.len(),
1868            "both sinks should receive the same number of events"
1869        );
1870        assert!(
1871            events_a
1872                .iter()
1873                .any(|e| matches!(e.event, EngineEvent::RunStarted { .. })),
1874            "should have RunStarted event"
1875        );
1876    }
1877
1878    // Test: mixing event_sink() and with_event_sinks() accumulates all sinks
1879    #[test]
1880    fn event_sink_and_with_event_sinks_both_accumulate() {
1881        let sink_a = VecSink::new();
1882        let sink_b = VecSink::new();
1883        let sink_c = VecSink::new();
1884
1885        let pre_built: Arc<[Arc<dyn EventSink>]> = Arc::from(vec![
1886            Arc::clone(&sink_b) as Arc<dyn EventSink>,
1887            Arc::clone(&sink_c) as Arc<dyn EventSink>,
1888        ]);
1889
1890        let engine = FlowEngineBuilder::new()
1891            .action(Box::new(AlphaExecutor))
1892            .event_sink(Box::new(ForwardSink(Arc::clone(&sink_a))))
1893            .with_event_sinks(&pre_built)
1894            .with_event_sinks(&pre_built) // second call appends, not replaces
1895            .build()
1896            .unwrap();
1897
1898        let def = make_single_step_def();
1899        let mut state = make_state_with_persistence("wf");
1900        engine.run(&def, &mut state).unwrap();
1901
1902        // sink_a (via event_sink) and sink_b/sink_c (via with_event_sinks) all fire
1903        assert!(
1904            !sink_a.collected().is_empty(),
1905            "event_sink sink should receive events"
1906        );
1907        assert_eq!(
1908            sink_b.collected().len(),
1909            sink_a.collected().len() * 2,
1910            "sink_b registered twice via with_event_sinks should receive 2x events"
1911        );
1912        assert_eq!(
1913            sink_b.collected().len(),
1914            sink_c.collected().len(),
1915            "both with_event_sinks sinks should receive the same count"
1916        );
1917    }
1918
1919    // Test: panicking sink doesn't abort the run; the non-panicking sink still receives events
1920    #[test]
1921    fn event_sinks_panic_safety() {
1922        let good_sink = VecSink::new();
1923        let good_sink_clone = Arc::clone(&good_sink);
1924
1925        let engine = FlowEngineBuilder::new()
1926            .action(Box::new(AlphaExecutor))
1927            .event_sink(Box::new(PanicSink))
1928            .event_sink(Box::new(ForwardSink(good_sink_clone)))
1929            .build()
1930            .unwrap();
1931
1932        let def = make_single_step_def();
1933        let mut state = make_state_with_persistence("wf");
1934        let result = engine.run(&def, &mut state);
1935        assert!(result.is_ok(), "run should succeed despite panicking sink");
1936
1937        let events = good_sink.collected();
1938        assert!(
1939            !events.is_empty(),
1940            "good sink should still receive events after panicking sink"
1941        );
1942    }
1943
1944    // Test: integration sequence — RunStarted → StepStarted → StepCompleted → MetricsUpdated → RunCompleted
1945    #[test]
1946    fn event_sink_integration_sequence() {
1947        let sink = VecSink::new();
1948        let sink_clone = Arc::clone(&sink);
1949
1950        let engine = FlowEngineBuilder::new()
1951            .action(Box::new(AlphaExecutor))
1952            .event_sink(Box::new(ForwardSink(sink_clone)))
1953            .build()
1954            .unwrap();
1955
1956        let def = make_single_step_def();
1957        let mut state = make_state_with_persistence("wf");
1958        let result = engine.run(&def, &mut state);
1959        assert!(result.is_ok(), "run should succeed: {:?}", result);
1960
1961        let events = sink.collected();
1962        let kinds: Vec<&str> = events
1963            .iter()
1964            .map(|e| match &e.event {
1965                EngineEvent::RunStarted { .. } => "RunStarted",
1966                EngineEvent::RunCompleted { .. } => "RunCompleted",
1967                EngineEvent::RunResumed { .. } => "RunResumed",
1968                EngineEvent::RunCancelled { .. } => "RunCancelled",
1969                EngineEvent::StepStarted { .. } => "StepStarted",
1970                EngineEvent::StepCompleted { .. } => "StepCompleted",
1971                EngineEvent::StepRetrying { .. } => "StepRetrying",
1972                EngineEvent::GateWaiting { .. } => "GateWaiting",
1973                EngineEvent::GateResolved { .. } => "GateResolved",
1974                EngineEvent::FanOutItemsCollected { .. } => "FanOutItemsCollected",
1975                EngineEvent::FanOutItemStarted { .. } => "FanOutItemStarted",
1976                EngineEvent::FanOutItemCompleted { .. } => "FanOutItemCompleted",
1977                EngineEvent::MetricsUpdated { .. } => "MetricsUpdated",
1978                EngineEvent::Panicked { .. } => "Panicked",
1979            })
1980            .collect();
1981
1982        assert_eq!(kinds[0], "RunStarted", "first event should be RunStarted");
1983        assert!(
1984            kinds.contains(&"StepStarted"),
1985            "should have StepStarted; got: {:?}",
1986            kinds
1987        );
1988        assert!(
1989            kinds.contains(&"StepCompleted"),
1990            "should have StepCompleted; got: {:?}",
1991            kinds
1992        );
1993        assert!(
1994            kinds.contains(&"MetricsUpdated"),
1995            "should have MetricsUpdated; got: {:?}",
1996            kinds
1997        );
1998        let last = kinds.last().unwrap();
1999        assert_eq!(*last, "RunCompleted", "last event should be RunCompleted");
2000    }
2001
2002    // ---------------------------------------------------------------------------
2003    // Cancellation integration tests (Task 16)
2004    // ---------------------------------------------------------------------------
2005
    /// Executor that always fails — used to trigger fail_fast in parallel tests.
    struct FailingExecutor;
    impl ActionExecutor for FailingExecutor {
        // Registry key under which this executor is looked up.
        fn name(&self) -> &str {
            "failing"
        }
        // Always returns a Workflow error, regardless of context or params.
        fn execute(
            &self,
            _ctx: &dyn RunContext,
            _info: &StepInfo,
            _params: &ActionParams,
        ) -> Result<ActionOutput, EngineError> {
            Err(EngineError::Workflow("intentional failure".to_string()))
        }
    }
2021
2022    // AC: cancel_run marks run as Cancelling in DB and signals the token.
2023    #[test]
2024    fn cancel_run_marks_cancelling_in_db() {
2025        use crate::persistence_memory::InMemoryWorkflowPersistence;
2026        use crate::status::WorkflowRunStatus;
2027        use crate::traits::persistence::WorkflowPersistence;
2028
2029        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2030        let run = make_test_run(&persistence);
2031        persistence
2032            .update_run_status(&run.id, WorkflowRunStatus::Running, None, None)
2033            .unwrap();
2034
2035        let engine = FlowEngineBuilder::new().build().unwrap();
2036
2037        // Register a dummy active run entry so cancel_run finds it.
2038        {
2039            let mut runs = engine.active_runs.lock().unwrap_or_else(|e| e.into_inner());
2040            runs.insert(
2041                run.id.clone(),
2042                ActiveRunEntry {
2043                    token: crate::cancellation::CancellationToken::new(),
2044                    shutdown: Arc::new(AtomicBool::new(false)),
2045                    persistence: Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
2046                    registry: Arc::new(ActionRegistry::new(HashMap::new(), None)),
2047                    exec_info: Arc::new(Mutex::new(None)),
2048                    refresh_stop: Arc::new(AtomicBool::new(false)),
2049                    refresh_thread: None,
2050                    refresh_handle: None,
2051                },
2052            );
2053        }
2054
2055        engine
2056            .cancel_run(&run.id, CancellationReason::UserRequested(None))
2057            .unwrap();
2058
2059        let updated = persistence.get_run(&run.id).unwrap().unwrap();
2060        assert_eq!(
2061            updated.status,
2062            WorkflowRunStatus::Cancelling,
2063            "DB status should be Cancelling after cancel_run"
2064        );
2065    }
2066
2067    // AC: cancel_run returns Err when run is not active in this engine instance.
2068    #[test]
2069    fn cancel_run_returns_err_for_unknown_run() {
2070        let engine = FlowEngineBuilder::new().build().unwrap();
2071        let result = engine.cancel_run("nonexistent-run", CancellationReason::UserRequested(None));
2072        assert!(result.is_err(), "cancel_run on unknown run must return Err");
2073    }
2074
2075    // AC: token cancelled before run() starts causes the run to not succeed.
2076    #[test]
2077    fn pre_cancelled_token_causes_immediate_failure() {
2078        let engine = FlowEngineBuilder::new()
2079            .action(Box::new(AlphaExecutor))
2080            .build()
2081            .unwrap();
2082        let def = make_def("wf", vec![call_node("alpha")]);
2083        let mut state = make_state_with_persistence("wf");
2084
2085        // Cancel the token before run() starts.
2086        state
2087            .cancellation
2088            .cancel(CancellationReason::UserRequested(None));
2089
2090        // The engine handles cancellation internally, returning Ok(WorkflowResult{ all_succeeded: false }).
2091        let result = engine.run(&def, &mut state);
2092        let did_not_succeed = match result {
2093            Ok(wr) => !wr.all_succeeded,
2094            Err(_) => true,
2095        };
2096        assert!(
2097            did_not_succeed,
2098            "run with pre-cancelled token should not succeed"
2099        );
2100    }
2101
2102    // AC: fail_fast on a parallel block stops remaining branches after first failure.
2103    #[test]
2104    fn parallel_fail_fast_skips_remaining_branches() {
2105        use crate::dsl::{ParallelNode, WorkflowNode};
2106        use crate::persistence_memory::InMemoryWorkflowPersistence;
2107
2108        // Build engine with both alpha and failing executors.
2109        let engine = FlowEngineBuilder::new()
2110            .action(Box::new(AlphaExecutor))
2111            .action(Box::new(FailingExecutor))
2112            .build()
2113            .unwrap();
2114
2115        let parallel = WorkflowNode::Parallel(ParallelNode {
2116            fail_fast: true,
2117            min_success: None,
2118            calls: vec![
2119                crate::dsl::AgentRef::Name("failing".to_string()),
2120                crate::dsl::AgentRef::Name("alpha".to_string()),
2121                crate::dsl::AgentRef::Name("alpha".to_string()),
2122            ],
2123            output: None,
2124            call_outputs: HashMap::new(),
2125            with: vec![],
2126            call_with: HashMap::new(),
2127            call_if: HashMap::new(),
2128            call_retries: HashMap::new(),
2129        });
2130
2131        let def = make_def("wf", vec![parallel]);
2132
2133        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2134        let run = make_test_run(&persistence);
2135        let persistence: Arc<dyn crate::traits::persistence::WorkflowPersistence> = persistence;
2136
2137        // Build a state with both executors in the registry.
2138        let mut m = HashMap::new();
2139        m.insert(
2140            "alpha".to_string(),
2141            Box::new(AlphaExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
2142        );
2143        m.insert(
2144            "failing".to_string(),
2145            Box::new(FailingExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
2146        );
2147        let mut state = make_bare_state("wf");
2148        state.persistence = Arc::clone(&persistence);
2149        state.action_registry = Arc::new(ActionRegistry::new(m, None));
2150        state.workflow_run_id = run.id.clone();
2151
2152        engine.run(&def, &mut state).ok(); // may fail due to min_success
2153
2154        // With true parallel execution all branches are spawned simultaneously. The scope
2155        // token is cancelled as soon as the first failure result is processed; branches
2156        // that haven't dispatched yet will see the cancellation and return early. At minimum
2157        // the explicitly-failing branch must be recorded as Failed.
2158        let steps = persistence.get_steps(&run.id).unwrap();
2159        let failed = steps
2160            .iter()
2161            .filter(|s| s.status == crate::status::WorkflowStepStatus::Failed)
2162            .count();
2163        assert!(
2164            failed >= 1,
2165            "at least the first (failing) branch should be Failed; got steps: {:?}",
2166            steps
2167        );
2168    }
2169
2170    // AC: step-level timeout marks step TimedOut when DSL timeout fires.
2171    #[test]
2172    fn step_timeout_marks_timed_out() {
2173        use crate::dsl::{CallNode, WorkflowNode};
2174        use crate::persistence_memory::InMemoryWorkflowPersistence;
2175
2176        // Executor that sleeps longer than the DSL timeout.
2177        struct SlowExecutor;
2178        impl ActionExecutor for SlowExecutor {
2179            fn name(&self) -> &str {
2180                "slow"
2181            }
2182            fn execute(
2183                &self,
2184                _ctx: &dyn RunContext,
2185                _info: &StepInfo,
2186                _params: &ActionParams,
2187            ) -> Result<ActionOutput, EngineError> {
2188                std::thread::sleep(std::time::Duration::from_millis(100));
2189                Ok(ActionOutput::default())
2190            }
2191        }
2192
2193        let engine = FlowEngineBuilder::new()
2194            .action(Box::new(SlowExecutor))
2195            .build()
2196            .unwrap();
2197
2198        let timed_out_call = WorkflowNode::Call(CallNode {
2199            agent: crate::dsl::AgentRef::Name("slow".to_string()),
2200            retries: 0,
2201            on_fail: None,
2202            output: None,
2203            with: vec![],
2204            as_identity: None,
2205            plugin_dirs: vec![],
2206            timeout: Some("10ms".to_string()),
2207            max_turns: None,
2208        });
2209
2210        let def = make_def("wf", vec![timed_out_call]);
2211
2212        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2213        let run = make_test_run(&persistence);
2214        let persistence: Arc<dyn crate::traits::persistence::WorkflowPersistence> = persistence;
2215
2216        let mut m = HashMap::new();
2217        m.insert(
2218            "slow".to_string(),
2219            Box::new(SlowExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
2220        );
2221        let mut state = make_bare_state("wf");
2222        state.persistence = Arc::clone(&persistence);
2223        state.action_registry = Arc::new(ActionRegistry::new(m, None));
2224        state.workflow_run_id = run.id.clone();
2225
2226        engine.run(&def, &mut state).ok();
2227
2228        let steps = persistence.get_steps(&run.id).unwrap();
2229        let timed_out = steps
2230            .iter()
2231            .any(|s| s.status == crate::status::WorkflowStepStatus::TimedOut);
2232        assert!(
2233            timed_out,
2234            "step should be marked TimedOut; got: {:?}",
2235            steps
2236        );
2237    }
2238
2239    // ---------------------------------------------------------------------------
2240    // FlowEngine::resume() tests
2241    // ---------------------------------------------------------------------------
2242
2243    // AC: resume() reads completed steps from DB and skips them; pending steps run.
2244    #[test]
2245    fn resume_skips_completed_steps() {
2246        use crate::persistence_memory::InMemoryWorkflowPersistence;
2247        use crate::status::WorkflowStepStatus;
2248        use crate::traits::persistence::{NewStep, StepUpdate, WorkflowPersistence};
2249        use std::sync::atomic::Ordering;
2250
2251        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2252        let run = make_test_run(&persistence);
2253
2254        // Pre-seed alpha as a completed step so resume() will skip it.
2255        let step_id = persistence
2256            .insert_step(NewStep {
2257                workflow_run_id: run.id.clone(),
2258                step_name: "alpha".to_string(),
2259                role: "actor".to_string(),
2260                can_commit: false,
2261                position: 0,
2262                iteration: 0,
2263                retry_count: Some(0),
2264            })
2265            .unwrap();
2266        persistence
2267            .update_step(
2268                &step_id,
2269                StepUpdate {
2270                    generation: 0,
2271                    status: WorkflowStepStatus::Completed,
2272                    child_run_id: None,
2273                    result_text: None,
2274                    context_out: None,
2275                    markers_out: None,
2276                    retry_count: None,
2277                    structured_output: None,
2278                    step_error: None,
2279                },
2280            )
2281            .unwrap();
2282
2283        let (alpha_count, beta_count, mut state) =
2284            make_counting_state(Arc::clone(&persistence), run.id);
2285
2286        let engine = FlowEngineBuilder::new().build().unwrap();
2287        let def = make_def("wf", vec![call_node("alpha"), call_node("beta")]);
2288        engine.resume(&def, &mut state).unwrap();
2289
2290        assert_eq!(
2291            alpha_count.load(Ordering::SeqCst),
2292            0,
2293            "alpha was pre-completed and should be skipped"
2294        );
2295        assert_eq!(
2296            beta_count.load(Ordering::SeqCst),
2297            1,
2298            "beta should execute once"
2299        );
2300    }
2301
2302    // AC: resume() skips pre-completed steps and runs remaining steps.
2303    // Metrics from pre-completed steps are sourced via agent_runs JOIN (conductor-side),
2304    // not from the removed WorkflowRunStep metric fields.
2305    #[test]
2306    fn resume_skips_pre_completed_steps() {
2307        use crate::persistence_memory::InMemoryWorkflowPersistence;
2308        use crate::status::WorkflowStepStatus;
2309        use crate::traits::persistence::{NewStep, StepUpdate, WorkflowPersistence};
2310
2311        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2312        let run = make_test_run(&persistence);
2313
2314        // Pre-seed alpha as a completed step.
2315        let step_id = persistence
2316            .insert_step(NewStep {
2317                workflow_run_id: run.id.clone(),
2318                step_name: "alpha".to_string(),
2319                role: "actor".to_string(),
2320                can_commit: false,
2321                position: 0,
2322                iteration: 0,
2323                retry_count: Some(0),
2324            })
2325            .unwrap();
2326        persistence
2327            .update_step(
2328                &step_id,
2329                StepUpdate {
2330                    generation: 0,
2331                    status: WorkflowStepStatus::Completed,
2332                    child_run_id: None,
2333                    result_text: None,
2334                    context_out: None,
2335                    markers_out: None,
2336                    retry_count: None,
2337                    structured_output: None,
2338                    step_error: None,
2339                },
2340            )
2341            .unwrap();
2342
2343        let (alpha_count, beta_count, mut state) =
2344            make_counting_state(Arc::clone(&persistence), run.id);
2345
2346        let engine = FlowEngineBuilder::new().build().unwrap();
2347        let def = make_def("wf", vec![call_node("alpha"), call_node("beta")]);
2348        engine.resume(&def, &mut state).unwrap();
2349
2350        assert_eq!(
2351            alpha_count.load(std::sync::atomic::Ordering::SeqCst),
2352            0,
2353            "alpha was pre-completed and should be skipped"
2354        );
2355        assert_eq!(
2356            beta_count.load(std::sync::atomic::Ordering::SeqCst),
2357            1,
2358            "beta should execute once"
2359        );
2360    }
2361
2362    // AC: resume() with no completed steps runs all steps (same behaviour as run()).
2363    #[test]
2364    fn resume_empty_skip_set_runs_all() {
2365        use crate::persistence_memory::InMemoryWorkflowPersistence;
2366        use std::sync::atomic::Ordering;
2367
2368        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2369        let run = make_test_run(&persistence);
2370
2371        let (alpha_count, beta_count, mut state) = make_counting_state(persistence, run.id);
2372
2373        let engine = FlowEngineBuilder::new().build().unwrap();
2374        let def = make_def("wf", vec![call_node("alpha"), call_node("beta")]);
2375        engine.resume(&def, &mut state).unwrap();
2376
2377        assert_eq!(
2378            alpha_count.load(Ordering::SeqCst),
2379            1,
2380            "alpha should execute once when no completed steps exist"
2381        );
2382        assert_eq!(
2383            beta_count.load(Ordering::SeqCst),
2384            1,
2385            "beta should execute once when no completed steps exist"
2386        );
2387    }
2388
    // AC: resume() with a while loop fast-forwards past completed iterations and only
    // executes body steps for the first incomplete iteration.
    #[test]
    fn resume_while_loop_starts_at_first_incomplete_iteration() {
        use crate::dsl::{OnMaxIter, WhileNode, WorkflowNode};
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::status::WorkflowStepStatus;
        use crate::traits::persistence::{NewStep, StepUpdate, WorkflowPersistence};
        use std::sync::atomic::{AtomicUsize, Ordering};

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);

        // Pre-seed the condition step (outside the while loop) as completed with a "continue" marker.
        // "continue" is the marker the WhileNode below keys on, so the loop reads as
        // still active when resume() replays the step history.
        let cond_id = persistence
            .insert_step(NewStep {
                workflow_run_id: run.id.clone(),
                step_name: "cond".to_string(),
                role: "actor".to_string(),
                can_commit: false,
                position: 0,
                iteration: 0,
                retry_count: Some(0),
            })
            .unwrap();
        persistence
            .update_step(
                &cond_id,
                StepUpdate {
                    generation: 0,
                    status: WorkflowStepStatus::Completed,
                    child_run_id: None,
                    result_text: None,
                    context_out: None,
                    markers_out: Some(r#"["continue"]"#.to_string()),
                    retry_count: None,
                    structured_output: None,
                    step_error: None,
                },
            )
            .unwrap();

        // Pre-seed body_a and body_b for iterations 0 and 1.
        // Positions: cond occupies 0, so iteration `iter` places body_a at
        // iter*2 + 1 and body_b at iter*2 + 2.
        for iter in 0i64..2 {
            for (pos_offset, name) in [(0i64, "body_a"), (1, "body_b")] {
                let sid = persistence
                    .insert_step(NewStep {
                        workflow_run_id: run.id.clone(),
                        step_name: name.to_string(),
                        role: "actor".to_string(),
                        can_commit: false,
                        position: iter * 2 + pos_offset + 1,
                        iteration: iter,
                        retry_count: Some(0),
                    })
                    .unwrap();
                persistence
                    .update_step(
                        &sid,
                        StepUpdate {
                            generation: 0,
                            status: WorkflowStepStatus::Completed,
                            child_run_id: None,
                            result_text: None,
                            context_out: None,
                            markers_out: None,
                            retry_count: None,
                            structured_output: None,
                            step_error: None,
                        },
                    )
                    .unwrap();
            }
        }

        // Build state with CountingExecutors for body_a and body_b.
        let a_count = Arc::new(AtomicUsize::new(0));
        let b_count = Arc::new(AtomicUsize::new(0));
        let mut m = HashMap::new();
        m.insert(
            "body_a".to_string(),
            Box::new(CountingExecutor {
                name: "body_a",
                count: Arc::clone(&a_count),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        m.insert(
            "body_b".to_string(),
            Box::new(CountingExecutor {
                name: "body_b",
                count: Arc::clone(&b_count),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        // Also register "cond" with a counting executor (it is pre-completed and will
        // be skipped, but must be present in the registry so validation passes).
        m.insert(
            "cond".to_string(),
            Box::new(CountingExecutor {
                name: "cond",
                count: Arc::new(AtomicUsize::new(0)),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        let mut state = make_bare_state("wf");
        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run.id.clone();

        // Workflow: cond (outside) -> while(cond.continue, max=3) { body_a, body_b }
        let while_node = WorkflowNode::While(WhileNode {
            step: "cond".to_string(),
            marker: "continue".to_string(),
            max_iterations: 3,
            stuck_after: None,
            on_max_iter: OnMaxIter::Continue,
            body: vec![call_node("body_a"), call_node("body_b")],
        });
        let def = make_def("wf", vec![call_node("cond"), while_node]);

        let engine = FlowEngineBuilder::new().build().unwrap();
        engine.resume(&def, &mut state).unwrap();

        // Iterations 0 and 1 were fully seeded as completed, so only iteration 2 —
        // the first incomplete one — should actually invoke the body executors.
        assert_eq!(
            a_count.load(Ordering::SeqCst),
            1,
            "body_a should execute only for the third iteration (first incomplete)"
        );
        assert_eq!(
            b_count.load(Ordering::SeqCst),
            1,
            "body_b should execute only for the third iteration (first incomplete)"
        );
    }
2521
2522    // AC: resume() propagates persistence errors from get_steps().
2523    #[test]
2524    fn resume_propagates_get_steps_error() {
2525        use crate::persistence_memory::InMemoryWorkflowPersistence;
2526
2527        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2528        persistence.seed_run("run-123");
2529        persistence.set_fail_get_steps(true);
2530
2531        let engine = FlowEngineBuilder::new().build().unwrap();
2532        let def = make_def("wf", vec![call_node("alpha")]);
2533        let mut state = make_bare_state("wf");
2534        state.persistence = persistence;
2535        state.workflow_run_id = "run-123".to_string();
2536
2537        let err = engine.resume(&def, &mut state).unwrap_err();
2538        let msg = err.to_string();
2539        assert!(
2540            msg.contains("resume: failed to load steps for run"),
2541            "error should contain the prefix; got: {msg}"
2542        );
2543        assert!(
2544            msg.contains("run-123"),
2545            "error should contain the run ID; got: {msg}"
2546        );
2547    }
2548
2549    // AC: resume() returns Err when called with a pre-seeded resume_ctx.
2550    #[test]
2551    fn resume_rejects_pre_seeded_resume_ctx() {
2552        use crate::engine::ResumeContext;
2553        use std::collections::HashMap;
2554
2555        let engine = FlowEngineBuilder::new().build().unwrap();
2556        let def = make_def("wf", vec![call_node("alpha")]);
2557        let mut state = make_bare_state("wf");
2558        state.resume_ctx = Some(ResumeContext {
2559            step_map: HashMap::new(),
2560        });
2561        state.workflow_run_id = "run-precond".to_string();
2562
2563        let err = engine.resume(&def, &mut state).unwrap_err();
2564        assert!(
2565            err.to_string().contains("resume_ctx"),
2566            "error should mention resume_ctx; got: {err}"
2567        );
2568    }
2569
2570    // ---------------------------------------------------------------------------
2571    // Lease acquisition tests
2572    // ---------------------------------------------------------------------------
2573
2574    // AC: run() sets owner_token and lease_generation after a successful acquire.
2575    #[test]
2576    fn run_sets_lease_fields_on_success() {
2577        use crate::persistence_memory::InMemoryWorkflowPersistence;
2578
2579        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2580        let run = make_test_run(&persistence);
2581
2582        let engine = FlowEngineBuilder::new()
2583            .action(Box::new(AlphaExecutor))
2584            .build()
2585            .unwrap();
2586        let def = make_def("wf", vec![call_node("alpha")]);
2587
2588        let mut state = make_bare_state("wf");
2589        state.persistence =
2590            Arc::clone(&persistence) as Arc<dyn crate::traits::persistence::WorkflowPersistence>;
2591        state.action_registry = Arc::new(ActionRegistry::new(
2592            {
2593                let mut m = HashMap::new();
2594                m.insert(
2595                    "alpha".to_string(),
2596                    Box::new(AlphaExecutor)
2597                        as Box<dyn crate::traits::action_executor::ActionExecutor>,
2598                );
2599                m
2600            },
2601            None,
2602        ));
2603        state.workflow_run_id = run.id.clone();
2604
2605        engine.run(&def, &mut state).unwrap();
2606
2607        assert!(
2608            state.owner_token.is_some(),
2609            "owner_token should be set after run()"
2610        );
2611        assert_eq!(
2612            state.lease_generation,
2613            Some(1),
2614            "lease_generation should be 1 after first acquire"
2615        );
2616    }
2617
2618    // AC: two concurrent FlowEngine::run calls on same run_id → exactly one succeeds.
2619    #[test]
2620    fn two_concurrent_runs_exactly_one_succeeds() {
2621        use crate::persistence_memory::InMemoryWorkflowPersistence;
2622        use crate::traits::persistence::WorkflowPersistence;
2623        use std::thread;
2624
2625        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2626        let run = make_test_run(&persistence);
2627        let run_id = run.id.clone();
2628
2629        let persistence: Arc<dyn WorkflowPersistence> = persistence;
2630
2631        // Build a state factory
2632        let make_state_for_run = |run_id: String, p: Arc<dyn WorkflowPersistence>| {
2633            let mut s = make_bare_state("wf");
2634            s.persistence = p;
2635            s.action_registry = Arc::new(ActionRegistry::new(
2636                {
2637                    let mut m = HashMap::new();
2638                    m.insert(
2639                        "alpha".to_string(),
2640                        Box::new(AlphaExecutor)
2641                            as Box<dyn crate::traits::action_executor::ActionExecutor>,
2642                    );
2643                    m
2644                },
2645                None,
2646            ));
2647            s.workflow_run_id = run_id;
2648            s
2649        };
2650
2651        let def = make_def("wf", vec![call_node("alpha")]);
2652
2653        // Use a barrier so both threads start run() at the same time.
2654        let barrier = Arc::new(std::sync::Barrier::new(2));
2655
2656        let p1 = Arc::clone(&persistence);
2657        let run_id1 = run_id.clone();
2658        let barrier1 = Arc::clone(&barrier);
2659        let def1 = def.clone();
2660        let t1 = thread::spawn(move || {
2661            let engine = FlowEngineBuilder::new()
2662                .action(Box::new(AlphaExecutor))
2663                .build()
2664                .unwrap();
2665            let mut state = make_state_for_run(run_id1, p1);
2666            barrier1.wait();
2667            engine.run(&def1, &mut state)
2668        });
2669
2670        let p2 = Arc::clone(&persistence);
2671        let run_id2 = run_id.clone();
2672        let barrier2 = Arc::clone(&barrier);
2673        let def2 = def.clone();
2674        let t2 = thread::spawn(move || {
2675            let engine = FlowEngineBuilder::new()
2676                .action(Box::new(AlphaExecutor))
2677                .build()
2678                .unwrap();
2679            let mut state = make_state_for_run(run_id2, p2);
2680            barrier2.wait();
2681            engine.run(&def2, &mut state)
2682        });
2683
2684        let r1 = t1.join().unwrap();
2685        let r2 = t2.join().unwrap();
2686
2687        let successes = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
2688        let already_owned = [&r1, &r2]
2689            .iter()
2690            .filter(|r| matches!(r, Err(EngineError::AlreadyOwned(_))))
2691            .count();
2692
2693        assert_eq!(
2694            successes, 1,
2695            "exactly one run should succeed; got r1={r1:?}, r2={r2:?}"
2696        );
2697        assert_eq!(
2698            already_owned, 1,
2699            "exactly one run should fail with AlreadyOwned; got r1={r1:?}, r2={r2:?}"
2700        );
2701    }
2702
2703    // AC: resume() acquires lease before get_steps() — a pre-held lease blocks resume().
2704    #[test]
2705    fn resume_acquires_before_get_steps() {
2706        use crate::persistence_memory::InMemoryWorkflowPersistence;
2707        use crate::traits::persistence::WorkflowPersistence;
2708
2709        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2710        let run = make_test_run(&persistence);
2711
2712        // Manually acquire the lease for another token (TTL = 1 hour, won't expire).
2713        persistence
2714            .acquire_lease(&run.id, "other-engine-token", 3600)
2715            .unwrap();
2716
2717        let engine = FlowEngineBuilder::new()
2718            .action(Box::new(AlphaExecutor))
2719            .build()
2720            .unwrap();
2721        let def = make_def("wf", vec![call_node("alpha")]);
2722
2723        let mut state = make_bare_state("wf");
2724        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
2725        state.workflow_run_id = run.id.clone();
2726
2727        let err = engine.resume(&def, &mut state).unwrap_err();
2728        assert!(
2729            matches!(err, EngineError::AlreadyOwned(_)),
2730            "resume() with a pre-held lease should fail with AlreadyOwned; got {err:?}"
2731        );
2732    }
2733
2734    // AC: existing single-engine workflow still completes normally.
2735    #[test]
2736    fn single_engine_workflow_still_completes() {
2737        let engine = FlowEngineBuilder::new()
2738            .action(Box::new(AlphaExecutor))
2739            .build()
2740            .unwrap();
2741        let def = make_single_step_def();
2742        let mut state = make_state_with_persistence("wf");
2743        let result = engine.run(&def, &mut state).unwrap();
2744        assert!(
2745            result.all_succeeded,
2746            "single-engine workflow should complete successfully"
2747        );
2748    }
2749
    // AC: refresh_lease_loop Err path — DB error during refresh triggers LeaseLost abort.
    //
    // Setup: executor blocks until its shutdown flag is set; fail_acquire_lease is flipped
    // from a side thread once the executor is running so the initial acquire() in run()
    // still succeeds. The first refresh tick then returns Err, calling signal_lease_abort
    // which sets shutdown=true. The executor exits, and run() returns Err(Cancelled(LeaseLost)).
    #[test]
    fn refresh_db_error_causes_lease_lost_abort() {
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::action_executor::{ActionOutput, ActionParams};
        use crate::traits::persistence::WorkflowPersistence;
        use std::sync::atomic::Ordering;
        use std::thread;
        use std::time::Duration;

        // Executor that parks in a spin loop until the engine's shutdown flag flips,
        // keeping the step "in flight" long enough for a refresh tick to fail.
        struct BlockingExecutor {
            started: Arc<AtomicBool>,
            shutdown: Arc<AtomicBool>,
        }
        impl ActionExecutor for BlockingExecutor {
            fn name(&self) -> &str {
                "alpha"
            }
            fn execute(
                &self,
                _ctx: &dyn crate::traits::run_context::RunContext,
                _info: &crate::traits::action_executor::StepInfo,
                _: &ActionParams,
            ) -> Result<ActionOutput, EngineError> {
                // Publish "started" so the watcher thread knows the initial
                // acquire() has already succeeded before it injects the failure.
                self.started.store(true, Ordering::SeqCst);
                // Spin until the engine's shutdown flag is set by signal_lease_abort.
                loop {
                    if self.shutdown.load(Ordering::Relaxed) {
                        return Ok(ActionOutput::default());
                    }
                    std::thread::sleep(Duration::from_millis(1));
                }
            }
        }

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);

        let started = Arc::new(AtomicBool::new(false));
        let started_clone = Arc::clone(&started);
        let persistence_clone = Arc::clone(&persistence);

        // Side thread: wait until the executor has started (initial acquire done),
        // then flip fail_acquire_lease so the next refresh tick returns Err.
        let watcher = thread::spawn(move || {
            while !started_clone.load(Ordering::Relaxed) {
                std::thread::sleep(Duration::from_millis(1));
            }
            persistence_clone.set_fail_acquire_lease(true);
        });

        // Pre-set the shutdown arc so the refresh thread's signal_lease_abort sets the
        // same instance we hand to BlockingExecutor — FlowEngine::run() reuses it when
        // exec_config.shutdown is already populated.
        let shared_shutdown = Arc::new(AtomicBool::new(false));

        let mut m = HashMap::new();
        m.insert(
            "alpha".to_string(),
            Box::new(BlockingExecutor {
                started: Arc::clone(&started),
                shutdown: Arc::clone(&shared_shutdown),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );
        let mut state = make_bare_state("wf");
        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run.id.clone();
        state.exec_config.shutdown = Some(Arc::clone(&shared_shutdown));
        // Short refresh interval so the error is detected quickly.
        state.exec_config.lease_refresh_interval = Duration::from_millis(15);

        let engine = FlowEngineBuilder::new().build().unwrap();
        let def = make_def("wf", vec![call_node("alpha")]);

        // run() blocks in the executor until the refresh failure aborts it.
        let result = engine.run(&def, &mut state);
        watcher.join().unwrap();

        assert!(
            matches!(
                result,
                Err(EngineError::Cancelled(CancellationReason::LeaseLost))
            ),
            "DB error in refresh should abort with LeaseLost; got {result:?}"
        );
    }
2841
    // AC: stale generation on step write — when another engine steals the lease mid-run,
    // the next executor step write sees a generation mismatch, returns LeaseLost, and
    // the engine aborts cleanly without duplicate side effects.
    #[test]
    fn stale_generation_on_step_write_aborts_with_lease_lost() {
        use crate::persistence_memory::InMemoryWorkflowPersistence;
        use crate::traits::action_executor::{ActionOutput, ActionParams};
        use crate::traits::persistence::WorkflowPersistence;
        use std::sync::atomic::Ordering;
        use std::thread;
        use std::time::Duration;

        // Executor that signals when started, waits for proceed, then returns Ok.
        // The stealer steals the lease between started and proceed so the step
        // write (persist_completed_step) sees a stale generation.
        struct LatchedExecutor {
            started: Arc<AtomicBool>,
            proceed: Arc<AtomicBool>,
        }
        impl ActionExecutor for LatchedExecutor {
            fn name(&self) -> &str {
                "alpha"
            }
            fn execute(
                &self,
                _ctx: &dyn crate::traits::run_context::RunContext,
                _info: &crate::traits::action_executor::StepInfo,
                _: &ActionParams,
            ) -> Result<ActionOutput, EngineError> {
                // Publish "started" first, then hold the step open until the
                // stealer has bumped the lease generation.
                self.started.store(true, Ordering::SeqCst);
                while !self.proceed.load(Ordering::SeqCst) {
                    std::thread::sleep(Duration::from_millis(1));
                }
                Ok(ActionOutput::default())
            }
        }

        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
        let run = make_test_run(&persistence);
        let run_id = run.id.clone();

        let started = Arc::new(AtomicBool::new(false));
        let proceed = Arc::new(AtomicBool::new(false));
        let started_clone = Arc::clone(&started);
        let proceed_clone = Arc::clone(&proceed);
        let persistence_clone = Arc::clone(&persistence);

        // Side thread: waits for the executor to start, steals the lease (bumps
        // generation), then lets the executor proceed so it tries to write the step.
        let stealer = thread::spawn(move || {
            while !started_clone.load(Ordering::SeqCst) {
                std::thread::sleep(Duration::from_millis(1));
            }
            persistence_clone.expire_and_steal_lease(&run_id, "thief-token");
            proceed_clone.store(true, Ordering::SeqCst);
        });

        let mut m = HashMap::new();
        m.insert(
            "alpha".to_string(),
            Box::new(LatchedExecutor {
                started: Arc::clone(&started),
                proceed: Arc::clone(&proceed),
            }) as Box<dyn crate::traits::action_executor::ActionExecutor>,
        );

        let mut state = make_bare_state("wf");
        state.persistence = Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>;
        state.action_registry = Arc::new(ActionRegistry::new(m, None));
        state.workflow_run_id = run.id.clone();
        // Short refresh interval so the refresh thread also detects the steal quickly.
        state.exec_config.lease_refresh_interval = Duration::from_millis(15);

        let engine = FlowEngineBuilder::new().build().unwrap();
        let def = make_def("wf", vec![call_node("alpha")]);

        // The step write happens after the steal, so run() must fail with LeaseLost.
        let result = engine.run(&def, &mut state);
        stealer.join().unwrap();

        assert!(
            matches!(
                result,
                Err(EngineError::Cancelled(CancellationReason::LeaseLost))
            ),
            "stale generation on step write should abort with LeaseLost; got {result:?}"
        );
    }
2929
2930    // AC: cross-process cancel — is_run_cancelled returns true for Cancelling status.
2931    #[test]
2932    fn cross_process_cancel_via_db_poll() {
2933        use crate::persistence_memory::InMemoryWorkflowPersistence;
2934        use crate::status::WorkflowRunStatus;
2935        use crate::traits::persistence::WorkflowPersistence;
2936
2937        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2938        let run = make_test_run(&persistence);
2939
2940        // Simulate cross-process cancel by directly writing Cancelling to DB.
2941        persistence
2942            .update_run_status(&run.id, WorkflowRunStatus::Cancelling, None, None)
2943            .unwrap();
2944
2945        // is_run_cancelled must return true for Cancelling status.
2946        assert!(
2947            persistence.is_run_cancelled(&run.id).unwrap(),
2948            "is_run_cancelled should return true when status is Cancelling"
2949        );
2950    }
2951
2952    // ---------------------------------------------------------------------------
2953    // run_workflow / run_child entry-point tests (#32)
2954    // ---------------------------------------------------------------------------
2955
2956    fn make_alpha_registry() -> Arc<ActionRegistry> {
2957        let mut m = HashMap::new();
2958        m.insert(
2959            "alpha".to_string(),
2960            Box::new(AlphaExecutor) as Box<dyn crate::traits::action_executor::ActionExecutor>,
2961        );
2962        Arc::new(ActionRegistry::new(m, None))
2963    }
2964
2965    fn make_run_workflow_input(
2966        persistence: Arc<dyn crate::traits::persistence::WorkflowPersistence>,
2967        run_id: String,
2968    ) -> RunInput {
2969        use crate::traits::run_context::NoopRunContext;
2970        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
2971        RunInput::new(
2972            persistence,
2973            run_id,
2974            "wf".to_string(),
2975            make_alpha_registry(),
2976            Arc::new(ItemProviderRegistry::new()),
2977            Arc::new(NoOpScriptEnvProvider),
2978            Arc::new(NoopRunContext::default()),
2979            CancellationToken::new(),
2980        )
2981    }
2982
2983    #[test]
2984    fn run_workflow_acquires_lease_and_runs_to_completion() {
2985        use crate::persistence_memory::InMemoryWorkflowPersistence;
2986        use crate::status::WorkflowRunStatus;
2987        use crate::traits::persistence::WorkflowPersistence;
2988
2989        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
2990        let run = make_test_run(&persistence);
2991
2992        let engine = FlowEngineBuilder::new()
2993            .action(Box::new(AlphaExecutor))
2994            .build()
2995            .unwrap();
2996        let def = make_def("wf", vec![call_node("alpha")]);
2997
2998        let result = engine
2999            .run_workflow(
3000                &def,
3001                make_run_workflow_input(
3002                    Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3003                    run.id.clone(),
3004                ),
3005            )
3006            .unwrap();
3007
3008        assert!(result.all_succeeded, "run_workflow should succeed");
3009        let row = persistence.get_run(&run.id).unwrap().unwrap();
3010        assert_eq!(
3011            row.status,
3012            WorkflowRunStatus::Completed,
3013            "run should be Completed in persistence"
3014        );
3015    }
3016
3017    #[test]
3018    fn run_workflow_does_not_inherit_lease_generation_some_zero() {
3019        use crate::persistence_memory::InMemoryWorkflowPersistence;
3020        use crate::traits::persistence::WorkflowPersistence;
3021
3022        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3023        let run1 = make_test_run(&persistence);
3024        let run2 = make_test_run(&persistence);
3025
3026        let engine = FlowEngineBuilder::new()
3027            .action(Box::new(AlphaExecutor))
3028            .build()
3029            .unwrap();
3030        let def = make_def("wf", vec![call_node("alpha")]);
3031
3032        // Neither call should panic with "lease_generation must be set after
3033        // FlowEngine::run/resume entry" — run_workflow starts with None.
3034        engine
3035            .run_workflow(
3036                &def,
3037                make_run_workflow_input(
3038                    Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3039                    run1.id,
3040                ),
3041            )
3042            .unwrap();
3043        engine
3044            .run_workflow(
3045                &def,
3046                make_run_workflow_input(
3047                    Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3048                    run2.id,
3049                ),
3050            )
3051            .unwrap();
3052    }
3053
3054    #[test]
3055    fn run_child_passes_parent_ctx_inputs_through_when_no_override() {
3056        use crate::engine::ChildWorkflowContext;
3057        use crate::persistence_memory::InMemoryWorkflowPersistence;
3058        use crate::traits::persistence::WorkflowPersistence;
3059        use crate::traits::run_context::NoopRunContext;
3060        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3061
3062        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3063        let run = make_test_run(&persistence);
3064
3065        let engine = FlowEngineBuilder::new()
3066            .action(Box::new(AlphaExecutor))
3067            .build()
3068            .unwrap();
3069        let def = make_def("wf", vec![call_node("alpha")]);
3070
3071        let mut parent_inputs = HashMap::new();
3072        parent_inputs.insert("key".to_string(), "parent_value".to_string());
3073
3074        let parent_ctx = ChildWorkflowContext {
3075            run_ctx: Arc::new(NoopRunContext::default()),
3076            extra_plugin_dirs: vec![],
3077            workflow_run_id: "parent-run-id".to_string(),
3078            model: None,
3079            exec_config: crate::types::WorkflowExecConfig::default(),
3080            inputs: parent_inputs,
3081            event_sinks: Arc::from(vec![]),
3082        };
3083
3084        let result = engine
3085            .run_child(
3086                &def,
3087                ChildRunInput {
3088                    workflow_run_id: run.id.clone(),
3089                    persistence: Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3090                    action_registry: make_alpha_registry(),
3091                    item_provider_registry: Arc::new(ItemProviderRegistry::new()),
3092                    script_env_provider: Arc::new(NoOpScriptEnvProvider),
3093                    child_runner: None,
3094                    schema_resolver: None,
3095                    as_identity: None,
3096                    depth: 1,
3097                    cancellation: CancellationToken::new(),
3098                    target_label: None,
3099                    triggered_by_hook: false,
3100                    inputs_override: None, // parent inputs flow through
3101                },
3102                &parent_ctx,
3103            )
3104            .unwrap();
3105
3106        assert!(
3107            result.all_succeeded,
3108            "run_child with no inputs_override should succeed"
3109        );
3110    }
3111
3112    #[test]
3113    fn run_child_inputs_override_replaces_parent_ctx_inputs() {
3114        use crate::engine::ChildWorkflowContext;
3115        use crate::persistence_memory::InMemoryWorkflowPersistence;
3116        use crate::traits::persistence::WorkflowPersistence;
3117        use crate::traits::run_context::NoopRunContext;
3118        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3119
3120        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3121        let run = make_test_run(&persistence);
3122
3123        let engine = FlowEngineBuilder::new()
3124            .action(Box::new(AlphaExecutor))
3125            .build()
3126            .unwrap();
3127        let def = make_def("wf", vec![call_node("alpha")]);
3128
3129        let mut parent_inputs = HashMap::new();
3130        parent_inputs.insert("key".to_string(), "parent_value".to_string());
3131
3132        let parent_ctx = ChildWorkflowContext {
3133            run_ctx: Arc::new(NoopRunContext::default()),
3134            extra_plugin_dirs: vec![],
3135            workflow_run_id: "parent-run-id".to_string(),
3136            model: None,
3137            exec_config: crate::types::WorkflowExecConfig::default(),
3138            inputs: parent_inputs,
3139            event_sinks: Arc::from(vec![]),
3140        };
3141
3142        let mut override_inputs = HashMap::new();
3143        override_inputs.insert("key".to_string(), "override_value".to_string());
3144
3145        let result = engine
3146            .run_child(
3147                &def,
3148                ChildRunInput {
3149                    workflow_run_id: run.id.clone(),
3150                    persistence: Arc::clone(&persistence) as Arc<dyn WorkflowPersistence>,
3151                    action_registry: make_alpha_registry(),
3152                    item_provider_registry: Arc::new(ItemProviderRegistry::new()),
3153                    script_env_provider: Arc::new(NoOpScriptEnvProvider),
3154                    child_runner: None,
3155                    schema_resolver: None,
3156                    as_identity: None,
3157                    depth: 1,
3158                    cancellation: CancellationToken::new(),
3159                    target_label: None,
3160                    triggered_by_hook: false,
3161                    inputs_override: Some(override_inputs), // override replaces parent inputs
3162                },
3163                &parent_ctx,
3164            )
3165            .unwrap();
3166
3167        assert!(
3168            result.all_succeeded,
3169            "run_child with inputs_override should succeed"
3170        );
3171    }
3172
3173    #[test]
3174    fn child_run_input_new_sets_required_fields_and_zeros_optional() {
3175        use crate::cancellation::CancellationToken;
3176        use crate::persistence_memory::InMemoryWorkflowPersistence;
3177        use crate::traits::action_executor::ActionRegistry;
3178        use crate::traits::item_provider::ItemProviderRegistry;
3179        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3180
3181        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3182        let action_registry = Arc::new(ActionRegistry::new(HashMap::new(), None));
3183        let item_provider_registry = Arc::new(ItemProviderRegistry::new());
3184        let script_env_provider = Arc::new(NoOpScriptEnvProvider);
3185        let cancellation = CancellationToken::new();
3186
3187        let input = ChildRunInput::new(
3188            "run-child-1".to_string(),
3189            Arc::clone(&persistence) as Arc<dyn crate::traits::persistence::WorkflowPersistence>,
3190            Arc::clone(&action_registry),
3191            Arc::clone(&item_provider_registry),
3192            Arc::clone(&script_env_provider)
3193                as Arc<dyn crate::traits::script_env_provider::ScriptEnvProvider>,
3194            2,
3195            cancellation,
3196        );
3197
3198        assert_eq!(input.workflow_run_id, "run-child-1");
3199        assert_eq!(input.depth, 2);
3200        assert!(input.child_runner.is_none());
3201        assert!(input.schema_resolver.is_none());
3202        assert!(input.as_identity.is_none());
3203        assert!(input.target_label.is_none());
3204        assert!(!input.triggered_by_hook);
3205        assert!(input.inputs_override.is_none());
3206    }
3207
3208    #[test]
3209    fn run_input_new_sets_required_fields_and_zeros_optional() {
3210        use crate::cancellation::CancellationToken;
3211        use crate::persistence_memory::InMemoryWorkflowPersistence;
3212        use crate::traits::action_executor::ActionRegistry;
3213        use crate::traits::item_provider::ItemProviderRegistry;
3214        use crate::traits::run_context::NoopRunContext;
3215        use crate::traits::script_env_provider::NoOpScriptEnvProvider;
3216
3217        let persistence = Arc::new(InMemoryWorkflowPersistence::new());
3218        let action_registry = Arc::new(ActionRegistry::new(HashMap::new(), None));
3219        let item_provider_registry = Arc::new(ItemProviderRegistry::new());
3220        let script_env_provider = Arc::new(NoOpScriptEnvProvider);
3221        let run_ctx = Arc::new(NoopRunContext::default());
3222        let cancellation = CancellationToken::new();
3223
3224        let input = RunInput::new(
3225            Arc::clone(&persistence) as Arc<dyn crate::traits::persistence::WorkflowPersistence>,
3226            "run-top-1".to_string(),
3227            "my-workflow".to_string(),
3228            Arc::clone(&action_registry),
3229            Arc::clone(&item_provider_registry),
3230            Arc::clone(&script_env_provider)
3231                as Arc<dyn crate::traits::script_env_provider::ScriptEnvProvider>,
3232            run_ctx as Arc<dyn crate::traits::run_context::RunContext>,
3233            cancellation,
3234        );
3235
3236        assert_eq!(input.workflow_run_id, "run-top-1");
3237        assert_eq!(input.workflow_name, "my-workflow");
3238        assert!(input.extra_plugin_dirs.is_empty());
3239        assert!(input.model.is_none());
3240        assert!(input.inputs.is_empty());
3241        assert_eq!(input.parent_run_id, "");
3242        assert_eq!(input.depth, 0);
3243        assert!(input.target_label.is_none());
3244        assert!(input.default_as_identity.is_none());
3245        assert!(!input.triggered_by_hook);
3246        assert!(input.schema_resolver.is_none());
3247        assert!(input.child_runner.is_none());
3248        assert!(input.event_sinks.is_empty());
3249    }
3250}