Skip to main content

ironflow_engine/
engine.rs

//! The core [`Engine`] -- orchestrates workflow execution and persistence.
//!
//! The engine ties together a `RunStore` for persistence, an [`AgentProvider`]
//! for AI operations, and a registry of [`WorkflowHandler`]s.
//!
//! Handlers are Rust-native: steps can reference previous outputs, use native
//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{
24    NewRun, Run, RunStatus, RunUpdate, StepStatus, StepUpdate, TriggerKind,
25};
26use ironflow_store::store::Store;
27#[cfg(feature = "prometheus")]
28use metrics::{counter, gauge, histogram};
29
30use crate::context::WorkflowContext;
31use crate::error::EngineError;
32use crate::handler::{WorkflowHandler, WorkflowInfo};
33use crate::log_sender::LogSender;
34use crate::notify::{Event, EventPublisher, EventSubscriber};
35
36/// The workflow orchestration engine.
37///
38/// Holds references to the store, agent provider, and a registry of
39/// [`WorkflowHandler`]s.
40///
41/// # Examples
42///
43/// ```no_run
44/// use std::sync::Arc;
45/// use ironflow_engine::engine::Engine;
46/// use ironflow_engine::config::ShellConfig;
47/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
48/// use ironflow_engine::context::WorkflowContext;
49/// use ironflow_store::memory::InMemoryStore;
50/// use ironflow_store::models::TriggerKind;
51/// use ironflow_core::providers::claude::ClaudeCodeProvider;
52/// use serde_json::json;
53///
54/// struct CiWorkflow;
55/// impl WorkflowHandler for CiWorkflow {
56///     fn name(&self) -> &str { "ci" }
57///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
58///         Box::pin(async move {
59///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
60///             Ok(())
61///         })
62///     }
63/// }
64///
65/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
66/// let store = Arc::new(InMemoryStore::new());
67/// let provider = Arc::new(ClaudeCodeProvider::new());
68/// let mut engine = Engine::new(store, provider);
69/// engine.register(CiWorkflow)?;
70///
71/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
72/// println!("Run {} completed with status {:?}", run.id, run.status);
73/// # Ok(())
74/// # }
75/// ```
pub struct Engine {
    /// Backing store; handed to every workflow context and used to create
    /// and update runs and steps.
    store: Arc<dyn Store>,
    /// Agent provider handed to every workflow context.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by [`WorkflowHandler::name`].
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher used to deliver domain events to registered subscribers.
    event_publisher: EventPublisher,
    /// Optional log sender; when set it is attached to each new context.
    log_sender: Option<LogSender>,
}
83
84/// Validate a workflow category path.
85///
86/// A category is a `/`-separated list of non-empty segments. This function
87/// rejects empty paths, leading or trailing `/`, consecutive `/`, and
88/// segments containing only whitespace.
89///
90/// # Errors
91///
92/// Returns [`EngineError::InvalidWorkflow`] when the category is malformed.
93fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
94    let reject = |reason: &str| {
95        Err(EngineError::InvalidWorkflow(format!(
96            "handler '{handler_name}' has invalid category '{category}': {reason}"
97        )))
98    };
99
100    if category.is_empty() {
101        return reject("empty category");
102    }
103    if category.starts_with('/') {
104        return reject("leading '/'");
105    }
106    if category.ends_with('/') {
107        return reject("trailing '/'");
108    }
109    for segment in category.split('/') {
110        if segment.is_empty() {
111            return reject("empty segment (double '/')");
112        }
113        if segment.trim().is_empty() {
114            return reject("whitespace-only segment");
115        }
116    }
117    Ok(())
118}
119
120impl Engine {
121    /// Create a new engine with the given store and agent provider.
122    ///
123    /// # Examples
124    ///
125    /// ```no_run
126    /// use std::sync::Arc;
127    /// use ironflow_engine::engine::Engine;
128    /// use ironflow_store::memory::InMemoryStore;
129    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
130    ///
131    /// let engine = Engine::new(
132    ///     Arc::new(InMemoryStore::new()),
133    ///     Arc::new(ClaudeCodeProvider::new()),
134    /// );
135    /// ```
136    pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
137        Self {
138            store,
139            provider,
140            handlers: HashMap::new(),
141            event_publisher: EventPublisher::new(),
142            log_sender: None,
143        }
144    }
145
146    /// Attach a log sender for real-time step output streaming.
147    ///
148    /// When set, all workflow contexts created by this engine will forward
149    /// step output (shell stdout/stderr, agent system messages) to the
150    /// given sender.
151    pub fn set_log_sender(&mut self, sender: LogSender) {
152        self.log_sender = Some(sender);
153    }
154
155    /// Returns a reference to the backing store.
156    pub fn store(&self) -> &Arc<dyn Store> {
157        &self.store
158    }
159
160    /// Returns a reference to the agent provider.
161    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
162        &self.provider
163    }
164
165    /// Build a [`WorkflowContext`] with access to the handler registry.
166    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
167        let handlers = self.handlers.clone();
168        let resolver: crate::context::HandlerResolver =
169            Arc::new(move |name: &str| handlers.get(name).cloned());
170        let mut ctx = WorkflowContext::with_handler_resolver(
171            run_id,
172            self.store.clone(),
173            self.provider.clone(),
174            resolver,
175        );
176        if let Some(ref sender) = self.log_sender {
177            ctx.set_log_sender(sender.clone());
178        }
179        ctx
180    }
181
182    // -----------------------------------------------------------------------
183    // Handler registration
184    // -----------------------------------------------------------------------
185
186    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
187    ///
188    /// The handler is looked up by [`WorkflowHandler::name`] when executing
189    /// or enqueuing.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
194    /// name is already registered.
195    ///
196    /// # Examples
197    ///
198    /// ```no_run
199    /// use std::sync::Arc;
200    /// use ironflow_engine::engine::Engine;
201    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
202    /// use ironflow_engine::context::WorkflowContext;
203    /// use ironflow_engine::config::ShellConfig;
204    /// use ironflow_store::memory::InMemoryStore;
205    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
206    ///
207    /// struct MyWorkflow;
208    /// impl WorkflowHandler for MyWorkflow {
209    ///     fn name(&self) -> &str { "my-workflow" }
210    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
211    ///         Box::pin(async move {
212    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
213    ///             Ok(())
214    ///         })
215    ///     }
216    /// }
217    ///
218    /// let mut engine = Engine::new(
219    ///     Arc::new(InMemoryStore::new()),
220    ///     Arc::new(ClaudeCodeProvider::new()),
221    /// );
222    /// engine.register(MyWorkflow)?;
223    /// # Ok::<(), ironflow_engine::error::EngineError>(())
224    /// ```
225    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
226        let name = handler.name().to_string();
227        if self.handlers.contains_key(&name) {
228            return Err(EngineError::InvalidWorkflow(format!(
229                "handler '{}' already registered",
230                name
231            )));
232        }
233        if let Some(category) = handler.category() {
234            validate_category(&name, category)?;
235        }
236        self.handlers.insert(name, Arc::new(handler));
237        Ok(())
238    }
239
240    /// Register a pre-boxed workflow handler.
241    ///
242    /// # Errors
243    ///
244    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
245    /// name is already registered or if its category is invalid.
246    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
247        let name = handler.name().to_string();
248        if self.handlers.contains_key(&name) {
249            return Err(EngineError::InvalidWorkflow(format!(
250                "handler '{}' already registered",
251                name
252            )));
253        }
254        if let Some(category) = handler.category() {
255            validate_category(&name, category)?;
256        }
257        self.handlers.insert(name, Arc::from(handler));
258        Ok(())
259    }
260
261    /// Get a registered handler by name.
262    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
263        self.handlers.get(name)
264    }
265
266    /// List registered handler names.
267    pub fn handler_names(&self) -> Vec<&str> {
268        self.handlers.keys().map(|s| s.as_str()).collect()
269    }
270
271    /// Get detailed info about a registered workflow handler.
272    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
273        self.handlers.get(name).map(|h| h.describe())
274    }
275
276    /// Register an event subscriber for domain events.
277    ///
278    /// The subscriber is called only for events whose type is in
279    /// `event_types`. Pass [`Event::ALL`] to receive every event.
280    ///
281    /// # Examples
282    ///
283    /// ```no_run
284    /// use ironflow_engine::engine::Engine;
285    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
286    /// use ironflow_store::memory::InMemoryStore;
287    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
288    /// use std::sync::Arc;
289    ///
290    /// let mut engine = Engine::new(
291    ///     Arc::new(InMemoryStore::new()),
292    ///     Arc::new(ClaudeCodeProvider::new()),
293    /// );
294    ///
295    /// engine.subscribe(
296    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
297    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
298    /// );
299    /// ```
300    pub fn subscribe(
301        &mut self,
302        subscriber: impl EventSubscriber + 'static,
303        event_types: &[&'static str],
304    ) {
305        self.event_publisher.subscribe(subscriber, event_types);
306    }
307
308    /// Returns a reference to the event publisher.
309    ///
310    /// Useful for publishing events from outside the engine (e.g. auth
311    /// routes in the API layer).
312    pub fn event_publisher(&self) -> &EventPublisher {
313        &self.event_publisher
314    }
315
316    // -----------------------------------------------------------------------
317    // Dynamic workflow execution (WorkflowHandler)
318    // -----------------------------------------------------------------------
319
320    /// Execute a registered handler inline.
321    ///
322    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
323    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
324    ///
325    /// # Errors
326    ///
327    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
328    /// with that name. Returns [`EngineError`] if execution fails.
329    ///
330    /// # Examples
331    ///
332    /// ```no_run
333    /// use std::sync::Arc;
334    /// use ironflow_engine::engine::Engine;
335    /// use ironflow_store::memory::InMemoryStore;
336    /// use ironflow_store::models::TriggerKind;
337    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
338    /// use serde_json::json;
339    ///
340    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
341    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
342    /// # Ok(())
343    /// # }
344    /// ```
345    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
346    pub async fn run_handler(
347        &self,
348        handler_name: &str,
349        trigger: TriggerKind,
350        payload: Value,
351    ) -> Result<Run, EngineError> {
352        let handler = self
353            .handlers
354            .get(handler_name)
355            .ok_or_else(|| {
356                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
357            })?
358            .clone();
359
360        let handler_version = handler.version().map(str::to_string);
361        let run = self
362            .store
363            .create_run(NewRun {
364                workflow_name: handler_name.to_string(),
365                trigger,
366                payload,
367                max_retries: 0,
368                handler_version,
369                labels: handler.default_labels(),
370                scheduled_at: None,
371            })
372            .await?;
373
374        let run_id = run.id;
375        info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
376
377        self.store
378            .update_run_status(run_id, RunStatus::Running)
379            .await?;
380
381        #[cfg(feature = "prometheus")]
382        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
383
384        let run_start = Instant::now();
385        let mut ctx = self.build_context(run_id);
386
387        let result = handler.execute(&mut ctx).await;
388        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
389            .await
390    }
391
392    /// Enqueue a handler-based workflow for worker execution.
393    ///
394    /// The workflow name is stored in the run. The worker looks up the
395    /// handler by name when executing.
396    ///
397    /// # Errors
398    ///
399    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
400    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
401    pub async fn enqueue_handler(
402        &self,
403        handler_name: &str,
404        trigger: TriggerKind,
405        payload: Value,
406        max_retries: u32,
407    ) -> Result<Run, EngineError> {
408        self.enqueue_handler_with_options(
409            handler_name,
410            trigger,
411            payload,
412            max_retries,
413            HashMap::new(),
414            None,
415        )
416        .await
417    }
418
419    /// Enqueue a handler-based workflow with labels and optional deferred scheduling.
420    ///
421    /// # Errors
422    ///
423    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
424    #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
425    pub async fn enqueue_handler_with_options(
426        &self,
427        handler_name: &str,
428        trigger: TriggerKind,
429        payload: Value,
430        max_retries: u32,
431        labels: HashMap<String, String>,
432        scheduled_at: Option<DateTime<Utc>>,
433    ) -> Result<Run, EngineError> {
434        let handler = self.handlers.get(handler_name).ok_or_else(|| {
435            EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
436        })?;
437
438        let handler_version = handler.version().map(str::to_string);
439        let mut merged_labels = handler.default_labels();
440        merged_labels.extend(labels);
441
442        let run = self
443            .store
444            .create_run(NewRun {
445                workflow_name: handler_name.to_string(),
446                trigger,
447                payload,
448                max_retries,
449                handler_version,
450                labels: merged_labels,
451                scheduled_at,
452            })
453            .await?;
454
455        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
456        Ok(run)
457    }
458
459    /// Execute a handler-based run (used by the worker after pick_next_pending).
460    ///
461    /// Looks up the handler by the run's `workflow_name` and executes it
462    /// with a fresh [`WorkflowContext`].
463    ///
464    /// # Errors
465    ///
466    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
467    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
468    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
469        let run = self
470            .store
471            .get_run(run_id)
472            .await?
473            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
474
475        let handler = self
476            .handlers
477            .get(&run.workflow_name)
478            .ok_or_else(|| {
479                EngineError::InvalidWorkflow(format!(
480                    "no handler registered: {}",
481                    run.workflow_name
482                ))
483            })?
484            .clone();
485
486        #[cfg(feature = "prometheus")]
487        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
488
489        let run_start = Instant::now();
490        let mut ctx = self.build_context(run_id);
491
492        let result = handler.execute(&mut ctx).await;
493        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
494            .await
495    }
496
497    /// Execute a run by its ID (used by the worker after pick_next_pending).
498    ///
499    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
500    ///
501    /// # Errors
502    ///
503    /// Returns [`EngineError`] if the run is not found or execution fails.
504    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
505    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
506        self.execute_handler_run(run_id).await
507    }
508
509    /// Resume a run after human approval.
510    ///
511    /// Re-executes the handler with step replay: completed steps return
512    /// cached output, approved approval steps are skipped, and execution
513    /// continues from the first unexecuted step.
514    ///
515    /// Supports multiple approval gates -- each resume replays all prior
516    /// steps and stops at the next approval (or completes the run).
517    ///
518    /// # Errors
519    ///
520    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
521    /// Returns [`EngineError`] if execution fails or hits another approval.
522    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
523    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
524        let run = self
525            .store
526            .get_run(run_id)
527            .await?
528            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
529
530        let handler = self
531            .handlers
532            .get(&run.workflow_name)
533            .ok_or_else(|| {
534                EngineError::InvalidWorkflow(format!(
535                    "no handler registered: {}",
536                    run.workflow_name
537                ))
538            })?
539            .clone();
540
541        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
542
543        let run_start = Instant::now();
544        let mut ctx = self.build_context(run_id);
545        ctx.load_replay_steps().await?;
546
547        let result = handler.execute(&mut ctx).await;
548        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
549            .await
550    }
551
552    /// Fail all non-terminal steps for a run.
553    ///
554    /// Called after a run is marked as failed (timeout, error, panic) to clean up
555    /// orphaned steps that are still in `Running`, `Pending`, or `AwaitingApproval`.
556    ///
557    /// - `Running` / `AwaitingApproval` steps are marked `Failed`.
558    /// - `Pending` steps are marked `Skipped` (FSM does not allow Pending -> Failed).
559    ///
560    /// Errors from individual step updates are logged but do not abort the cleanup.
561    ///
562    /// # Errors
563    ///
564    /// Returns [`EngineError`] if listing steps fails.
565    pub async fn fail_orphaned_steps(
566        &self,
567        run_id: Uuid,
568        error_message: &str,
569    ) -> Result<(), EngineError> {
570        let steps = self.store.list_steps(run_id).await?;
571        let now = Utc::now();
572
573        for step in steps {
574            if step.status.state.is_terminal() {
575                continue;
576            }
577
578            let (target_status, error) = match step.status.state {
579                StepStatus::Running | StepStatus::AwaitingApproval => {
580                    let err = if step.error.is_some() {
581                        None
582                    } else {
583                        Some(error_message.to_string())
584                    };
585                    (StepStatus::Failed, err)
586                }
587                StepStatus::Pending => (StepStatus::Skipped, None),
588                _ => continue,
589            };
590
591            if let Err(e) = self
592                .store
593                .update_step(
594                    step.id,
595                    StepUpdate {
596                        status: Some(target_status),
597                        error,
598                        completed_at: Some(now),
599                        ..StepUpdate::default()
600                    },
601                )
602                .await
603            {
604                warn!(
605                    run_id = %run_id,
606                    step_id = %step.id,
607                    step_name = %step.name,
608                    error = %e,
609                    "failed to cleanup orphaned step"
610                );
611            } else {
612                info!(
613                    run_id = %run_id,
614                    step_id = %step.id,
615                    step_name = %step.name,
616                    from = %step.status.state,
617                    to = %target_status,
618                    "cleaned up orphaned step"
619                );
620            }
621        }
622
623        Ok(())
624    }
625
626    /// Finalize a run with the given result and context.
627    ///
628    /// On success: updates run to Completed with cost, duration, and completed_at.
629    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
630    /// Always: fetches and returns the final Run.
631    async fn finalize_run(
632        &self,
633        run_id: Uuid,
634        workflow_name: &str,
635        result: Result<(), EngineError>,
636        ctx: &WorkflowContext,
637        run_start: Instant,
638    ) -> Result<Run, EngineError> {
639        let total_duration = run_start.elapsed().as_millis() as u64;
640        let completed_at = Utc::now();
641
642        let final_status;
643        let final_run;
644
645        match result {
646            Ok(()) => {
647                final_status = RunStatus::Completed;
648                final_run = self
649                    .store
650                    .update_run_returning(
651                        run_id,
652                        RunUpdate {
653                            status: Some(RunStatus::Completed),
654                            cost_usd: Some(ctx.total_cost_usd()),
655                            duration_ms: Some(total_duration),
656                            completed_at: Some(completed_at),
657                            ..RunUpdate::default()
658                        },
659                    )
660                    .await?;
661
662                info!(
663                    run_id = %run_id,
664                    cost_usd = %ctx.total_cost_usd(),
665                    duration_ms = total_duration,
666                    "run completed"
667                );
668            }
669            Err(EngineError::ApprovalRequired {
670                run_id: approval_run_id,
671                step_id,
672                ref message,
673            }) => {
674                final_status = RunStatus::AwaitingApproval;
675                final_run = self
676                    .store
677                    .update_run_returning(
678                        run_id,
679                        RunUpdate {
680                            status: Some(RunStatus::AwaitingApproval),
681                            cost_usd: Some(ctx.total_cost_usd()),
682                            duration_ms: Some(total_duration),
683                            ..RunUpdate::default()
684                        },
685                    )
686                    .await?;
687
688                info!(
689                    run_id = %approval_run_id,
690                    step_id = %step_id,
691                    message = %message,
692                    "run awaiting approval"
693                );
694            }
695            Err(err) => {
696                final_status = RunStatus::Failed;
697                if let Err(store_err) = self
698                    .store
699                    .update_run(
700                        run_id,
701                        RunUpdate {
702                            status: Some(RunStatus::Failed),
703                            error: Some(err.to_string()),
704                            cost_usd: Some(ctx.total_cost_usd()),
705                            duration_ms: Some(total_duration),
706                            completed_at: Some(completed_at),
707                            ..RunUpdate::default()
708                        },
709                    )
710                    .await
711                {
712                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
713                }
714
715                error!(run_id = %run_id, error = %err, "run failed");
716
717                self.publish_run_status_changed(
718                    workflow_name,
719                    run_id,
720                    final_status,
721                    Some(err.to_string()),
722                    ctx,
723                    total_duration,
724                );
725
726                #[cfg(feature = "prometheus")]
727                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
728
729                return Err(err);
730            }
731        }
732
733        self.publish_run_status_changed(
734            workflow_name,
735            run_id,
736            final_status,
737            None,
738            ctx,
739            total_duration,
740        );
741
742        #[cfg(feature = "prometheus")]
743        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
744
745        Ok(final_run)
746    }
747
748    /// Emit Prometheus metrics for a completed run.
749    #[cfg(feature = "prometheus")]
750    fn emit_run_metrics(
751        &self,
752        workflow_name: &str,
753        status: RunStatus,
754        duration_ms: u64,
755        ctx: &WorkflowContext,
756    ) {
757        let status_str = status.to_string();
758        let wf = workflow_name.to_string();
759
760        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
761        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
762            .record(duration_ms as f64 / 1000.0);
763        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
764            ctx.total_cost_usd()
765                .to_string()
766                .parse::<f64>()
767                .unwrap_or(0.0),
768        );
769        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
770    }
771
772    /// Publish a run status changed event to all registered subscribers.
773    ///
774    /// `from` is always `Running` because `finalize_run` is only called
775    /// from a running state.
776    fn publish_run_status_changed(
777        &self,
778        workflow_name: &str,
779        run_id: Uuid,
780        to: RunStatus,
781        error: Option<String>,
782        ctx: &WorkflowContext,
783        duration_ms: u64,
784    ) {
785        let now = Utc::now();
786        let cost_usd = ctx.total_cost_usd();
787        let wf = workflow_name.to_string();
788
789        self.event_publisher.publish(Event::RunStatusChanged {
790            run_id,
791            workflow_name: wf.clone(),
792            from: RunStatus::Running,
793            to,
794            error: error.clone(),
795            cost_usd,
796            duration_ms,
797            at: now,
798        });
799
800        if to == RunStatus::Failed {
801            self.event_publisher.publish(Event::RunFailed {
802                run_id,
803                workflow_name: wf,
804                error,
805                cost_usd,
806                duration_ms,
807                at: now,
808            });
809        }
810    }
811}
812
813impl fmt::Debug for Engine {
814    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815        f.debug_struct("Engine")
816            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
817            .finish_non_exhaustive()
818    }
819}
820
821#[cfg(test)]
822mod tests {
823    use super::*;
824    use crate::config::ShellConfig;
825    use crate::handler::{HandlerFuture, WorkflowHandler};
826    use ironflow_core::providers::claude::ClaudeCodeProvider;
827    use ironflow_core::providers::record_replay::RecordReplayProvider;
828    use ironflow_store::memory::InMemoryStore;
829    use ironflow_store::models::StepStatus;
830    use serde_json::json;
831
832    // Test handler that echoes a message via shell
833    struct EchoWorkflow;
834
835    impl WorkflowHandler for EchoWorkflow {
836        fn name(&self) -> &str {
837            "echo-workflow"
838        }
839
840        fn describe(&self) -> WorkflowInfo {
841            WorkflowInfo {
842                description: "A simple workflow that echoes hello".to_string(),
843                source_code: None,
844                sub_workflows: Vec::new(),
845                category: None,
846                version: self.version().map(str::to_string),
847                input_schema: None,
848                default_labels: HashMap::new(),
849            }
850        }
851
852        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
853            Box::pin(async move {
854                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
855                Ok(())
856            })
857        }
858    }
859
860    // Test handler that fails
861    struct FailingWorkflow;
862
863    impl WorkflowHandler for FailingWorkflow {
864        fn name(&self) -> &str {
865            "failing-workflow"
866        }
867
868        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
869            Box::pin(async move {
870                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
871                Ok(())
872            })
873        }
874    }
875
876    fn create_test_engine() -> Engine {
877        let store = Arc::new(InMemoryStore::new());
878        let inner = ClaudeCodeProvider::new();
879        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
880            inner,
881            "/tmp/ironflow-fixtures",
882        ));
883        Engine::new(store, provider)
884    }
885
886    #[test]
887    fn engine_new_creates_instance() {
888        let engine = create_test_engine();
889        assert_eq!(engine.handler_names().len(), 0);
890    }
891
892    #[test]
893    fn engine_register_handler() {
894        let mut engine = create_test_engine();
895        let result = engine.register(EchoWorkflow);
896        assert!(result.is_ok());
897        assert_eq!(engine.handler_names().len(), 1);
898        assert!(engine.handler_names().contains(&"echo-workflow"));
899    }
900
901    #[test]
902    fn engine_register_duplicate_returns_error() {
903        let mut engine = create_test_engine();
904        engine.register(EchoWorkflow).unwrap();
905        let result = engine.register(EchoWorkflow);
906        assert!(result.is_err());
907    }
908
909    #[test]
910    fn engine_get_handler_found() {
911        let mut engine = create_test_engine();
912        engine.register(EchoWorkflow).unwrap();
913        let handler = engine.get_handler("echo-workflow");
914        assert!(handler.is_some());
915    }
916
917    #[test]
918    fn engine_get_handler_not_found() {
919        let engine = create_test_engine();
920        let handler = engine.get_handler("nonexistent");
921        assert!(handler.is_none());
922    }
923
924    #[test]
925    fn engine_handler_names_lists_all() {
926        let mut engine = create_test_engine();
927        engine.register(EchoWorkflow).unwrap();
928        engine.register(FailingWorkflow).unwrap();
929        let names = engine.handler_names();
930        assert_eq!(names.len(), 2);
931        assert!(names.contains(&"echo-workflow"));
932        assert!(names.contains(&"failing-workflow"));
933    }
934
935    #[test]
936    fn engine_handler_info_returns_description() {
937        let mut engine = create_test_engine();
938        engine.register(EchoWorkflow).unwrap();
939        let info = engine.handler_info("echo-workflow");
940        assert!(info.is_some());
941        let info = info.unwrap();
942        assert_eq!(info.description, "A simple workflow that echoes hello");
943    }
944
945    struct CategorizedWorkflow;
946
947    impl WorkflowHandler for CategorizedWorkflow {
948        fn name(&self) -> &str {
949            "categorized"
950        }
951        fn category(&self) -> Option<&str> {
952            Some("data/etl")
953        }
954        fn execute<'a>(
955            &'a self,
956            _ctx: &'a mut WorkflowContext,
957        ) -> crate::handler::HandlerFuture<'a> {
958            Box::pin(async move { Ok(()) })
959        }
960    }
961
962    #[test]
963    fn engine_default_describe_propagates_category() {
964        let mut engine = create_test_engine();
965        engine.register(CategorizedWorkflow).unwrap();
966        let info = engine.handler_info("categorized").unwrap();
967        assert_eq!(info.category.as_deref(), Some("data/etl"));
968    }
969
970    #[test]
971    fn engine_default_describe_without_category() {
972        let mut engine = create_test_engine();
973        engine.register(EchoWorkflow).unwrap();
974        let info = engine.handler_info("echo-workflow").unwrap();
975        assert!(info.category.is_none());
976    }
977
978    struct BadCategoryWorkflow(&'static str);
979
980    impl WorkflowHandler for BadCategoryWorkflow {
981        fn name(&self) -> &str {
982            "bad-category"
983        }
984        fn category(&self) -> Option<&str> {
985            Some(self.0)
986        }
987        fn execute<'a>(
988            &'a self,
989            _ctx: &'a mut WorkflowContext,
990        ) -> crate::handler::HandlerFuture<'a> {
991            Box::pin(async move { Ok(()) })
992        }
993    }
994
995    #[test]
996    fn engine_register_rejects_empty_category() {
997        let mut engine = create_test_engine();
998        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
999        match err {
1000            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
1001            other => panic!("expected InvalidWorkflow, got {other:?}"),
1002        }
1003    }
1004
1005    #[test]
1006    fn engine_register_rejects_leading_slash_category() {
1007        let mut engine = create_test_engine();
1008        let err = engine
1009            .register(BadCategoryWorkflow("/data/etl"))
1010            .unwrap_err();
1011        match err {
1012            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
1013            other => panic!("expected InvalidWorkflow, got {other:?}"),
1014        }
1015    }
1016
1017    #[test]
1018    fn engine_register_rejects_trailing_slash_category() {
1019        let mut engine = create_test_engine();
1020        let err = engine
1021            .register(BadCategoryWorkflow("data/etl/"))
1022            .unwrap_err();
1023        match err {
1024            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
1025            other => panic!("expected InvalidWorkflow, got {other:?}"),
1026        }
1027    }
1028
1029    #[test]
1030    fn engine_register_rejects_double_slash_category() {
1031        let mut engine = create_test_engine();
1032        let err = engine
1033            .register(BadCategoryWorkflow("data//etl"))
1034            .unwrap_err();
1035        match err {
1036            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
1037            other => panic!("expected InvalidWorkflow, got {other:?}"),
1038        }
1039    }
1040
1041    #[test]
1042    fn engine_register_rejects_whitespace_only_segment_category() {
1043        let mut engine = create_test_engine();
1044        let err = engine
1045            .register(BadCategoryWorkflow("data/ /etl"))
1046            .unwrap_err();
1047        match err {
1048            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
1049            other => panic!("expected InvalidWorkflow, got {other:?}"),
1050        }
1051    }
1052
1053    #[test]
1054    fn engine_register_accepts_valid_nested_category() {
1055        let mut engine = create_test_engine();
1056        assert!(engine.register(CategorizedWorkflow).is_ok());
1057    }
1058
1059    #[tokio::test]
1060    async fn engine_unknown_workflow_returns_error() {
1061        let engine = create_test_engine();
1062        let result = engine
1063            .run_handler("unknown", TriggerKind::Manual, json!({}))
1064            .await;
1065        assert!(result.is_err());
1066        match result {
1067            Err(EngineError::InvalidWorkflow(msg)) => {
1068                assert!(msg.contains("no handler registered"));
1069            }
1070            _ => panic!("expected InvalidWorkflow error"),
1071        }
1072    }
1073
1074    #[tokio::test]
1075    async fn engine_enqueue_handler_creates_pending_run() {
1076        let mut engine = create_test_engine();
1077        engine.register(EchoWorkflow).unwrap();
1078
1079        let run = engine
1080            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
1081            .await
1082            .unwrap();
1083        assert_eq!(run.status.state, RunStatus::Pending);
1084        assert_eq!(run.workflow_name, "echo-workflow");
1085    }
1086
1087    #[tokio::test]
1088    async fn engine_register_boxed() {
1089        let mut engine = create_test_engine();
1090        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
1091        let result = engine.register_boxed(handler);
1092        assert!(result.is_ok());
1093        assert_eq!(engine.handler_names().len(), 1);
1094    }
1095
1096    #[tokio::test]
1097    async fn engine_store_and_provider_accessors() {
1098        let store = Arc::new(InMemoryStore::new());
1099        let inner = ClaudeCodeProvider::new();
1100        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
1101            inner,
1102            "/tmp/ironflow-fixtures",
1103        ));
1104        let engine = Engine::new(store.clone(), provider.clone());
1105
1106        // Verify accessors return references
1107        let _ = engine.store();
1108        let _ = engine.provider();
1109    }
1110
1111    // -----------------------------------------------------------------------
1112    // Operation trait tests
1113    // -----------------------------------------------------------------------
1114
1115    use crate::operation::Operation;
1116    use ironflow_store::models::StepKind;
1117    use std::future::Future;
1118    use std::pin::Pin;
1119
1120    struct FakeGitlabOp {
1121        project_id: u64,
1122        title: String,
1123    }
1124
1125    impl Operation for FakeGitlabOp {
1126        fn kind(&self) -> &str {
1127            "gitlab"
1128        }
1129
1130        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1131            Box::pin(async move {
1132                Ok(json!({
1133                    "issue_id": 42,
1134                    "project_id": self.project_id,
1135                    "title": self.title,
1136                }))
1137            })
1138        }
1139
1140        fn input(&self) -> Option<Value> {
1141            Some(json!({
1142                "project_id": self.project_id,
1143                "title": self.title,
1144            }))
1145        }
1146    }
1147
1148    struct FailingOp;
1149
1150    impl Operation for FailingOp {
1151        fn kind(&self) -> &str {
1152            "broken-service"
1153        }
1154
1155        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1156            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1157        }
1158    }
1159
1160    struct OperationWorkflow;
1161
1162    impl WorkflowHandler for OperationWorkflow {
1163        fn name(&self) -> &str {
1164            "operation-workflow"
1165        }
1166
1167        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1168            Box::pin(async move {
1169                let op = FakeGitlabOp {
1170                    project_id: 123,
1171                    title: "Bug report".to_string(),
1172                };
1173                ctx.operation("create-issue", &op).await?;
1174                Ok(())
1175            })
1176        }
1177    }
1178
1179    struct FailingOperationWorkflow;
1180
1181    impl WorkflowHandler for FailingOperationWorkflow {
1182        fn name(&self) -> &str {
1183            "failing-operation-workflow"
1184        }
1185
1186        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1187            Box::pin(async move {
1188                ctx.operation("broken-call", &FailingOp).await?;
1189                Ok(())
1190            })
1191        }
1192    }
1193
1194    struct MixedWorkflow;
1195
1196    impl WorkflowHandler for MixedWorkflow {
1197        fn name(&self) -> &str {
1198            "mixed-workflow"
1199        }
1200
1201        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1202            Box::pin(async move {
1203                ctx.shell("build", ShellConfig::new("echo built")).await?;
1204                let op = FakeGitlabOp {
1205                    project_id: 456,
1206                    title: "Deploy done".to_string(),
1207                };
1208                let result = ctx.operation("notify-gitlab", &op).await?;
1209                assert_eq!(result.output["issue_id"], 42);
1210                Ok(())
1211            })
1212        }
1213    }
1214
1215    #[tokio::test]
1216    async fn operation_step_happy_path() {
1217        let mut engine = create_test_engine();
1218        engine.register(OperationWorkflow).unwrap();
1219
1220        let run = engine
1221            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1222            .await
1223            .unwrap();
1224
1225        assert_eq!(run.status.state, RunStatus::Completed);
1226
1227        let steps = engine.store().list_steps(run.id).await.unwrap();
1228
1229        assert_eq!(steps.len(), 1);
1230        assert_eq!(steps[0].name, "create-issue");
1231        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1232        assert_eq!(
1233            steps[0].status.state,
1234            ironflow_store::models::StepStatus::Completed
1235        );
1236
1237        let output = steps[0].output.as_ref().unwrap();
1238        assert_eq!(output["issue_id"], 42);
1239        assert_eq!(output["project_id"], 123);
1240
1241        let input = steps[0].input.as_ref().unwrap();
1242        assert_eq!(input["project_id"], 123);
1243        assert_eq!(input["title"], "Bug report");
1244    }
1245
1246    #[tokio::test]
1247    async fn operation_step_failure_marks_run_failed() {
1248        let mut engine = create_test_engine();
1249        engine.register(FailingOperationWorkflow).unwrap();
1250
1251        let result = engine
1252            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1253            .await;
1254
1255        assert!(result.is_err());
1256    }
1257
1258    #[tokio::test]
1259    async fn operation_mixed_with_shell_steps() {
1260        let mut engine = create_test_engine();
1261        engine.register(MixedWorkflow).unwrap();
1262
1263        let run = engine
1264            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1265            .await
1266            .unwrap();
1267
1268        assert_eq!(run.status.state, RunStatus::Completed);
1269
1270        let steps = engine.store().list_steps(run.id).await.unwrap();
1271
1272        assert_eq!(steps.len(), 2);
1273        assert_eq!(steps[0].kind, StepKind::Shell);
1274        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1275        assert_eq!(steps[0].position, 0);
1276        assert_eq!(steps[1].position, 1);
1277    }
1278
1279    // -----------------------------------------------------------------------
1280    // Approval + resume tests
1281    // -----------------------------------------------------------------------
1282
1283    use crate::config::ApprovalConfig;
1284
1285    struct SingleApprovalWorkflow;
1286
1287    impl WorkflowHandler for SingleApprovalWorkflow {
1288        fn name(&self) -> &str {
1289            "single-approval"
1290        }
1291
1292        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1293            Box::pin(async move {
1294                ctx.shell("build", ShellConfig::new("echo built")).await?;
1295                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1296                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1297                    .await?;
1298                Ok(())
1299            })
1300        }
1301    }
1302
1303    struct DoubleApprovalWorkflow;
1304
1305    impl WorkflowHandler for DoubleApprovalWorkflow {
1306        fn name(&self) -> &str {
1307            "double-approval"
1308        }
1309
1310        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1311            Box::pin(async move {
1312                ctx.shell("build", ShellConfig::new("echo built")).await?;
1313                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1314                    .await?;
1315                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1316                    .await?;
1317                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1318                    .await?;
1319                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1320                    .await?;
1321                Ok(())
1322            })
1323        }
1324    }
1325
1326    #[tokio::test]
1327    async fn approval_pauses_run() {
1328        let mut engine = create_test_engine();
1329        engine.register(SingleApprovalWorkflow).unwrap();
1330
1331        let run = engine
1332            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1333            .await
1334            .unwrap();
1335
1336        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1337
1338        let steps = engine.store().list_steps(run.id).await.unwrap();
1339        assert_eq!(steps.len(), 2); // build + approval gate
1340        assert_eq!(steps[0].kind, StepKind::Shell);
1341        assert_eq!(steps[0].status.state, StepStatus::Completed);
1342        assert_eq!(steps[1].kind, StepKind::Approval);
1343        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1344    }
1345
1346    #[tokio::test]
1347    async fn approval_resume_completes_run() {
1348        let mut engine = create_test_engine();
1349        engine.register(SingleApprovalWorkflow).unwrap();
1350
1351        // First execution: pauses at approval
1352        let run = engine
1353            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1354            .await
1355            .unwrap();
1356        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1357
1358        // Simulate approval: transition to Running
1359        engine
1360            .store()
1361            .update_run_status(run.id, RunStatus::Running)
1362            .await
1363            .unwrap();
1364
1365        // Resume: replays build, skips approval, executes deploy
1366        let resumed = engine.resume_run(run.id).await.unwrap();
1367        assert_eq!(resumed.status.state, RunStatus::Completed);
1368
1369        let steps = engine.store().list_steps(run.id).await.unwrap();
1370        assert_eq!(steps.len(), 3); // build + approval + deploy
1371        assert_eq!(steps[0].name, "build");
1372        assert_eq!(steps[0].status.state, StepStatus::Completed);
1373        assert_eq!(steps[1].name, "gate");
1374        assert_eq!(steps[1].kind, StepKind::Approval);
1375        assert_eq!(steps[1].status.state, StepStatus::Completed);
1376        assert_eq!(steps[2].name, "deploy");
1377        assert_eq!(steps[2].status.state, StepStatus::Completed);
1378    }
1379
1380    #[tokio::test]
1381    async fn double_approval_two_resumes() {
1382        let mut engine = create_test_engine();
1383        engine.register(DoubleApprovalWorkflow).unwrap();
1384
1385        // First execution: pauses at staging-gate
1386        let run = engine
1387            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1388            .await
1389            .unwrap();
1390        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1391
1392        let steps = engine.store().list_steps(run.id).await.unwrap();
1393        assert_eq!(steps.len(), 2); // build + staging-gate
1394
1395        // First approval
1396        engine
1397            .store()
1398            .update_run_status(run.id, RunStatus::Running)
1399            .await
1400            .unwrap();
1401
1402        let resumed = engine.resume_run(run.id).await.unwrap();
1403        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1404
1405        let steps = engine.store().list_steps(run.id).await.unwrap();
1406        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1407
1408        // Second approval
1409        engine
1410            .store()
1411            .update_run_status(run.id, RunStatus::Running)
1412            .await
1413            .unwrap();
1414
1415        let final_run = engine.resume_run(run.id).await.unwrap();
1416        assert_eq!(final_run.status.state, RunStatus::Completed);
1417
1418        let steps = engine.store().list_steps(run.id).await.unwrap();
1419        assert_eq!(steps.len(), 5);
1420        assert_eq!(steps[0].name, "build");
1421        assert_eq!(steps[1].name, "staging-gate");
1422        assert_eq!(steps[2].name, "deploy-staging");
1423        assert_eq!(steps[3].name, "prod-gate");
1424        assert_eq!(steps[4].name, "deploy-prod");
1425
1426        for step in &steps {
1427            assert_eq!(step.status.state, StepStatus::Completed);
1428        }
1429    }
1430
1431    // -----------------------------------------------------------------------
1432    // fail_orphaned_steps tests
1433    // -----------------------------------------------------------------------
1434
1435    use ironflow_store::models::{NewStep, StepUpdate};
1436
1437    async fn create_step_with_status(
1438        store: &Arc<dyn Store>,
1439        run_id: Uuid,
1440        name: &str,
1441        position: u32,
1442        status: StepStatus,
1443    ) -> ironflow_store::models::Step {
1444        let step = store
1445            .create_step(NewStep {
1446                run_id,
1447                name: name.to_string(),
1448                kind: StepKind::Shell,
1449                position,
1450                input: None,
1451            })
1452            .await
1453            .unwrap();
1454
1455        match status {
1456            StepStatus::Pending => {}
1457            StepStatus::Running => {
1458                store
1459                    .update_step(
1460                        step.id,
1461                        StepUpdate {
1462                            status: Some(StepStatus::Running),
1463                            ..StepUpdate::default()
1464                        },
1465                    )
1466                    .await
1467                    .unwrap();
1468            }
1469            StepStatus::Completed => {
1470                store
1471                    .update_step(
1472                        step.id,
1473                        StepUpdate {
1474                            status: Some(StepStatus::Running),
1475                            ..StepUpdate::default()
1476                        },
1477                    )
1478                    .await
1479                    .unwrap();
1480                store
1481                    .update_step(
1482                        step.id,
1483                        StepUpdate {
1484                            status: Some(StepStatus::Completed),
1485                            ..StepUpdate::default()
1486                        },
1487                    )
1488                    .await
1489                    .unwrap();
1490            }
1491            StepStatus::AwaitingApproval => {
1492                store
1493                    .update_step(
1494                        step.id,
1495                        StepUpdate {
1496                            status: Some(StepStatus::Running),
1497                            ..StepUpdate::default()
1498                        },
1499                    )
1500                    .await
1501                    .unwrap();
1502                store
1503                    .update_step(
1504                        step.id,
1505                        StepUpdate {
1506                            status: Some(StepStatus::AwaitingApproval),
1507                            ..StepUpdate::default()
1508                        },
1509                    )
1510                    .await
1511                    .unwrap();
1512            }
1513            _ => panic!("unsupported status for test helper: {status}"),
1514        }
1515
1516        store.get_step(step.id).await.unwrap().unwrap()
1517    }
1518
1519    #[tokio::test]
1520    async fn fail_orphaned_steps_marks_running_as_failed() {
1521        let engine = create_test_engine();
1522        let run = engine
1523            .store()
1524            .create_run(NewRun {
1525                workflow_name: "test".to_string(),
1526                trigger: TriggerKind::Manual,
1527                payload: json!({}),
1528                max_retries: 0,
1529                handler_version: None,
1530                labels: HashMap::new(),
1531                scheduled_at: None,
1532            })
1533            .await
1534            .unwrap();
1535
1536        let step = create_step_with_status(
1537            engine.store(),
1538            run.id,
1539            "running-step",
1540            0,
1541            StepStatus::Running,
1542        )
1543        .await;
1544
1545        engine
1546            .fail_orphaned_steps(run.id, "parent run timed out")
1547            .await
1548            .unwrap();
1549
1550        let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1551        assert_eq!(updated.status.state, StepStatus::Failed);
1552        assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1553        assert!(updated.completed_at.is_some());
1554    }
1555
1556    #[tokio::test]
1557    async fn fail_orphaned_steps_marks_pending_as_skipped() {
1558        let engine = create_test_engine();
1559        let run = engine
1560            .store()
1561            .create_run(NewRun {
1562                workflow_name: "test".to_string(),
1563                trigger: TriggerKind::Manual,
1564                payload: json!({}),
1565                max_retries: 0,
1566                handler_version: None,
1567                labels: HashMap::new(),
1568                scheduled_at: None,
1569            })
1570            .await
1571            .unwrap();
1572
1573        let step = create_step_with_status(
1574            engine.store(),
1575            run.id,
1576            "pending-step",
1577            0,
1578            StepStatus::Pending,
1579        )
1580        .await;
1581
1582        engine
1583            .fail_orphaned_steps(run.id, "parent run timed out")
1584            .await
1585            .unwrap();
1586
1587        let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1588        assert_eq!(updated.status.state, StepStatus::Skipped);
1589        assert!(updated.error.is_none());
1590        assert!(updated.completed_at.is_some());
1591    }
1592
1593    #[tokio::test]
1594    async fn fail_orphaned_steps_marks_awaiting_approval_as_failed() {
1595        let engine = create_test_engine();
1596        let run = engine
1597            .store()
1598            .create_run(NewRun {
1599                workflow_name: "test".to_string(),
1600                trigger: TriggerKind::Manual,
1601                payload: json!({}),
1602                max_retries: 0,
1603                handler_version: None,
1604                labels: HashMap::new(),
1605                scheduled_at: None,
1606            })
1607            .await
1608            .unwrap();
1609
1610        let step = create_step_with_status(
1611            engine.store(),
1612            run.id,
1613            "approval-step",
1614            0,
1615            StepStatus::AwaitingApproval,
1616        )
1617        .await;
1618
1619        engine
1620            .fail_orphaned_steps(run.id, "parent run timed out")
1621            .await
1622            .unwrap();
1623
1624        let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1625        assert_eq!(updated.status.state, StepStatus::Failed);
1626        assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1627        assert!(updated.completed_at.is_some());
1628    }
1629
1630    #[tokio::test]
1631    async fn fail_orphaned_steps_skips_terminal_steps() {
1632        let engine = create_test_engine();
1633        let run = engine
1634            .store()
1635            .create_run(NewRun {
1636                workflow_name: "test".to_string(),
1637                trigger: TriggerKind::Manual,
1638                payload: json!({}),
1639                max_retries: 0,
1640                handler_version: None,
1641                labels: HashMap::new(),
1642                scheduled_at: None,
1643            })
1644            .await
1645            .unwrap();
1646
1647        let completed_step =
1648            create_step_with_status(engine.store(), run.id, "done", 0, StepStatus::Completed).await;
1649        let running_step =
1650            create_step_with_status(engine.store(), run.id, "in-flight", 1, StepStatus::Running)
1651                .await;
1652
1653        engine
1654            .fail_orphaned_steps(run.id, "parent run timed out")
1655            .await
1656            .unwrap();
1657
1658        let completed = engine
1659            .store()
1660            .get_step(completed_step.id)
1661            .await
1662            .unwrap()
1663            .unwrap();
1664        assert_eq!(completed.status.state, StepStatus::Completed);
1665
1666        let failed = engine
1667            .store()
1668            .get_step(running_step.id)
1669            .await
1670            .unwrap()
1671            .unwrap();
1672        assert_eq!(failed.status.state, StepStatus::Failed);
1673    }
1674
1675    #[tokio::test]
1676    async fn fail_orphaned_steps_mixed_states() {
1677        let engine = create_test_engine();
1678        let run = engine
1679            .store()
1680            .create_run(NewRun {
1681                workflow_name: "test".to_string(),
1682                trigger: TriggerKind::Manual,
1683                payload: json!({}),
1684                max_retries: 0,
1685                handler_version: None,
1686                labels: HashMap::new(),
1687                scheduled_at: None,
1688            })
1689            .await
1690            .unwrap();
1691
1692        let s_completed =
1693            create_step_with_status(engine.store(), run.id, "step-1", 0, StepStatus::Completed)
1694                .await;
1695        let s_running =
1696            create_step_with_status(engine.store(), run.id, "step-2", 1, StepStatus::Running).await;
1697        let s_pending =
1698            create_step_with_status(engine.store(), run.id, "step-3", 2, StepStatus::Pending).await;
1699
1700        engine.fail_orphaned_steps(run.id, "timeout").await.unwrap();
1701
1702        let r_completed = engine
1703            .store()
1704            .get_step(s_completed.id)
1705            .await
1706            .unwrap()
1707            .unwrap();
1708        assert_eq!(r_completed.status.state, StepStatus::Completed);
1709
1710        let r_running = engine
1711            .store()
1712            .get_step(s_running.id)
1713            .await
1714            .unwrap()
1715            .unwrap();
1716        assert_eq!(r_running.status.state, StepStatus::Failed);
1717        assert_eq!(r_running.error.as_deref(), Some("timeout"));
1718
1719        let r_pending = engine
1720            .store()
1721            .get_step(s_pending.id)
1722            .await
1723            .unwrap()
1724            .unwrap();
1725        assert_eq!(r_pending.status.state, StepStatus::Skipped);
1726        assert!(r_pending.error.is_none());
1727    }
1728
1729    #[tokio::test]
1730    async fn fail_orphaned_steps_no_steps_is_noop() {
1731        let engine = create_test_engine();
1732        let run = engine
1733            .store()
1734            .create_run(NewRun {
1735                workflow_name: "test".to_string(),
1736                trigger: TriggerKind::Manual,
1737                payload: json!({}),
1738                max_retries: 0,
1739                handler_version: None,
1740                labels: HashMap::new(),
1741                scheduled_at: None,
1742            })
1743            .await
1744            .unwrap();
1745
1746        let result = engine.fail_orphaned_steps(run.id, "timeout").await;
1747        assert!(result.is_ok());
1748    }
1749
1750    #[tokio::test]
1751    async fn fail_orphaned_steps_preserves_existing_error() {
1752        let engine = create_test_engine();
1753        let run = engine
1754            .store()
1755            .create_run(NewRun {
1756                workflow_name: "test".to_string(),
1757                trigger: TriggerKind::Manual,
1758                payload: json!({}),
1759                max_retries: 0,
1760                handler_version: None,
1761                labels: HashMap::new(),
1762                scheduled_at: None,
1763            })
1764            .await
1765            .unwrap();
1766
1767        let step_with_error = create_step_with_status(
1768            engine.store(),
1769            run.id,
1770            "already-errored",
1771            0,
1772            StepStatus::Running,
1773        )
1774        .await;
1775
1776        engine
1777            .store()
1778            .update_step(
1779                step_with_error.id,
1780                StepUpdate {
1781                    error: Some("real error from provider".to_string()),
1782                    ..StepUpdate::default()
1783                },
1784            )
1785            .await
1786            .unwrap();
1787
1788        let step_no_error = create_step_with_status(
1789            engine.store(),
1790            run.id,
1791            "no-error-yet",
1792            1,
1793            StepStatus::Running,
1794        )
1795        .await;
1796
1797        engine
1798            .fail_orphaned_steps(run.id, "parent run failed")
1799            .await
1800            .unwrap();
1801
1802        let updated_with = engine
1803            .store()
1804            .get_step(step_with_error.id)
1805            .await
1806            .unwrap()
1807            .unwrap();
1808        assert_eq!(updated_with.status.state, StepStatus::Failed);
1809        assert_eq!(
1810            updated_with.error.as_deref(),
1811            Some("real error from provider"),
1812        );
1813
1814        let updated_without = engine
1815            .store()
1816            .get_step(step_no_error.id)
1817            .await
1818            .unwrap()
1819            .unwrap();
1820        assert_eq!(updated_without.status.state, StepStatus::Failed);
1821        assert_eq!(updated_without.error.as_deref(), Some("parent run failed"),);
1822    }
1823}