Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info, warn};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{
24    NewRun, Run, RunStatus, RunUpdate, StepStatus, StepUpdate, TriggerKind,
25};
26use ironflow_store::store::Store;
27#[cfg(feature = "prometheus")]
28use metrics::{counter, gauge, histogram};
29
30use crate::context::WorkflowContext;
31use crate::error::EngineError;
32use crate::handler::{WorkflowHandler, WorkflowInfo};
33use crate::log_sender::LogSender;
34use crate::notify::{Event, EventPublisher, EventSubscriber};
35
36/// The workflow orchestration engine.
37///
38/// Holds references to the store, agent provider, and a registry of
39/// [`WorkflowHandler`]s.
40///
41/// # Examples
42///
43/// ```no_run
44/// use std::sync::Arc;
45/// use ironflow_engine::engine::Engine;
46/// use ironflow_engine::config::ShellConfig;
47/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
48/// use ironflow_engine::context::WorkflowContext;
49/// use ironflow_store::memory::InMemoryStore;
50/// use ironflow_store::models::TriggerKind;
51/// use ironflow_core::providers::claude::ClaudeCodeProvider;
52/// use serde_json::json;
53///
54/// struct CiWorkflow;
55/// impl WorkflowHandler for CiWorkflow {
56///     fn name(&self) -> &str { "ci" }
57///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
58///         Box::pin(async move {
59///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
60///             Ok(())
61///         })
62///     }
63/// }
64///
65/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
66/// let store = Arc::new(InMemoryStore::new());
67/// let provider = Arc::new(ClaudeCodeProvider::new());
68/// let mut engine = Engine::new(store, provider);
69/// engine.register(CiWorkflow)?;
70///
71/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
72/// println!("Run {} completed with status {:?}", run.id, run.status);
73/// # Ok(())
74/// # }
75/// ```
pub struct Engine {
    /// Persistence backend used to create, read and update runs and steps.
    store: Arc<dyn Store>,
    /// Agent provider handed to every [`WorkflowContext`] the engine builds.
    provider: Arc<dyn AgentProvider>,
    /// Registered handlers, keyed by [`WorkflowHandler::name`].
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher through which run status events reach subscribers.
    event_publisher: EventPublisher,
    /// Optional sink for real-time step output; `None` until
    /// [`Engine::set_log_sender`] is called.
    log_sender: Option<LogSender>,
}
83
84/// Validate a workflow category path.
85///
86/// A category is a `/`-separated list of non-empty segments. This function
87/// rejects empty paths, leading or trailing `/`, consecutive `/`, and
88/// segments containing only whitespace.
89///
90/// # Errors
91///
92/// Returns [`EngineError::InvalidWorkflow`] when the category is malformed.
93fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
94    let reject = |reason: &str| {
95        Err(EngineError::InvalidWorkflow(format!(
96            "handler '{handler_name}' has invalid category '{category}': {reason}"
97        )))
98    };
99
100    if category.is_empty() {
101        return reject("empty category");
102    }
103    if category.starts_with('/') {
104        return reject("leading '/'");
105    }
106    if category.ends_with('/') {
107        return reject("trailing '/'");
108    }
109    for segment in category.split('/') {
110        if segment.is_empty() {
111            return reject("empty segment (double '/')");
112        }
113        if segment.trim().is_empty() {
114            return reject("whitespace-only segment");
115        }
116    }
117    Ok(())
118}
119
120impl Engine {
121    /// Create a new engine with the given store and agent provider.
122    ///
123    /// # Examples
124    ///
125    /// ```no_run
126    /// use std::sync::Arc;
127    /// use ironflow_engine::engine::Engine;
128    /// use ironflow_store::memory::InMemoryStore;
129    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
130    ///
131    /// let engine = Engine::new(
132    ///     Arc::new(InMemoryStore::new()),
133    ///     Arc::new(ClaudeCodeProvider::new()),
134    /// );
135    /// ```
136    pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
137        Self {
138            store,
139            provider,
140            handlers: HashMap::new(),
141            event_publisher: EventPublisher::new(),
142            log_sender: None,
143        }
144    }
145
    /// Attach a log sender for real-time step output streaming.
    ///
    /// When set, all workflow contexts created by this engine will forward
    /// step output (shell stdout/stderr, agent system messages) to the
    /// given sender.
    ///
    /// Calling this again replaces the previously attached sender. Each
    /// context built afterwards is handed its own clone of the sender.
    pub fn set_log_sender(&mut self, sender: LogSender) {
        self.log_sender = Some(sender);
    }
154
    /// Returns a reference to the backing store.
    ///
    /// The store lives behind an [`Arc`]; clone the returned handle to keep
    /// using the store beyond the lifetime of the engine borrow.
    pub fn store(&self) -> &Arc<dyn Store> {
        &self.store
    }
159
    /// Returns a reference to the agent provider.
    ///
    /// The provider lives behind an [`Arc`]; clone the returned handle to
    /// keep using it beyond the lifetime of the engine borrow.
    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
        &self.provider
    }
