Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
//! The engine ties together a [`Store`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::Store;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::log_sender::LogSender;
32use crate::notify::{Event, EventPublisher, EventSubscriber};
33
34/// The workflow orchestration engine.
35///
36/// Holds references to the store, agent provider, and a registry of
37/// [`WorkflowHandler`]s.
38///
39/// # Examples
40///
41/// ```no_run
42/// use std::sync::Arc;
43/// use ironflow_engine::engine::Engine;
44/// use ironflow_engine::config::ShellConfig;
45/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
46/// use ironflow_engine::context::WorkflowContext;
47/// use ironflow_store::memory::InMemoryStore;
48/// use ironflow_store::models::TriggerKind;
49/// use ironflow_core::providers::claude::ClaudeCodeProvider;
50/// use serde_json::json;
51///
52/// struct CiWorkflow;
53/// impl WorkflowHandler for CiWorkflow {
54///     fn name(&self) -> &str { "ci" }
55///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
56///         Box::pin(async move {
57///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
58///             Ok(())
59///         })
60///     }
61/// }
62///
63/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
64/// let store = Arc::new(InMemoryStore::new());
65/// let provider = Arc::new(ClaudeCodeProvider::new());
66/// let mut engine = Engine::new(store, provider);
67/// engine.register(CiWorkflow)?;
68///
69/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
70/// println!("Run {} completed with status {:?}", run.id, run.status);
71/// # Ok(())
72/// # }
73/// ```
74pub struct Engine {
75    store: Arc<dyn Store>,
76    provider: Arc<dyn AgentProvider>,
77    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
78    event_publisher: EventPublisher,
79    log_sender: Option<LogSender>,
80}
81
82/// Validate a workflow category path.
83///
84/// A category is a `/`-separated list of non-empty segments. This function
85/// rejects empty paths, leading or trailing `/`, consecutive `/`, and
86/// segments containing only whitespace.
87///
88/// # Errors
89///
90/// Returns [`EngineError::InvalidWorkflow`] when the category is malformed.
91fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
92    let reject = |reason: &str| {
93        Err(EngineError::InvalidWorkflow(format!(
94            "handler '{handler_name}' has invalid category '{category}': {reason}"
95        )))
96    };
97
98    if category.is_empty() {
99        return reject("empty category");
100    }
101    if category.starts_with('/') {
102        return reject("leading '/'");
103    }
104    if category.ends_with('/') {
105        return reject("trailing '/'");
106    }
107    for segment in category.split('/') {
108        if segment.is_empty() {
109            return reject("empty segment (double '/')");
110        }
111        if segment.trim().is_empty() {
112            return reject("whitespace-only segment");
113        }
114    }
115    Ok(())
116}
117
118impl Engine {
119    /// Create a new engine with the given store and agent provider.
120    ///
121    /// # Examples
122    ///
123    /// ```no_run
124    /// use std::sync::Arc;
125    /// use ironflow_engine::engine::Engine;
126    /// use ironflow_store::memory::InMemoryStore;
127    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
128    ///
129    /// let engine = Engine::new(
130    ///     Arc::new(InMemoryStore::new()),
131    ///     Arc::new(ClaudeCodeProvider::new()),
132    /// );
133    /// ```
134    pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
135        Self {
136            store,
137            provider,
138            handlers: HashMap::new(),
139            event_publisher: EventPublisher::new(),
140            log_sender: None,
141        }
142    }
143
144    /// Attach a log sender for real-time step output streaming.
145    ///
146    /// When set, all workflow contexts created by this engine will forward
147    /// step output (shell stdout/stderr, agent system messages) to the
148    /// given sender.
149    pub fn set_log_sender(&mut self, sender: LogSender) {
150        self.log_sender = Some(sender);
151    }
152
153    /// Returns a reference to the backing store.
154    pub fn store(&self) -> &Arc<dyn Store> {
155        &self.store
156    }
157
158    /// Returns a reference to the agent provider.
159    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
160        &self.provider
161    }
162
163    /// Build a [`WorkflowContext`] with access to the handler registry.
164    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
165        let handlers = self.handlers.clone();
166        let resolver: crate::context::HandlerResolver =
167            Arc::new(move |name: &str| handlers.get(name).cloned());
168        let mut ctx = WorkflowContext::with_handler_resolver(
169            run_id,
170            self.store.clone(),
171            self.provider.clone(),
172            resolver,
173        );
174        if let Some(ref sender) = self.log_sender {
175            ctx.set_log_sender(sender.clone());
176        }
177        ctx
178    }
179
180    // -----------------------------------------------------------------------
181    // Handler registration
182    // -----------------------------------------------------------------------
183
184    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
185    ///
186    /// The handler is looked up by [`WorkflowHandler::name`] when executing
187    /// or enqueuing.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
192    /// name is already registered.
193    ///
194    /// # Examples
195    ///
196    /// ```no_run
197    /// use std::sync::Arc;
198    /// use ironflow_engine::engine::Engine;
199    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
200    /// use ironflow_engine::context::WorkflowContext;
201    /// use ironflow_engine::config::ShellConfig;
202    /// use ironflow_store::memory::InMemoryStore;
203    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
204    ///
205    /// struct MyWorkflow;
206    /// impl WorkflowHandler for MyWorkflow {
207    ///     fn name(&self) -> &str { "my-workflow" }
208    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
209    ///         Box::pin(async move {
210    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
211    ///             Ok(())
212    ///         })
213    ///     }
214    /// }
215    ///
216    /// let mut engine = Engine::new(
217    ///     Arc::new(InMemoryStore::new()),
218    ///     Arc::new(ClaudeCodeProvider::new()),
219    /// );
220    /// engine.register(MyWorkflow)?;
221    /// # Ok::<(), ironflow_engine::error::EngineError>(())
222    /// ```
223    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
224        let name = handler.name().to_string();
225        if self.handlers.contains_key(&name) {
226            return Err(EngineError::InvalidWorkflow(format!(
227                "handler '{}' already registered",
228                name
229            )));
230        }
231        if let Some(category) = handler.category() {
232            validate_category(&name, category)?;
233        }
234        self.handlers.insert(name, Arc::new(handler));
235        Ok(())
236    }
237
238    /// Register a pre-boxed workflow handler.
239    ///
240    /// # Errors
241    ///
242    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
243    /// name is already registered or if its category is invalid.
244    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
245        let name = handler.name().to_string();
246        if self.handlers.contains_key(&name) {
247            return Err(EngineError::InvalidWorkflow(format!(
248                "handler '{}' already registered",
249                name
250            )));
251        }
252        if let Some(category) = handler.category() {
253            validate_category(&name, category)?;
254        }
255        self.handlers.insert(name, Arc::from(handler));
256        Ok(())
257    }
258
259    /// Get a registered handler by name.
260    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
261        self.handlers.get(name)
262    }
263
264    /// List registered handler names.
265    pub fn handler_names(&self) -> Vec<&str> {
266        self.handlers.keys().map(|s| s.as_str()).collect()
267    }
268
269    /// Get detailed info about a registered workflow handler.
270    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
271        self.handlers.get(name).map(|h| h.describe())
272    }
273
274    /// Register an event subscriber for domain events.
275    ///
276    /// The subscriber is called only for events whose type is in
277    /// `event_types`. Pass [`Event::ALL`] to receive every event.
278    ///
279    /// # Examples
280    ///
281    /// ```no_run
282    /// use ironflow_engine::engine::Engine;
283    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
284    /// use ironflow_store::memory::InMemoryStore;
285    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
286    /// use std::sync::Arc;
287    ///
288    /// let mut engine = Engine::new(
289    ///     Arc::new(InMemoryStore::new()),
290    ///     Arc::new(ClaudeCodeProvider::new()),
291    /// );
292    ///
293    /// engine.subscribe(
294    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
295    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
296    /// );
297    /// ```
298    pub fn subscribe(
299        &mut self,
300        subscriber: impl EventSubscriber + 'static,
301        event_types: &[&'static str],
302    ) {
303        self.event_publisher.subscribe(subscriber, event_types);
304    }
305
306    /// Returns a reference to the event publisher.
307    ///
308    /// Useful for publishing events from outside the engine (e.g. auth
309    /// routes in the API layer).
310    pub fn event_publisher(&self) -> &EventPublisher {
311        &self.event_publisher
312    }
313
314    // -----------------------------------------------------------------------
315    // Dynamic workflow execution (WorkflowHandler)
316    // -----------------------------------------------------------------------
317
318    /// Execute a registered handler inline.
319    ///
320    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
321    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
322    ///
323    /// # Errors
324    ///
325    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
326    /// with that name. Returns [`EngineError`] if execution fails.
327    ///
328    /// # Examples
329    ///
330    /// ```no_run
331    /// use std::sync::Arc;
332    /// use ironflow_engine::engine::Engine;
333    /// use ironflow_store::memory::InMemoryStore;
334    /// use ironflow_store::models::TriggerKind;
335    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
336    /// use serde_json::json;
337    ///
338    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
339    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
340    /// # Ok(())
341    /// # }
342    /// ```
343    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
344    pub async fn run_handler(
345        &self,
346        handler_name: &str,
347        trigger: TriggerKind,
348        payload: Value,
349    ) -> Result<Run, EngineError> {
350        let handler = self
351            .handlers
352            .get(handler_name)
353            .ok_or_else(|| {
354                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
355            })?
356            .clone();
357
358        let handler_version = handler.version().map(str::to_string);
359        let run = self
360            .store
361            .create_run(NewRun {
362                workflow_name: handler_name.to_string(),
363                trigger,
364                payload,
365                max_retries: 0,
366                handler_version,
367                labels: handler.default_labels(),
368                scheduled_at: None,
369            })
370            .await?;
371
372        let run_id = run.id;
373        info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
374
375        self.store
376            .update_run_status(run_id, RunStatus::Running)
377            .await?;
378
379        #[cfg(feature = "prometheus")]
380        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
381
382        let run_start = Instant::now();
383        let mut ctx = self.build_context(run_id);
384
385        let result = handler.execute(&mut ctx).await;
386        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
387            .await
388    }
389
390    /// Enqueue a handler-based workflow for worker execution.
391    ///
392    /// The workflow name is stored in the run. The worker looks up the
393    /// handler by name when executing.
394    ///
395    /// # Errors
396    ///
397    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
398    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
399    pub async fn enqueue_handler(
400        &self,
401        handler_name: &str,
402        trigger: TriggerKind,
403        payload: Value,
404        max_retries: u32,
405    ) -> Result<Run, EngineError> {
406        self.enqueue_handler_with_options(
407            handler_name,
408            trigger,
409            payload,
410            max_retries,
411            HashMap::new(),
412            None,
413        )
414        .await
415    }
416
417    /// Enqueue a handler-based workflow with labels and optional deferred scheduling.
418    ///
419    /// # Errors
420    ///
421    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
422    #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
423    pub async fn enqueue_handler_with_options(
424        &self,
425        handler_name: &str,
426        trigger: TriggerKind,
427        payload: Value,
428        max_retries: u32,
429        labels: HashMap<String, String>,
430        scheduled_at: Option<DateTime<Utc>>,
431    ) -> Result<Run, EngineError> {
432        let handler = self.handlers.get(handler_name).ok_or_else(|| {
433            EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
434        })?;
435
436        let handler_version = handler.version().map(str::to_string);
437        let mut merged_labels = handler.default_labels();
438        merged_labels.extend(labels);
439
440        let run = self
441            .store
442            .create_run(NewRun {
443                workflow_name: handler_name.to_string(),
444                trigger,
445                payload,
446                max_retries,
447                handler_version,
448                labels: merged_labels,
449                scheduled_at,
450            })
451            .await?;
452
453        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
454        Ok(run)
455    }
456
457    /// Execute a handler-based run (used by the worker after pick_next_pending).
458    ///
459    /// Looks up the handler by the run's `workflow_name` and executes it
460    /// with a fresh [`WorkflowContext`].
461    ///
462    /// # Errors
463    ///
464    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
465    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
466    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
467        let run = self
468            .store
469            .get_run(run_id)
470            .await?
471            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
472
473        let handler = self
474            .handlers
475            .get(&run.workflow_name)
476            .ok_or_else(|| {
477                EngineError::InvalidWorkflow(format!(
478                    "no handler registered: {}",
479                    run.workflow_name
480                ))
481            })?
482            .clone();
483
484        #[cfg(feature = "prometheus")]
485        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
486
487        let run_start = Instant::now();
488        let mut ctx = self.build_context(run_id);
489
490        let result = handler.execute(&mut ctx).await;
491        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
492            .await
493    }
494
495    /// Execute a run by its ID (used by the worker after pick_next_pending).
496    ///
497    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
498    ///
499    /// # Errors
500    ///
501    /// Returns [`EngineError`] if the run is not found or execution fails.
502    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
503    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
504        self.execute_handler_run(run_id).await
505    }
506
507    /// Resume a run after human approval.
508    ///
509    /// Re-executes the handler with step replay: completed steps return
510    /// cached output, approved approval steps are skipped, and execution
511    /// continues from the first unexecuted step.
512    ///
513    /// Supports multiple approval gates -- each resume replays all prior
514    /// steps and stops at the next approval (or completes the run).
515    ///
516    /// # Errors
517    ///
518    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
519    /// Returns [`EngineError`] if execution fails or hits another approval.
520    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
521    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
522        let run = self
523            .store
524            .get_run(run_id)
525            .await?
526            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
527
528        let handler = self
529            .handlers
530            .get(&run.workflow_name)
531            .ok_or_else(|| {
532                EngineError::InvalidWorkflow(format!(
533                    "no handler registered: {}",
534                    run.workflow_name
535                ))
536            })?
537            .clone();
538
539        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
540
541        let run_start = Instant::now();
542        let mut ctx = self.build_context(run_id);
543        ctx.load_replay_steps().await?;
544
545        let result = handler.execute(&mut ctx).await;
546        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
547            .await
548    }
549
550    /// Finalize a run with the given result and context.
551    ///
552    /// On success: updates run to Completed with cost, duration, and completed_at.
553    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
554    /// Always: fetches and returns the final Run.
555    async fn finalize_run(
556        &self,
557        run_id: Uuid,
558        workflow_name: &str,
559        result: Result<(), EngineError>,
560        ctx: &WorkflowContext,
561        run_start: Instant,
562    ) -> Result<Run, EngineError> {
563        let total_duration = run_start.elapsed().as_millis() as u64;
564        let completed_at = Utc::now();
565
566        let final_status;
567        let final_run;
568
569        match result {
570            Ok(()) => {
571                final_status = RunStatus::Completed;
572                final_run = self
573                    .store
574                    .update_run_returning(
575                        run_id,
576                        RunUpdate {
577                            status: Some(RunStatus::Completed),
578                            cost_usd: Some(ctx.total_cost_usd()),
579                            duration_ms: Some(total_duration),
580                            completed_at: Some(completed_at),
581                            ..RunUpdate::default()
582                        },
583                    )
584                    .await?;
585
586                info!(
587                    run_id = %run_id,
588                    cost_usd = %ctx.total_cost_usd(),
589                    duration_ms = total_duration,
590                    "run completed"
591                );
592            }
593            Err(EngineError::ApprovalRequired {
594                run_id: approval_run_id,
595                step_id,
596                ref message,
597            }) => {
598                final_status = RunStatus::AwaitingApproval;
599                final_run = self
600                    .store
601                    .update_run_returning(
602                        run_id,
603                        RunUpdate {
604                            status: Some(RunStatus::AwaitingApproval),
605                            cost_usd: Some(ctx.total_cost_usd()),
606                            duration_ms: Some(total_duration),
607                            ..RunUpdate::default()
608                        },
609                    )
610                    .await?;
611
612                info!(
613                    run_id = %approval_run_id,
614                    step_id = %step_id,
615                    message = %message,
616                    "run awaiting approval"
617                );
618            }
619            Err(err) => {
620                final_status = RunStatus::Failed;
621                if let Err(store_err) = self
622                    .store
623                    .update_run(
624                        run_id,
625                        RunUpdate {
626                            status: Some(RunStatus::Failed),
627                            error: Some(err.to_string()),
628                            cost_usd: Some(ctx.total_cost_usd()),
629                            duration_ms: Some(total_duration),
630                            completed_at: Some(completed_at),
631                            ..RunUpdate::default()
632                        },
633                    )
634                    .await
635                {
636                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
637                }
638
639                error!(run_id = %run_id, error = %err, "run failed");
640
641                self.publish_run_status_changed(
642                    workflow_name,
643                    run_id,
644                    final_status,
645                    Some(err.to_string()),
646                    ctx,
647                    total_duration,
648                );
649
650                #[cfg(feature = "prometheus")]
651                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
652
653                return Err(err);
654            }
655        }
656
657        self.publish_run_status_changed(
658            workflow_name,
659            run_id,
660            final_status,
661            None,
662            ctx,
663            total_duration,
664        );
665
666        #[cfg(feature = "prometheus")]
667        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
668
669        Ok(final_run)
670    }
671
672    /// Emit Prometheus metrics for a completed run.
673    #[cfg(feature = "prometheus")]
674    fn emit_run_metrics(
675        &self,
676        workflow_name: &str,
677        status: RunStatus,
678        duration_ms: u64,
679        ctx: &WorkflowContext,
680    ) {
681        let status_str = status.to_string();
682        let wf = workflow_name.to_string();
683
684        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
685        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
686            .record(duration_ms as f64 / 1000.0);
687        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
688            ctx.total_cost_usd()
689                .to_string()
690                .parse::<f64>()
691                .unwrap_or(0.0),
692        );
693        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
694    }
695
696    /// Publish a run status changed event to all registered subscribers.
697    ///
698    /// `from` is always `Running` because `finalize_run` is only called
699    /// from a running state.
700    fn publish_run_status_changed(
701        &self,
702        workflow_name: &str,
703        run_id: Uuid,
704        to: RunStatus,
705        error: Option<String>,
706        ctx: &WorkflowContext,
707        duration_ms: u64,
708    ) {
709        let now = Utc::now();
710        let cost_usd = ctx.total_cost_usd();
711        let wf = workflow_name.to_string();
712
713        self.event_publisher.publish(Event::RunStatusChanged {
714            run_id,
715            workflow_name: wf.clone(),
716            from: RunStatus::Running,
717            to,
718            error: error.clone(),
719            cost_usd,
720            duration_ms,
721            at: now,
722        });
723
724        if to == RunStatus::Failed {
725            self.event_publisher.publish(Event::RunFailed {
726                run_id,
727                workflow_name: wf,
728                error,
729                cost_usd,
730                duration_ms,
731                at: now,
732            });
733        }
734    }
735}
736
737impl fmt::Debug for Engine {
738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739        f.debug_struct("Engine")
740            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
741            .finish_non_exhaustive()
742    }
743}
744
745#[cfg(test)]
746mod tests {
747    use super::*;
748    use crate::config::ShellConfig;
749    use crate::handler::{HandlerFuture, WorkflowHandler};
750    use ironflow_core::providers::claude::ClaudeCodeProvider;
751    use ironflow_core::providers::record_replay::RecordReplayProvider;
752    use ironflow_store::memory::InMemoryStore;
753    use ironflow_store::models::StepStatus;
754    use serde_json::json;
755
756    // Test handler that echoes a message via shell
757    struct EchoWorkflow;
758
759    impl WorkflowHandler for EchoWorkflow {
760        fn name(&self) -> &str {
761            "echo-workflow"
762        }
763
764        fn describe(&self) -> WorkflowInfo {
765            WorkflowInfo {
766                description: "A simple workflow that echoes hello".to_string(),
767                source_code: None,
768                sub_workflows: Vec::new(),
769                category: None,
770                version: self.version().map(str::to_string),
771                input_schema: None,
772                default_labels: HashMap::new(),
773            }
774        }
775
776        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
777            Box::pin(async move {
778                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
779                Ok(())
780            })
781        }
782    }
783
784    // Test handler that fails
785    struct FailingWorkflow;
786
787    impl WorkflowHandler for FailingWorkflow {
788        fn name(&self) -> &str {
789            "failing-workflow"
790        }
791
792        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
793            Box::pin(async move {
794                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
795                Ok(())
796            })
797        }
798    }
799
800    fn create_test_engine() -> Engine {
801        let store = Arc::new(InMemoryStore::new());
802        let inner = ClaudeCodeProvider::new();
803        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
804            inner,
805            "/tmp/ironflow-fixtures",
806        ));
807        Engine::new(store, provider)
808    }
809
810    #[test]
811    fn engine_new_creates_instance() {
812        let engine = create_test_engine();
813        assert_eq!(engine.handler_names().len(), 0);
814    }
815
816    #[test]
817    fn engine_register_handler() {
818        let mut engine = create_test_engine();
819        let result = engine.register(EchoWorkflow);
820        assert!(result.is_ok());
821        assert_eq!(engine.handler_names().len(), 1);
822        assert!(engine.handler_names().contains(&"echo-workflow"));
823    }
824
825    #[test]
826    fn engine_register_duplicate_returns_error() {
827        let mut engine = create_test_engine();
828        engine.register(EchoWorkflow).unwrap();
829        let result = engine.register(EchoWorkflow);
830        assert!(result.is_err());
831    }
832
833    #[test]
834    fn engine_get_handler_found() {
835        let mut engine = create_test_engine();
836        engine.register(EchoWorkflow).unwrap();
837        let handler = engine.get_handler("echo-workflow");
838        assert!(handler.is_some());
839    }
840
841    #[test]
842    fn engine_get_handler_not_found() {
843        let engine = create_test_engine();
844        let handler = engine.get_handler("nonexistent");
845        assert!(handler.is_none());
846    }
847
848    #[test]
849    fn engine_handler_names_lists_all() {
850        let mut engine = create_test_engine();
851        engine.register(EchoWorkflow).unwrap();
852        engine.register(FailingWorkflow).unwrap();
853        let names = engine.handler_names();
854        assert_eq!(names.len(), 2);
855        assert!(names.contains(&"echo-workflow"));
856        assert!(names.contains(&"failing-workflow"));
857    }
858
859    #[test]
860    fn engine_handler_info_returns_description() {
861        let mut engine = create_test_engine();
862        engine.register(EchoWorkflow).unwrap();
863        let info = engine.handler_info("echo-workflow");
864        assert!(info.is_some());
865        let info = info.unwrap();
866        assert_eq!(info.description, "A simple workflow that echoes hello");
867    }
868
869    struct CategorizedWorkflow;
870
871    impl WorkflowHandler for CategorizedWorkflow {
872        fn name(&self) -> &str {
873            "categorized"
874        }
875        fn category(&self) -> Option<&str> {
876            Some("data/etl")
877        }
878        fn execute<'a>(
879            &'a self,
880            _ctx: &'a mut WorkflowContext,
881        ) -> crate::handler::HandlerFuture<'a> {
882            Box::pin(async move { Ok(()) })
883        }
884    }
885
886    #[test]
887    fn engine_default_describe_propagates_category() {
888        let mut engine = create_test_engine();
889        engine.register(CategorizedWorkflow).unwrap();
890        let info = engine.handler_info("categorized").unwrap();
891        assert_eq!(info.category.as_deref(), Some("data/etl"));
892    }
893
894    #[test]
895    fn engine_default_describe_without_category() {
896        let mut engine = create_test_engine();
897        engine.register(EchoWorkflow).unwrap();
898        let info = engine.handler_info("echo-workflow").unwrap();
899        assert!(info.category.is_none());
900    }
901
902    struct BadCategoryWorkflow(&'static str);
903
904    impl WorkflowHandler for BadCategoryWorkflow {
905        fn name(&self) -> &str {
906            "bad-category"
907        }
908        fn category(&self) -> Option<&str> {
909            Some(self.0)
910        }
911        fn execute<'a>(
912            &'a self,
913            _ctx: &'a mut WorkflowContext,
914        ) -> crate::handler::HandlerFuture<'a> {
915            Box::pin(async move { Ok(()) })
916        }
917    }
918
919    #[test]
920    fn engine_register_rejects_empty_category() {
921        let mut engine = create_test_engine();
922        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
923        match err {
924            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
925            other => panic!("expected InvalidWorkflow, got {other:?}"),
926        }
927    }
928
929    #[test]
930    fn engine_register_rejects_leading_slash_category() {
931        let mut engine = create_test_engine();
932        let err = engine
933            .register(BadCategoryWorkflow("/data/etl"))
934            .unwrap_err();
935        match err {
936            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
937            other => panic!("expected InvalidWorkflow, got {other:?}"),
938        }
939    }
940
941    #[test]
942    fn engine_register_rejects_trailing_slash_category() {
943        let mut engine = create_test_engine();
944        let err = engine
945            .register(BadCategoryWorkflow("data/etl/"))
946            .unwrap_err();
947        match err {
948            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
949            other => panic!("expected InvalidWorkflow, got {other:?}"),
950        }
951    }
952
953    #[test]
954    fn engine_register_rejects_double_slash_category() {
955        let mut engine = create_test_engine();
956        let err = engine
957            .register(BadCategoryWorkflow("data//etl"))
958            .unwrap_err();
959        match err {
960            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
961            other => panic!("expected InvalidWorkflow, got {other:?}"),
962        }
963    }
964
965    #[test]
966    fn engine_register_rejects_whitespace_only_segment_category() {
967        let mut engine = create_test_engine();
968        let err = engine
969            .register(BadCategoryWorkflow("data/ /etl"))
970            .unwrap_err();
971        match err {
972            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
973            other => panic!("expected InvalidWorkflow, got {other:?}"),
974        }
975    }
976
977    #[test]
978    fn engine_register_accepts_valid_nested_category() {
979        let mut engine = create_test_engine();
980        assert!(engine.register(CategorizedWorkflow).is_ok());
981    }
982
983    #[tokio::test]
984    async fn engine_unknown_workflow_returns_error() {
985        let engine = create_test_engine();
986        let result = engine
987            .run_handler("unknown", TriggerKind::Manual, json!({}))
988            .await;
989        assert!(result.is_err());
990        match result {
991            Err(EngineError::InvalidWorkflow(msg)) => {
992                assert!(msg.contains("no handler registered"));
993            }
994            _ => panic!("expected InvalidWorkflow error"),
995        }
996    }
997
998    #[tokio::test]
999    async fn engine_enqueue_handler_creates_pending_run() {
1000        let mut engine = create_test_engine();
1001        engine.register(EchoWorkflow).unwrap();
1002
1003        let run = engine
1004            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
1005            .await
1006            .unwrap();
1007        assert_eq!(run.status.state, RunStatus::Pending);
1008        assert_eq!(run.workflow_name, "echo-workflow");
1009    }
1010
1011    #[tokio::test]
1012    async fn engine_register_boxed() {
1013        let mut engine = create_test_engine();
1014        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
1015        let result = engine.register_boxed(handler);
1016        assert!(result.is_ok());
1017        assert_eq!(engine.handler_names().len(), 1);
1018    }
1019
1020    #[tokio::test]
1021    async fn engine_store_and_provider_accessors() {
1022        let store = Arc::new(InMemoryStore::new());
1023        let inner = ClaudeCodeProvider::new();
1024        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
1025            inner,
1026            "/tmp/ironflow-fixtures",
1027        ));
1028        let engine = Engine::new(store.clone(), provider.clone());
1029
1030        // Verify accessors return references
1031        let _ = engine.store();
1032        let _ = engine.provider();
1033    }
1034
1035    // -----------------------------------------------------------------------
1036    // Operation trait tests
1037    // -----------------------------------------------------------------------
1038
1039    use crate::operation::Operation;
1040    use ironflow_store::models::StepKind;
1041    use std::future::Future;
1042    use std::pin::Pin;
1043
1044    struct FakeGitlabOp {
1045        project_id: u64,
1046        title: String,
1047    }
1048
1049    impl Operation for FakeGitlabOp {
1050        fn kind(&self) -> &str {
1051            "gitlab"
1052        }
1053
1054        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1055            Box::pin(async move {
1056                Ok(json!({
1057                    "issue_id": 42,
1058                    "project_id": self.project_id,
1059                    "title": self.title,
1060                }))
1061            })
1062        }
1063
1064        fn input(&self) -> Option<Value> {
1065            Some(json!({
1066                "project_id": self.project_id,
1067                "title": self.title,
1068            }))
1069        }
1070    }
1071
1072    struct FailingOp;
1073
1074    impl Operation for FailingOp {
1075        fn kind(&self) -> &str {
1076            "broken-service"
1077        }
1078
1079        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1080            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1081        }
1082    }
1083
1084    struct OperationWorkflow;
1085
1086    impl WorkflowHandler for OperationWorkflow {
1087        fn name(&self) -> &str {
1088            "operation-workflow"
1089        }
1090
1091        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1092            Box::pin(async move {
1093                let op = FakeGitlabOp {
1094                    project_id: 123,
1095                    title: "Bug report".to_string(),
1096                };
1097                ctx.operation("create-issue", &op).await?;
1098                Ok(())
1099            })
1100        }
1101    }
1102
1103    struct FailingOperationWorkflow;
1104
1105    impl WorkflowHandler for FailingOperationWorkflow {
1106        fn name(&self) -> &str {
1107            "failing-operation-workflow"
1108        }
1109
1110        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1111            Box::pin(async move {
1112                ctx.operation("broken-call", &FailingOp).await?;
1113                Ok(())
1114            })
1115        }
1116    }
1117
1118    struct MixedWorkflow;
1119
1120    impl WorkflowHandler for MixedWorkflow {
1121        fn name(&self) -> &str {
1122            "mixed-workflow"
1123        }
1124
1125        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1126            Box::pin(async move {
1127                ctx.shell("build", ShellConfig::new("echo built")).await?;
1128                let op = FakeGitlabOp {
1129                    project_id: 456,
1130                    title: "Deploy done".to_string(),
1131                };
1132                let result = ctx.operation("notify-gitlab", &op).await?;
1133                assert_eq!(result.output["issue_id"], 42);
1134                Ok(())
1135            })
1136        }
1137    }
1138
1139    #[tokio::test]
1140    async fn operation_step_happy_path() {
1141        let mut engine = create_test_engine();
1142        engine.register(OperationWorkflow).unwrap();
1143
1144        let run = engine
1145            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1146            .await
1147            .unwrap();
1148
1149        assert_eq!(run.status.state, RunStatus::Completed);
1150
1151        let steps = engine.store().list_steps(run.id).await.unwrap();
1152
1153        assert_eq!(steps.len(), 1);
1154        assert_eq!(steps[0].name, "create-issue");
1155        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1156        assert_eq!(
1157            steps[0].status.state,
1158            ironflow_store::models::StepStatus::Completed
1159        );
1160
1161        let output = steps[0].output.as_ref().unwrap();
1162        assert_eq!(output["issue_id"], 42);
1163        assert_eq!(output["project_id"], 123);
1164
1165        let input = steps[0].input.as_ref().unwrap();
1166        assert_eq!(input["project_id"], 123);
1167        assert_eq!(input["title"], "Bug report");
1168    }
1169
1170    #[tokio::test]
1171    async fn operation_step_failure_marks_run_failed() {
1172        let mut engine = create_test_engine();
1173        engine.register(FailingOperationWorkflow).unwrap();
1174
1175        let result = engine
1176            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1177            .await;
1178
1179        assert!(result.is_err());
1180    }
1181
1182    #[tokio::test]
1183    async fn operation_mixed_with_shell_steps() {
1184        let mut engine = create_test_engine();
1185        engine.register(MixedWorkflow).unwrap();
1186
1187        let run = engine
1188            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1189            .await
1190            .unwrap();
1191
1192        assert_eq!(run.status.state, RunStatus::Completed);
1193
1194        let steps = engine.store().list_steps(run.id).await.unwrap();
1195
1196        assert_eq!(steps.len(), 2);
1197        assert_eq!(steps[0].kind, StepKind::Shell);
1198        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1199        assert_eq!(steps[0].position, 0);
1200        assert_eq!(steps[1].position, 1);
1201    }
1202
1203    // -----------------------------------------------------------------------
1204    // Approval + resume tests
1205    // -----------------------------------------------------------------------
1206
1207    use crate::config::ApprovalConfig;
1208
1209    struct SingleApprovalWorkflow;
1210
1211    impl WorkflowHandler for SingleApprovalWorkflow {
1212        fn name(&self) -> &str {
1213            "single-approval"
1214        }
1215
1216        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1217            Box::pin(async move {
1218                ctx.shell("build", ShellConfig::new("echo built")).await?;
1219                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1220                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1221                    .await?;
1222                Ok(())
1223            })
1224        }
1225    }
1226
1227    struct DoubleApprovalWorkflow;
1228
1229    impl WorkflowHandler for DoubleApprovalWorkflow {
1230        fn name(&self) -> &str {
1231            "double-approval"
1232        }
1233
1234        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1235            Box::pin(async move {
1236                ctx.shell("build", ShellConfig::new("echo built")).await?;
1237                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1238                    .await?;
1239                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1240                    .await?;
1241                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1242                    .await?;
1243                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1244                    .await?;
1245                Ok(())
1246            })
1247        }
1248    }
1249
1250    #[tokio::test]
1251    async fn approval_pauses_run() {
1252        let mut engine = create_test_engine();
1253        engine.register(SingleApprovalWorkflow).unwrap();
1254
1255        let run = engine
1256            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1257            .await
1258            .unwrap();
1259
1260        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1261
1262        let steps = engine.store().list_steps(run.id).await.unwrap();
1263        assert_eq!(steps.len(), 2); // build + approval gate
1264        assert_eq!(steps[0].kind, StepKind::Shell);
1265        assert_eq!(steps[0].status.state, StepStatus::Completed);
1266        assert_eq!(steps[1].kind, StepKind::Approval);
1267        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1268    }
1269
1270    #[tokio::test]
1271    async fn approval_resume_completes_run() {
1272        let mut engine = create_test_engine();
1273        engine.register(SingleApprovalWorkflow).unwrap();
1274
1275        // First execution: pauses at approval
1276        let run = engine
1277            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1278            .await
1279            .unwrap();
1280        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1281
1282        // Simulate approval: transition to Running
1283        engine
1284            .store()
1285            .update_run_status(run.id, RunStatus::Running)
1286            .await
1287            .unwrap();
1288
1289        // Resume: replays build, skips approval, executes deploy
1290        let resumed = engine.resume_run(run.id).await.unwrap();
1291        assert_eq!(resumed.status.state, RunStatus::Completed);
1292
1293        let steps = engine.store().list_steps(run.id).await.unwrap();
1294        assert_eq!(steps.len(), 3); // build + approval + deploy
1295        assert_eq!(steps[0].name, "build");
1296        assert_eq!(steps[0].status.state, StepStatus::Completed);
1297        assert_eq!(steps[1].name, "gate");
1298        assert_eq!(steps[1].kind, StepKind::Approval);
1299        assert_eq!(steps[1].status.state, StepStatus::Completed);
1300        assert_eq!(steps[2].name, "deploy");
1301        assert_eq!(steps[2].status.state, StepStatus::Completed);
1302    }
1303
1304    #[tokio::test]
1305    async fn double_approval_two_resumes() {
1306        let mut engine = create_test_engine();
1307        engine.register(DoubleApprovalWorkflow).unwrap();
1308
1309        // First execution: pauses at staging-gate
1310        let run = engine
1311            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1312            .await
1313            .unwrap();
1314        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1315
1316        let steps = engine.store().list_steps(run.id).await.unwrap();
1317        assert_eq!(steps.len(), 2); // build + staging-gate
1318
1319        // First approval
1320        engine
1321            .store()
1322            .update_run_status(run.id, RunStatus::Running)
1323            .await
1324            .unwrap();
1325
1326        let resumed = engine.resume_run(run.id).await.unwrap();
1327        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1328
1329        let steps = engine.store().list_steps(run.id).await.unwrap();
1330        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1331
1332        // Second approval
1333        engine
1334            .store()
1335            .update_run_status(run.id, RunStatus::Running)
1336            .await
1337            .unwrap();
1338
1339        let final_run = engine.resume_run(run.id).await.unwrap();
1340        assert_eq!(final_run.status.state, RunStatus::Completed);
1341
1342        let steps = engine.store().list_steps(run.id).await.unwrap();
1343        assert_eq!(steps.len(), 5);
1344        assert_eq!(steps[0].name, "build");
1345        assert_eq!(steps[1].name, "staging-gate");
1346        assert_eq!(steps[2].name, "deploy-staging");
1347        assert_eq!(steps[3].name, "prod-gate");
1348        assert_eq!(steps[4].name, "deploy-prod");
1349
1350        for step in &steps {
1351            assert_eq!(step.status.state, StepStatus::Completed);
1352        }
1353    }
1354}