164
165    /// Build a [`WorkflowContext`] with access to the handler registry.
166    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
167        let handlers = self.handlers.clone();
168        let resolver: crate::context::HandlerResolver =
169            Arc::new(move |name: &str| handlers.get(name).cloned());
170        let mut ctx = WorkflowContext::with_handler_resolver(
171            run_id,
172            self.store.clone(),
173            self.provider.clone(),
174            resolver,
175        );
176        if let Some(ref sender) = self.log_sender {
177            ctx.set_log_sender(sender.clone());
178        }
179        ctx
180    }
181
182    // -----------------------------------------------------------------------
183    // Handler registration
184    // -----------------------------------------------------------------------
185
186    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
187    ///
188    /// The handler is looked up by [`WorkflowHandler::name`] when executing
189    /// or enqueuing.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
194    /// name is already registered.
195    ///
196    /// # Examples
197    ///
198    /// ```no_run
199    /// use std::sync::Arc;
200    /// use ironflow_engine::engine::Engine;
201    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
202    /// use ironflow_engine::context::WorkflowContext;
203    /// use ironflow_engine::config::ShellConfig;
204    /// use ironflow_store::memory::InMemoryStore;
205    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
206    ///
207    /// struct MyWorkflow;
208    /// impl WorkflowHandler for MyWorkflow {
209    ///     fn name(&self) -> &str { "my-workflow" }
210    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
211    ///         Box::pin(async move {
212    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
213    ///             Ok(())
214    ///         })
215    ///     }
216    /// }
217    ///
218    /// let mut engine = Engine::new(
219    ///     Arc::new(InMemoryStore::new()),
220    ///     Arc::new(ClaudeCodeProvider::new()),
221    /// );
222    /// engine.register(MyWorkflow)?;
223    /// # Ok::<(), ironflow_engine::error::EngineError>(())
224    /// ```
225    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
226        let name = handler.name().to_string();
227        if self.handlers.contains_key(&name) {
228            return Err(EngineError::InvalidWorkflow(format!(
229                "handler '{}' already registered",
230                name
231            )));
232        }
233        if let Some(category) = handler.category() {
234            validate_category(&name, category)?;
235        }
236        self.handlers.insert(name, Arc::new(handler));
237        Ok(())
238    }
239
240    /// Register a pre-boxed workflow handler.
241    ///
242    /// # Errors
243    ///
244    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
245    /// name is already registered or if its category is invalid.
246    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
247        let name = handler.name().to_string();
248        if self.handlers.contains_key(&name) {
249            return Err(EngineError::InvalidWorkflow(format!(
250                "handler '{}' already registered",
251                name
252            )));
253        }
254        if let Some(category) = handler.category() {
255            validate_category(&name, category)?;
256        }
257        self.handlers.insert(name, Arc::from(handler));
258        Ok(())
259    }
260
    /// Get a registered handler by name.
    ///
    /// Returns `None` when no handler was registered under `name`. The
    /// lookup is an exact match on the registry key.
    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
        self.handlers.get(name)
    }
265
266    /// List registered handler names.
267    pub fn handler_names(&self) -> Vec<&str> {
268        self.handlers.keys().map(|s| s.as_str()).collect()
269    }
270
271    /// Get detailed info about a registered workflow handler.
272    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
273        self.handlers.get(name).map(|h| h.describe())
274    }
275
276    /// Register an event subscriber for domain events.
277    ///
278    /// The subscriber is called only for events whose type is in
279    /// `event_types`. Pass [`Event::ALL`] to receive every event.
280    ///
281    /// # Examples
282    ///
283    /// ```no_run
284    /// use ironflow_engine::engine::Engine;
285    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
286    /// use ironflow_store::memory::InMemoryStore;
287    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
288    /// use std::sync::Arc;
289    ///
290    /// let mut engine = Engine::new(
291    ///     Arc::new(InMemoryStore::new()),
292    ///     Arc::new(ClaudeCodeProvider::new()),
293    /// );
294    ///
295    /// engine.subscribe(
296    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
297    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
298    /// );
299    /// ```
    pub fn subscribe(
        &mut self,
        subscriber: impl EventSubscriber + 'static,
        event_types: &[&'static str],
    ) {
        // The engine keeps no subscriber list of its own: the subscriber and
        // its event-type list are handed straight to the publisher.
        self.event_publisher.subscribe(subscriber, event_types);
    }
307
    /// Returns a reference to the event publisher.
    ///
    /// Useful for publishing events from outside the engine (e.g. auth
    /// routes in the API layer).
    ///
    /// This is the same publisher the engine uses for its own run status
    /// events.
    pub fn event_publisher(&self) -> &EventPublisher {
        &self.event_publisher
    }
315
316    // -----------------------------------------------------------------------
317    // Dynamic workflow execution (WorkflowHandler)
318    // -----------------------------------------------------------------------
319
320    /// Execute a registered handler inline.
321    ///
322    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
323    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
324    ///
325    /// # Errors
326    ///
327    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
328    /// with that name. Returns [`EngineError`] if execution fails.
329    ///
330    /// # Examples
331    ///
332    /// ```no_run
333    /// use std::sync::Arc;
334    /// use ironflow_engine::engine::Engine;
335    /// use ironflow_store::memory::InMemoryStore;
336    /// use ironflow_store::models::TriggerKind;
337    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
338    /// use serde_json::json;
339    ///
340    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
341    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
342    /// # Ok(())
343    /// # }
344    /// ```
345    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
346    pub async fn run_handler(
347        &self,
348        handler_name: &str,
349        trigger: TriggerKind,
350        payload: Value,
351    ) -> Result<Run, EngineError> {
352        let handler = self
353            .handlers
354            .get(handler_name)
355            .ok_or_else(|| {
356                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
357            })?
358            .clone();
359
360        let handler_version = handler.version().map(str::to_string);
361        let run = self
362            .store
363            .create_run(NewRun {
364                workflow_name: handler_name.to_string(),
365                trigger,
366                payload,
367                max_retries: 0,
368                handler_version,
369                labels: handler.default_labels(),
370                scheduled_at: None,
371            })
372            .await?;
373
374        let run_id = run.id;
375        info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
376
377        self.store
378            .update_run_status(run_id, RunStatus::Running)
379            .await?;
380
381        #[cfg(feature = "prometheus")]
382        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
383
384        let run_start = Instant::now();
385        let mut ctx = self.build_context(run_id);
386
387        let result = handler.execute(&mut ctx).await;
388        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
389            .await
390    }
391
392    /// Enqueue a handler-based workflow for worker execution.
393    ///
394    /// The workflow name is stored in the run. The worker looks up the
395    /// handler by name when executing.
396    ///
397    /// # Errors
398    ///
399    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
400    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
401    pub async fn enqueue_handler(
402        &self,
403        handler_name: &str,
404        trigger: TriggerKind,
405        payload: Value,
406        max_retries: u32,
407    ) -> Result<Run, EngineError> {
408        self.enqueue_handler_with_options(
409            handler_name,
410            trigger,
411            payload,
412            max_retries,
413            HashMap::new(),
414            None,
415        )
416        .await
417    }
418
419    /// Enqueue a handler-based workflow with labels and optional deferred scheduling.
420    ///
421    /// # Errors
422    ///
423    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
424    #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
425    pub async fn enqueue_handler_with_options(
426        &self,
427        handler_name: &str,
428        trigger: TriggerKind,
429        payload: Value,
430        max_retries: u32,
431        labels: HashMap<String, String>,
432        scheduled_at: Option<DateTime<Utc>>,
433    ) -> Result<Run, EngineError> {
434        let handler = self.handlers.get(handler_name).ok_or_else(|| {
435            EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
436        })?;
437
438        let handler_version = handler.version().map(str::to_string);
439        let mut merged_labels = handler.default_labels();
440        merged_labels.extend(labels);
441
442        let run = self
443            .store
444            .create_run(NewRun {
445                workflow_name: handler_name.to_string(),
446                trigger,
447                payload,
448                max_retries,
449                handler_version,
450                labels: merged_labels,
451                scheduled_at,
452            })
453            .await?;
454
455        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
456        Ok(run)
457    }
458
459    /// Execute a handler-based run (used by the worker after pick_next_pending).
460    ///
461    /// Looks up the handler by the run's `workflow_name` and executes it
462    /// with a fresh [`WorkflowContext`].
463    ///
464    /// # Errors
465    ///
466    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
467    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
468    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
469        let run = self
470            .store
471            .get_run(run_id)
472            .await?
473            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
474
475        let handler = self
476            .handlers
477            .get(&run.workflow_name)
478            .ok_or_else(|| {
479                EngineError::InvalidWorkflow(format!(
480                    "no handler registered: {}",
481                    run.workflow_name
482                ))
483            })?
484            .clone();
485
486        #[cfg(feature = "prometheus")]
487        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
488
489        let run_start = Instant::now();
490        let mut ctx = self.build_context(run_id);
491
492        let result = handler.execute(&mut ctx).await;
493        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
494            .await
495    }
496
497    /// Execute a run by its ID (used by the worker after pick_next_pending).
498    ///
499    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
500    ///
501    /// # Errors
502    ///
503    /// Returns [`EngineError`] if the run is not found or execution fails.
    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        // Pure alias: lookup, execution and finalization all happen in
        // `execute_handler_run`.
        self.execute_handler_run(run_id).await
    }
508
509    /// Resume a run after human approval.
510    ///
511    /// Re-executes the handler with step replay: completed steps return
512    /// cached output, approved approval steps are skipped, and execution
513    /// continues from the first unexecuted step.
514    ///
515    /// Supports multiple approval gates -- each resume replays all prior
516    /// steps and stops at the next approval (or completes the run).
517    ///
518    /// # Errors
519    ///
520    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
521    /// Returns [`EngineError`] if execution fails or hits another approval.
522    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
523    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
524        let run = self
525            .store
526            .get_run(run_id)
527            .await?
528            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
529
530        let handler = self
531            .handlers
532            .get(&run.workflow_name)
533            .ok_or_else(|| {
534                EngineError::InvalidWorkflow(format!(
535                    "no handler registered: {}",
536                    run.workflow_name
537                ))
538            })?
539            .clone();
540
541        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
542
543        let run_start = Instant::now();
544        let mut ctx = self.build_context(run_id);
545        ctx.load_replay_steps().await?;
546
547        let result = handler.execute(&mut ctx).await;
548        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
549            .await
550    }
551
552    /// Fail all non-terminal steps for a run.
553    ///
554    /// Called after a run is marked as failed (timeout, error, panic) to clean up
555    /// orphaned steps that are still in `Running`, `Pending`, or `AwaitingApproval`.
556    ///
557    /// - `Running` / `AwaitingApproval` steps are marked `Failed`.
558    /// - `Pending` steps are marked `Skipped` (FSM does not allow Pending -> Failed).
559    ///
560    /// Errors from individual step updates are logged but do not abort the cleanup.
561    ///
562    /// # Errors
563    ///
564    /// Returns [`EngineError`] if listing steps fails.
565    pub async fn fail_orphaned_steps(
566        &self,
567        run_id: Uuid,
568        error_message: &str,
569    ) -> Result<(), EngineError> {
570        let steps = self.store.list_steps(run_id).await?;
571        let now = Utc::now();
572
573        for step in steps {
574            if step.status.state.is_terminal() {
575                continue;
576            }
577
578            let (target_status, error) = match step.status.state {
579                StepStatus::Running | StepStatus::AwaitingApproval => {
580                    (StepStatus::Failed, Some(error_message.to_string()))
581                }
582                StepStatus::Pending => (StepStatus::Skipped, None),
583                _ => continue,
584            };
585
586            if let Err(e) = self
587                .store
588                .update_step(
589                    step.id,
590                    StepUpdate {
591                        status: Some(target_status),
592                        error,
593                        completed_at: Some(now),
594                        ..StepUpdate::default()
595                    },
596                )
597                .await
598            {
599                warn!(
600                    run_id = %run_id,
601                    step_id = %step.id,
602                    step_name = %step.name,
603                    error = %e,
604                    "failed to cleanup orphaned step"
605                );
606            } else {
607                info!(
608                    run_id = %run_id,
609                    step_id = %step.id,
610                    step_name = %step.name,
611                    from = %step.status.state,
612                    to = %target_status,
613                    "cleaned up orphaned step"
614                );
615            }
616        }
617
618        Ok(())
619    }
620
621    /// Finalize a run with the given result and context.
622    ///
623    /// On success: updates run to Completed with cost, duration, and completed_at.
624    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
625    /// Always: fetches and returns the final Run.
626    async fn finalize_run(
627        &self,
628        run_id: Uuid,
629        workflow_name: &str,
630        result: Result<(), EngineError>,
631        ctx: &WorkflowContext,
632        run_start: Instant,
633    ) -> Result<Run, EngineError> {
634        let total_duration = run_start.elapsed().as_millis() as u64;
635        let completed_at = Utc::now();
636
637        let final_status;
638        let final_run;
639
640        match result {
641            Ok(()) => {
642                final_status = RunStatus::Completed;
643                final_run = self
644                    .store
645                    .update_run_returning(
646                        run_id,
647                        RunUpdate {
648                            status: Some(RunStatus::Completed),
649                            cost_usd: Some(ctx.total_cost_usd()),
650                            duration_ms: Some(total_duration),
651                            completed_at: Some(completed_at),
652                            ..RunUpdate::default()
653                        },
654                    )
655                    .await?;
656
657                info!(
658                    run_id = %run_id,
659                    cost_usd = %ctx.total_cost_usd(),
660                    duration_ms = total_duration,
661                    "run completed"
662                );
663            }
664            Err(EngineError::ApprovalRequired {
665                run_id: approval_run_id,
666                step_id,
667                ref message,
668            }) => {
669                final_status = RunStatus::AwaitingApproval;
670                final_run = self
671                    .store
672                    .update_run_returning(
673                        run_id,
674                        RunUpdate {
675                            status: Some(RunStatus::AwaitingApproval),
676                            cost_usd: Some(ctx.total_cost_usd()),
677                            duration_ms: Some(total_duration),
678                            ..RunUpdate::default()
679                        },
680                    )
681                    .await?;
682
683                info!(
684                    run_id = %approval_run_id,
685                    step_id = %step_id,
686                    message = %message,
687                    "run awaiting approval"
688                );
689            }
690            Err(err) => {
691                final_status = RunStatus::Failed;
692                if let Err(store_err) = self
693                    .store
694                    .update_run(
695                        run_id,
696                        RunUpdate {
697                            status: Some(RunStatus::Failed),
698                            error: Some(err.to_string()),
699                            cost_usd: Some(ctx.total_cost_usd()),
700                            duration_ms: Some(total_duration),
701                            completed_at: Some(completed_at),
702                            ..RunUpdate::default()
703                        },
704                    )
705                    .await
706                {
707                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
708                }
709
710                error!(run_id = %run_id, error = %err, "run failed");
711
712                self.publish_run_status_changed(
713                    workflow_name,
714                    run_id,
715                    final_status,
716                    Some(err.to_string()),
717                    ctx,
718                    total_duration,
719                );
720
721                #[cfg(feature = "prometheus")]
722                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
723
724                return Err(err);
725            }
726        }
727
728        self.publish_run_status_changed(
729            workflow_name,
730            run_id,
731            final_status,
732            None,
733            ctx,
734            total_duration,
735        );
736
737        #[cfg(feature = "prometheus")]
738        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
739
740        Ok(final_run)
741    }
742
743    /// Emit Prometheus metrics for a completed run.
744    #[cfg(feature = "prometheus")]
745    fn emit_run_metrics(
746        &self,
747        workflow_name: &str,
748        status: RunStatus,
749        duration_ms: u64,
750        ctx: &WorkflowContext,
751    ) {
752        let status_str = status.to_string();
753        let wf = workflow_name.to_string();
754
755        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
756        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
757            .record(duration_ms as f64 / 1000.0);
758        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
759            ctx.total_cost_usd()
760                .to_string()
761                .parse::<f64>()
762                .unwrap_or(0.0),
763        );
764        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
765    }
766
767    /// Publish a run status changed event to all registered subscribers.
768    ///
769    /// `from` is always `Running` because `finalize_run` is only called
770    /// from a running state.
771    fn publish_run_status_changed(
772        &self,
773        workflow_name: &str,
774        run_id: Uuid,
775        to: RunStatus,
776        error: Option<String>,
777        ctx: &WorkflowContext,
778        duration_ms: u64,
779    ) {
780        let now = Utc::now();
781        let cost_usd = ctx.total_cost_usd();
782        let wf = workflow_name.to_string();
783
784        self.event_publisher.publish(Event::RunStatusChanged {
785            run_id,
786            workflow_name: wf.clone(),
787            from: RunStatus::Running,
788            to,
789            error: error.clone(),
790            cost_usd,
791            duration_ms,
792            at: now,
793        });
794
795        if to == RunStatus::Failed {
796            self.event_publisher.publish(Event::RunFailed {
797                run_id,
798                workflow_name: wf,
799                error,
800                cost_usd,
801                duration_ms,
802                at: now,
803            });
804        }
805    }
806}
807
808impl fmt::Debug for Engine {
809    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
810        f.debug_struct("Engine")
811            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
812            .finish_non_exhaustive()
813    }
814}
815
816#[cfg(test)]
817mod tests {
818    use super::*;
819    use crate::config::ShellConfig;
820    use crate::handler::{HandlerFuture, WorkflowHandler};
821    use ironflow_core::providers::claude::ClaudeCodeProvider;
822    use ironflow_core::providers::record_replay::RecordReplayProvider;
823    use ironflow_store::memory::InMemoryStore;
824    use ironflow_store::models::StepStatus;
825    use serde_json::json;
826
827    // Test handler that echoes a message via shell
828    struct EchoWorkflow;
829
830    impl WorkflowHandler for EchoWorkflow {
831        fn name(&self) -> &str {
832            "echo-workflow"
833        }
834
835        fn describe(&self) -> WorkflowInfo {
836            WorkflowInfo {
837                description: "A simple workflow that echoes hello".to_string(),
838                source_code: None,
839                sub_workflows: Vec::new(),
840                category: None,
841                version: self.version().map(str::to_string),
842                input_schema: None,
843                default_labels: HashMap::new(),
844            }
845        }
846
847        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
848            Box::pin(async move {
849                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
850                Ok(())
851            })
852        }
853    }
854
855    // Test handler that fails
856    struct FailingWorkflow;
857
858    impl WorkflowHandler for FailingWorkflow {
859        fn name(&self) -> &str {
860            "failing-workflow"
861        }
862
863        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
864            Box::pin(async move {
865                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
866                Ok(())
867            })
868        }
869    }
870
871    fn create_test_engine() -> Engine {
872        let store = Arc::new(InMemoryStore::new());
873        let inner = ClaudeCodeProvider::new();
874        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
875            inner,
876            "/tmp/ironflow-fixtures",
877        ));
878        Engine::new(store, provider)
879    }
880
881    #[test]
882    fn engine_new_creates_instance() {
883        let engine = create_test_engine();
884        assert_eq!(engine.handler_names().len(), 0);
885    }
886
887    #[test]
888    fn engine_register_handler() {
889        let mut engine = create_test_engine();
890        let result = engine.register(EchoWorkflow);
891        assert!(result.is_ok());
892        assert_eq!(engine.handler_names().len(), 1);
893        assert!(engine.handler_names().contains(&"echo-workflow"));
894    }
895
896    #[test]
897    fn engine_register_duplicate_returns_error() {
898        let mut engine = create_test_engine();
899        engine.register(EchoWorkflow).unwrap();
900        let result = engine.register(EchoWorkflow);
901        assert!(result.is_err());
902    }
903
904    #[test]
905    fn engine_get_handler_found() {
906        let mut engine = create_test_engine();
907        engine.register(EchoWorkflow).unwrap();
908        let handler = engine.get_handler("echo-workflow");
909        assert!(handler.is_some());
910    }
911
912    #[test]
913    fn engine_get_handler_not_found() {
914        let engine = create_test_engine();
915        let handler = engine.get_handler("nonexistent");
916        assert!(handler.is_none());
917    }
918
919    #[test]
920    fn engine_handler_names_lists_all() {
921        let mut engine = create_test_engine();
922        engine.register(EchoWorkflow).unwrap();
923        engine.register(FailingWorkflow).unwrap();
924        let names = engine.handler_names();
925        assert_eq!(names.len(), 2);
926        assert!(names.contains(&"echo-workflow"));
927        assert!(names.contains(&"failing-workflow"));
928    }
929
930    #[test]
931    fn engine_handler_info_returns_description() {
932        let mut engine = create_test_engine();
933        engine.register(EchoWorkflow).unwrap();
934        let info = engine.handler_info("echo-workflow");
935        assert!(info.is_some());
936        let info = info.unwrap();
937        assert_eq!(info.description, "A simple workflow that echoes hello");
938    }
939
940    struct CategorizedWorkflow;
941
942    impl WorkflowHandler for CategorizedWorkflow {
943        fn name(&self) -> &str {
944            "categorized"
945        }
946        fn category(&self) -> Option<&str> {
947            Some("data/etl")
948        }
949        fn execute<'a>(
950            &'a self,
951            _ctx: &'a mut WorkflowContext,
952        ) -> crate::handler::HandlerFuture<'a> {
953            Box::pin(async move { Ok(()) })
954        }
955    }
956
957    #[test]
958    fn engine_default_describe_propagates_category() {
959        let mut engine = create_test_engine();
960        engine.register(CategorizedWorkflow).unwrap();
961        let info = engine.handler_info("categorized").unwrap();
962        assert_eq!(info.category.as_deref(), Some("data/etl"));
963    }
964
965    #[test]
966    fn engine_default_describe_without_category() {
967        let mut engine = create_test_engine();
968        engine.register(EchoWorkflow).unwrap();
969        let info = engine.handler_info("echo-workflow").unwrap();
970        assert!(info.category.is_none());
971    }
972
973    struct BadCategoryWorkflow(&'static str);
974
975    impl WorkflowHandler for BadCategoryWorkflow {
976        fn name(&self) -> &str {
977            "bad-category"
978        }
979        fn category(&self) -> Option<&str> {
980            Some(self.0)
981        }
982        fn execute<'a>(
983            &'a self,
984            _ctx: &'a mut WorkflowContext,
985        ) -> crate::handler::HandlerFuture<'a> {
986            Box::pin(async move { Ok(()) })
987        }
988    }
989
990    #[test]
991    fn engine_register_rejects_empty_category() {
992        let mut engine = create_test_engine();
993        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
994        match err {
995            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
996            other => panic!("expected InvalidWorkflow, got {other:?}"),
997        }
998    }
999
1000    #[test]
1001    fn engine_register_rejects_leading_slash_category() {
1002        let mut engine = create_test_engine();
1003        let err = engine
1004            .register(BadCategoryWorkflow("/data/etl"))
1005            .unwrap_err();
1006        match err {
1007            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
1008            other => panic!("expected InvalidWorkflow, got {other:?}"),
1009        }
1010    }
1011
1012    #[test]
1013    fn engine_register_rejects_trailing_slash_category() {
1014        let mut engine = create_test_engine();
1015        let err = engine
1016            .register(BadCategoryWorkflow("data/etl/"))
1017            .unwrap_err();
1018        match err {
1019            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
1020            other => panic!("expected InvalidWorkflow, got {other:?}"),
1021        }
1022    }
1023
1024    #[test]
1025    fn engine_register_rejects_double_slash_category() {
1026        let mut engine = create_test_engine();
1027        let err = engine
1028            .register(BadCategoryWorkflow("data//etl"))
1029            .unwrap_err();
1030        match err {
1031            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
1032            other => panic!("expected InvalidWorkflow, got {other:?}"),
1033        }
1034    }
1035
1036    #[test]
1037    fn engine_register_rejects_whitespace_only_segment_category() {
1038        let mut engine = create_test_engine();
1039        let err = engine
1040            .register(BadCategoryWorkflow("data/ /etl"))
1041            .unwrap_err();
1042        match err {
1043            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
1044            other => panic!("expected InvalidWorkflow, got {other:?}"),
1045        }
1046    }
1047
1048    #[test]
1049    fn engine_register_accepts_valid_nested_category() {
1050        let mut engine = create_test_engine();
1051        assert!(engine.register(CategorizedWorkflow).is_ok());
1052    }
1053
1054    #[tokio::test]
1055    async fn engine_unknown_workflow_returns_error() {
1056        let engine = create_test_engine();
1057        let result = engine
1058            .run_handler("unknown", TriggerKind::Manual, json!({}))
1059            .await;
1060        assert!(result.is_err());
1061        match result {
1062            Err(EngineError::InvalidWorkflow(msg)) => {
1063                assert!(msg.contains("no handler registered"));
1064            }
1065            _ => panic!("expected InvalidWorkflow error"),
1066        }
1067    }
1068
1069    #[tokio::test]
1070    async fn engine_enqueue_handler_creates_pending_run() {
1071        let mut engine = create_test_engine();
1072        engine.register(EchoWorkflow).unwrap();
1073
1074        let run = engine
1075            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
1076            .await
1077            .unwrap();
1078        assert_eq!(run.status.state, RunStatus::Pending);
1079        assert_eq!(run.workflow_name, "echo-workflow");
1080    }
1081
1082    #[tokio::test]
1083    async fn engine_register_boxed() {
1084        let mut engine = create_test_engine();
1085        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
1086        let result = engine.register_boxed(handler);
1087        assert!(result.is_ok());
1088        assert_eq!(engine.handler_names().len(), 1);
1089    }
1090
1091    #[tokio::test]
1092    async fn engine_store_and_provider_accessors() {
1093        let store = Arc::new(InMemoryStore::new());
1094        let inner = ClaudeCodeProvider::new();
1095        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
1096            inner,
1097            "/tmp/ironflow-fixtures",
1098        ));
1099        let engine = Engine::new(store.clone(), provider.clone());
1100
1101        // Verify accessors return references
1102        let _ = engine.store();
1103        let _ = engine.provider();
1104    }
1105
1106    // -----------------------------------------------------------------------
1107    // Operation trait tests
1108    // -----------------------------------------------------------------------
1109
1110    use crate::operation::Operation;
1111    use ironflow_store::models::StepKind;
1112    use std::future::Future;
1113    use std::pin::Pin;
1114
1115    struct FakeGitlabOp {
1116        project_id: u64,
1117        title: String,
1118    }
1119
1120    impl Operation for FakeGitlabOp {
1121        fn kind(&self) -> &str {
1122            "gitlab"
1123        }
1124
1125        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1126            Box::pin(async move {
1127                Ok(json!({
1128                    "issue_id": 42,
1129                    "project_id": self.project_id,
1130                    "title": self.title,
1131                }))
1132            })
1133        }
1134
1135        fn input(&self) -> Option<Value> {
1136            Some(json!({
1137                "project_id": self.project_id,
1138                "title": self.title,
1139            }))
1140        }
1141    }
1142
1143    struct FailingOp;
1144
1145    impl Operation for FailingOp {
1146        fn kind(&self) -> &str {
1147            "broken-service"
1148        }
1149
1150        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1151            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1152        }
1153    }
1154
1155    struct OperationWorkflow;
1156
1157    impl WorkflowHandler for OperationWorkflow {
1158        fn name(&self) -> &str {
1159            "operation-workflow"
1160        }
1161
1162        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1163            Box::pin(async move {
1164                let op = FakeGitlabOp {
1165                    project_id: 123,
1166                    title: "Bug report".to_string(),
1167                };
1168                ctx.operation("create-issue", &op).await?;
1169                Ok(())
1170            })
1171        }
1172    }
1173
1174    struct FailingOperationWorkflow;
1175
1176    impl WorkflowHandler for FailingOperationWorkflow {
1177        fn name(&self) -> &str {
1178            "failing-operation-workflow"
1179        }
1180
1181        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1182            Box::pin(async move {
1183                ctx.operation("broken-call", &FailingOp).await?;
1184                Ok(())
1185            })
1186        }
1187    }
1188
1189    struct MixedWorkflow;
1190
1191    impl WorkflowHandler for MixedWorkflow {
1192        fn name(&self) -> &str {
1193            "mixed-workflow"
1194        }
1195
1196        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1197            Box::pin(async move {
1198                ctx.shell("build", ShellConfig::new("echo built")).await?;
1199                let op = FakeGitlabOp {
1200                    project_id: 456,
1201                    title: "Deploy done".to_string(),
1202                };
1203                let result = ctx.operation("notify-gitlab", &op).await?;
1204                assert_eq!(result.output["issue_id"], 42);
1205                Ok(())
1206            })
1207        }
1208    }
1209
1210    #[tokio::test]
1211    async fn operation_step_happy_path() {
1212        let mut engine = create_test_engine();
1213        engine.register(OperationWorkflow).unwrap();
1214
1215        let run = engine
1216            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1217            .await
1218            .unwrap();
1219
1220        assert_eq!(run.status.state, RunStatus::Completed);
1221
1222        let steps = engine.store().list_steps(run.id).await.unwrap();
1223
1224        assert_eq!(steps.len(), 1);
1225        assert_eq!(steps[0].name, "create-issue");
1226        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1227        assert_eq!(
1228            steps[0].status.state,
1229            ironflow_store::models::StepStatus::Completed
1230        );
1231
1232        let output = steps[0].output.as_ref().unwrap();
1233        assert_eq!(output["issue_id"], 42);
1234        assert_eq!(output["project_id"], 123);
1235
1236        let input = steps[0].input.as_ref().unwrap();
1237        assert_eq!(input["project_id"], 123);
1238        assert_eq!(input["title"], "Bug report");
1239    }
1240
1241    #[tokio::test]
1242    async fn operation_step_failure_marks_run_failed() {
1243        let mut engine = create_test_engine();
1244        engine.register(FailingOperationWorkflow).unwrap();
1245
1246        let result = engine
1247            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1248            .await;
1249
1250        assert!(result.is_err());
1251    }
1252
1253    #[tokio::test]
1254    async fn operation_mixed_with_shell_steps() {
1255        let mut engine = create_test_engine();
1256        engine.register(MixedWorkflow).unwrap();
1257
1258        let run = engine
1259            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1260            .await
1261            .unwrap();
1262
1263        assert_eq!(run.status.state, RunStatus::Completed);
1264
1265        let steps = engine.store().list_steps(run.id).await.unwrap();
1266
1267        assert_eq!(steps.len(), 2);
1268        assert_eq!(steps[0].kind, StepKind::Shell);
1269        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1270        assert_eq!(steps[0].position, 0);
1271        assert_eq!(steps[1].position, 1);
1272    }
1273
1274    // -----------------------------------------------------------------------
1275    // Approval + resume tests
1276    // -----------------------------------------------------------------------
1277
1278    use crate::config::ApprovalConfig;
1279
1280    struct SingleApprovalWorkflow;
1281
1282    impl WorkflowHandler for SingleApprovalWorkflow {
1283        fn name(&self) -> &str {
1284            "single-approval"
1285        }
1286
1287        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1288            Box::pin(async move {
1289                ctx.shell("build", ShellConfig::new("echo built")).await?;
1290                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1291                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1292                    .await?;
1293                Ok(())
1294            })
1295        }
1296    }
1297
1298    struct DoubleApprovalWorkflow;
1299
1300    impl WorkflowHandler for DoubleApprovalWorkflow {
1301        fn name(&self) -> &str {
1302            "double-approval"
1303        }
1304
1305        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1306            Box::pin(async move {
1307                ctx.shell("build", ShellConfig::new("echo built")).await?;
1308                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1309                    .await?;
1310                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1311                    .await?;
1312                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1313                    .await?;
1314                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1315                    .await?;
1316                Ok(())
1317            })
1318        }
1319    }
1320
1321    #[tokio::test]
1322    async fn approval_pauses_run() {
1323        let mut engine = create_test_engine();
1324        engine.register(SingleApprovalWorkflow).unwrap();
1325
1326        let run = engine
1327            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1328            .await
1329            .unwrap();
1330
1331        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1332
1333        let steps = engine.store().list_steps(run.id).await.unwrap();
1334        assert_eq!(steps.len(), 2); // build + approval gate
1335        assert_eq!(steps[0].kind, StepKind::Shell);
1336        assert_eq!(steps[0].status.state, StepStatus::Completed);
1337        assert_eq!(steps[1].kind, StepKind::Approval);
1338        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1339    }
1340
1341    #[tokio::test]
1342    async fn approval_resume_completes_run() {
1343        let mut engine = create_test_engine();
1344        engine.register(SingleApprovalWorkflow).unwrap();
1345
1346        // First execution: pauses at approval
1347        let run = engine
1348            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1349            .await
1350            .unwrap();
1351        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1352
1353        // Simulate approval: transition to Running
1354        engine
1355            .store()
1356            .update_run_status(run.id, RunStatus::Running)
1357            .await
1358            .unwrap();
1359
1360        // Resume: replays build, skips approval, executes deploy
1361        let resumed = engine.resume_run(run.id).await.unwrap();
1362        assert_eq!(resumed.status.state, RunStatus::Completed);
1363
1364        let steps = engine.store().list_steps(run.id).await.unwrap();
1365        assert_eq!(steps.len(), 3); // build + approval + deploy
1366        assert_eq!(steps[0].name, "build");
1367        assert_eq!(steps[0].status.state, StepStatus::Completed);
1368        assert_eq!(steps[1].name, "gate");
1369        assert_eq!(steps[1].kind, StepKind::Approval);
1370        assert_eq!(steps[1].status.state, StepStatus::Completed);
1371        assert_eq!(steps[2].name, "deploy");
1372        assert_eq!(steps[2].status.state, StepStatus::Completed);
1373    }
1374
1375    #[tokio::test]
1376    async fn double_approval_two_resumes() {
1377        let mut engine = create_test_engine();
1378        engine.register(DoubleApprovalWorkflow).unwrap();
1379
1380        // First execution: pauses at staging-gate
1381        let run = engine
1382            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1383            .await
1384            .unwrap();
1385        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1386
1387        let steps = engine.store().list_steps(run.id).await.unwrap();
1388        assert_eq!(steps.len(), 2); // build + staging-gate
1389
1390        // First approval
1391        engine
1392            .store()
1393            .update_run_status(run.id, RunStatus::Running)
1394            .await
1395            .unwrap();
1396
1397        let resumed = engine.resume_run(run.id).await.unwrap();
1398        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1399
1400        let steps = engine.store().list_steps(run.id).await.unwrap();
1401        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1402
1403        // Second approval
1404        engine
1405            .store()
1406            .update_run_status(run.id, RunStatus::Running)
1407            .await
1408            .unwrap();
1409
1410        let final_run = engine.resume_run(run.id).await.unwrap();
1411        assert_eq!(final_run.status.state, RunStatus::Completed);
1412
1413        let steps = engine.store().list_steps(run.id).await.unwrap();
1414        assert_eq!(steps.len(), 5);
1415        assert_eq!(steps[0].name, "build");
1416        assert_eq!(steps[1].name, "staging-gate");
1417        assert_eq!(steps[2].name, "deploy-staging");
1418        assert_eq!(steps[3].name, "prod-gate");
1419        assert_eq!(steps[4].name, "deploy-prod");
1420
1421        for step in &steps {
1422            assert_eq!(step.status.state, StepStatus::Completed);
1423        }
1424    }
1425
1426    // -----------------------------------------------------------------------
1427    // fail_orphaned_steps tests
1428    // -----------------------------------------------------------------------
1429
1430    use ironflow_store::models::{NewStep, StepUpdate};
1431
1432    async fn create_step_with_status(
1433        store: &Arc<dyn Store>,
1434        run_id: Uuid,
1435        name: &str,
1436        position: u32,
1437        status: StepStatus,
1438    ) -> ironflow_store::models::Step {
1439        let step = store
1440            .create_step(NewStep {
1441                run_id,
1442                name: name.to_string(),
1443                kind: StepKind::Shell,
1444                position,
1445                input: None,
1446            })
1447            .await
1448            .unwrap();
1449
1450        match status {
1451            StepStatus::Pending => {}
1452            StepStatus::Running => {
1453                store
1454                    .update_step(
1455                        step.id,
1456                        StepUpdate {
1457                            status: Some(StepStatus::Running),
1458                            ..StepUpdate::default()
1459                        },
1460                    )
1461                    .await
1462                    .unwrap();
1463            }
1464            StepStatus::Completed => {
1465                store
1466                    .update_step(
1467                        step.id,
1468                        StepUpdate {
1469                            status: Some(StepStatus::Running),
1470                            ..StepUpdate::default()
1471                        },
1472                    )
1473                    .await
1474                    .unwrap();
1475                store
1476                    .update_step(
1477                        step.id,
1478                        StepUpdate {
1479                            status: Some(StepStatus::Completed),
1480                            ..StepUpdate::default()
1481                        },
1482                    )
1483                    .await
1484                    .unwrap();
1485            }
1486            StepStatus::AwaitingApproval => {
1487                store
1488                    .update_step(
1489                        step.id,
1490                        StepUpdate {
1491                            status: Some(StepStatus::Running),
1492                            ..StepUpdate::default()
1493                        },
1494                    )
1495                    .await
1496                    .unwrap();
1497                store
1498                    .update_step(
1499                        step.id,
1500                        StepUpdate {
1501                            status: Some(StepStatus::AwaitingApproval),
1502                            ..StepUpdate::default()
1503                        },
1504                    )
1505                    .await
1506                    .unwrap();
1507            }
1508            _ => panic!("unsupported status for test helper: {status}"),
1509        }
1510
1511        store.get_step(step.id).await.unwrap().unwrap()
1512    }
1513
1514    #[tokio::test]
1515    async fn fail_orphaned_steps_marks_running_as_failed() {
1516        let engine = create_test_engine();
1517        let run = engine
1518            .store()
1519            .create_run(NewRun {
1520                workflow_name: "test".to_string(),
1521                trigger: TriggerKind::Manual,
1522                payload: json!({}),
1523                max_retries: 0,
1524                handler_version: None,
1525                labels: HashMap::new(),
1526                scheduled_at: None,
1527            })
1528            .await
1529            .unwrap();
1530
1531        let step = create_step_with_status(
1532            engine.store(),
1533            run.id,
1534            "running-step",
1535            0,
1536            StepStatus::Running,
1537        )
1538        .await;
1539
1540        engine
1541            .fail_orphaned_steps(run.id, "parent run timed out")
1542            .await
1543            .unwrap();
1544
1545        let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1546        assert_eq!(updated.status.state, StepStatus::Failed);
1547        assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1548        assert!(updated.completed_at.is_some());
1549    }
1550
1551    #[tokio::test]
1552    async fn fail_orphaned_steps_marks_pending_as_skipped() {
1553        let engine = create_test_engine();
1554        let run = engine
1555            .store()
1556            .create_run(NewRun {
1557                workflow_name: "test".to_string(),
1558                trigger: TriggerKind::Manual,
1559                payload: json!({}),
1560                max_retries: 0,
1561                handler_version: None,
1562                labels: HashMap::new(),
1563                scheduled_at: None,
1564            })
1565            .await
1566            .unwrap();
1567
1568        let step = create_step_with_status(
1569            engine.store(),
1570            run.id,
1571            "pending-step",
1572            0,
1573            StepStatus::Pending,
1574        )
1575        .await;
1576
1577        engine
1578            .fail_orphaned_steps(run.id, "parent run timed out")
1579            .await
1580            .unwrap();
1581
1582        let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1583        assert_eq!(updated.status.state, StepStatus::Skipped);
1584        assert!(updated.error.is_none());
1585        assert!(updated.completed_at.is_some());
1586    }
1587
1588    #[tokio::test]
1589    async fn fail_orphaned_steps_marks_awaiting_approval_as_failed() {
1590        let engine = create_test_engine();
1591        let run = engine
1592            .store()
1593            .create_run(NewRun {
1594                workflow_name: "test".to_string(),
1595                trigger: TriggerKind::Manual,
1596                payload: json!({}),
1597                max_retries: 0,
1598                handler_version: None,
1599                labels: HashMap::new(),
1600                scheduled_at: None,
1601            })
1602            .await
1603            .unwrap();
1604
1605        let step = create_step_with_status(
1606            engine.store(),
1607            run.id,
1608            "approval-step",
1609            0,
1610            StepStatus::AwaitingApproval,
1611        )
1612        .await;
1613
1614        engine
1615            .fail_orphaned_steps(run.id, "parent run timed out")
1616            .await
1617            .unwrap();
1618
1619        let updated = engine.store().get_step(step.id).await.unwrap().unwrap();
1620        assert_eq!(updated.status.state, StepStatus::Failed);
1621        assert_eq!(updated.error.as_deref(), Some("parent run timed out"));
1622        assert!(updated.completed_at.is_some());
1623    }
1624
1625    #[tokio::test]
1626    async fn fail_orphaned_steps_skips_terminal_steps() {
1627        let engine = create_test_engine();
1628        let run = engine
1629            .store()
1630            .create_run(NewRun {
1631                workflow_name: "test".to_string(),
1632                trigger: TriggerKind::Manual,
1633                payload: json!({}),
1634                max_retries: 0,
1635                handler_version: None,
1636                labels: HashMap::new(),
1637                scheduled_at: None,
1638            })
1639            .await
1640            .unwrap();
1641
1642        let completed_step =
1643            create_step_with_status(engine.store(), run.id, "done", 0, StepStatus::Completed).await;
1644        let running_step =
1645            create_step_with_status(engine.store(), run.id, "in-flight", 1, StepStatus::Running)
1646                .await;
1647
1648        engine
1649            .fail_orphaned_steps(run.id, "parent run timed out")
1650            .await
1651            .unwrap();
1652
1653        let completed = engine
1654            .store()
1655            .get_step(completed_step.id)
1656            .await
1657            .unwrap()
1658            .unwrap();
1659        assert_eq!(completed.status.state, StepStatus::Completed);
1660
1661        let failed = engine
1662            .store()
1663            .get_step(running_step.id)
1664            .await
1665            .unwrap()
1666            .unwrap();
1667        assert_eq!(failed.status.state, StepStatus::Failed);
1668    }
1669
1670    #[tokio::test]
1671    async fn fail_orphaned_steps_mixed_states() {
1672        let engine = create_test_engine();
1673        let run = engine
1674            .store()
1675            .create_run(NewRun {
1676                workflow_name: "test".to_string(),
1677                trigger: TriggerKind::Manual,
1678                payload: json!({}),
1679                max_retries: 0,
1680                handler_version: None,
1681                labels: HashMap::new(),
1682                scheduled_at: None,
1683            })
1684            .await
1685            .unwrap();
1686
1687        let s_completed =
1688            create_step_with_status(engine.store(), run.id, "step-1", 0, StepStatus::Completed)
1689                .await;
1690        let s_running =
1691            create_step_with_status(engine.store(), run.id, "step-2", 1, StepStatus::Running).await;
1692        let s_pending =
1693            create_step_with_status(engine.store(), run.id, "step-3", 2, StepStatus::Pending).await;
1694
1695        engine.fail_orphaned_steps(run.id, "timeout").await.unwrap();
1696
1697        let r_completed = engine
1698            .store()
1699            .get_step(s_completed.id)
1700            .await
1701            .unwrap()
1702            .unwrap();
1703        assert_eq!(r_completed.status.state, StepStatus::Completed);
1704
1705        let r_running = engine
1706            .store()
1707            .get_step(s_running.id)
1708            .await
1709            .unwrap()
1710            .unwrap();
1711        assert_eq!(r_running.status.state, StepStatus::Failed);
1712        assert_eq!(r_running.error.as_deref(), Some("timeout"));
1713
1714        let r_pending = engine
1715            .store()
1716            .get_step(s_pending.id)
1717            .await
1718            .unwrap()
1719            .unwrap();
1720        assert_eq!(r_pending.status.state, StepStatus::Skipped);
1721        assert!(r_pending.error.is_none());
1722    }
1723
1724    #[tokio::test]
1725    async fn fail_orphaned_steps_no_steps_is_noop() {
1726        let engine = create_test_engine();
1727        let run = engine
1728            .store()
1729            .create_run(NewRun {
1730                workflow_name: "test".to_string(),
1731                trigger: TriggerKind::Manual,
1732                payload: json!({}),
1733                max_retries: 0,
1734                handler_version: None,
1735                labels: HashMap::new(),
1736                scheduled_at: None,
1737            })
1738            .await
1739            .unwrap();
1740
1741        let result = engine.fail_orphaned_steps(run.id, "timeout").await;
1742        assert!(result.is_ok());
1743    }
1744}