Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::Store;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33/// The workflow orchestration engine.
34///
35/// Holds references to the store, agent provider, and a registry of
36/// [`WorkflowHandler`]s.
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use ironflow_engine::engine::Engine;
43/// use ironflow_engine::config::ShellConfig;
44/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
45/// use ironflow_engine::context::WorkflowContext;
46/// use ironflow_store::memory::InMemoryStore;
47/// use ironflow_store::models::TriggerKind;
48/// use ironflow_core::providers::claude::ClaudeCodeProvider;
49/// use serde_json::json;
50///
51/// struct CiWorkflow;
52/// impl WorkflowHandler for CiWorkflow {
53///     fn name(&self) -> &str { "ci" }
54///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
55///         Box::pin(async move {
56///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
57///             Ok(())
58///         })
59///     }
60/// }
61///
62/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
63/// let store = Arc::new(InMemoryStore::new());
64/// let provider = Arc::new(ClaudeCodeProvider::new());
65/// let mut engine = Engine::new(store, provider);
66/// engine.register(CiWorkflow)?;
67///
68/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
69/// println!("Run {} completed with status {:?}", run.id, run.status);
70/// # Ok(())
71/// # }
72/// ```
73pub struct Engine {
74    store: Arc<dyn Store>,
75    provider: Arc<dyn AgentProvider>,
76    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77    event_publisher: EventPublisher,
78}
79
80/// Validate a workflow category path.
81///
82/// A category is a `/`-separated list of non-empty segments. This function
83/// rejects empty paths, leading or trailing `/`, consecutive `/`, and
84/// segments containing only whitespace.
85///
86/// # Errors
87///
88/// Returns [`EngineError::InvalidWorkflow`] when the category is malformed.
89fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
90    let reject = |reason: &str| {
91        Err(EngineError::InvalidWorkflow(format!(
92            "handler '{handler_name}' has invalid category '{category}': {reason}"
93        )))
94    };
95
96    if category.is_empty() {
97        return reject("empty category");
98    }
99    if category.starts_with('/') {
100        return reject("leading '/'");
101    }
102    if category.ends_with('/') {
103        return reject("trailing '/'");
104    }
105    for segment in category.split('/') {
106        if segment.is_empty() {
107            return reject("empty segment (double '/')");
108        }
109        if segment.trim().is_empty() {
110            return reject("whitespace-only segment");
111        }
112    }
113    Ok(())
114}
115
116impl Engine {
117    /// Create a new engine with the given store and agent provider.
118    ///
119    /// # Examples
120    ///
121    /// ```no_run
122    /// use std::sync::Arc;
123    /// use ironflow_engine::engine::Engine;
124    /// use ironflow_store::memory::InMemoryStore;
125    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
126    ///
127    /// let engine = Engine::new(
128    ///     Arc::new(InMemoryStore::new()),
129    ///     Arc::new(ClaudeCodeProvider::new()),
130    /// );
131    /// ```
132    pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
133        Self {
134            store,
135            provider,
136            handlers: HashMap::new(),
137            event_publisher: EventPublisher::new(),
138        }
139    }
140
141    /// Returns a reference to the backing store.
142    pub fn store(&self) -> &Arc<dyn Store> {
143        &self.store
144    }
145
146    /// Returns a reference to the agent provider.
147    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
148        &self.provider
149    }
150
151    /// Build a [`WorkflowContext`] with access to the handler registry.
152    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
153        let handlers = self.handlers.clone();
154        let resolver: crate::context::HandlerResolver =
155            Arc::new(move |name: &str| handlers.get(name).cloned());
156        WorkflowContext::with_handler_resolver(
157            run_id,
158            self.store.clone(),
159            self.provider.clone(),
160            resolver,
161        )
162    }
163
164    // -----------------------------------------------------------------------
165    // Handler registration
166    // -----------------------------------------------------------------------
167
168    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
169    ///
170    /// The handler is looked up by [`WorkflowHandler::name`] when executing
171    /// or enqueuing.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
176    /// name is already registered.
177    ///
178    /// # Examples
179    ///
180    /// ```no_run
181    /// use std::sync::Arc;
182    /// use ironflow_engine::engine::Engine;
183    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
184    /// use ironflow_engine::context::WorkflowContext;
185    /// use ironflow_engine::config::ShellConfig;
186    /// use ironflow_store::memory::InMemoryStore;
187    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
188    ///
189    /// struct MyWorkflow;
190    /// impl WorkflowHandler for MyWorkflow {
191    ///     fn name(&self) -> &str { "my-workflow" }
192    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
193    ///         Box::pin(async move {
194    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
195    ///             Ok(())
196    ///         })
197    ///     }
198    /// }
199    ///
200    /// let mut engine = Engine::new(
201    ///     Arc::new(InMemoryStore::new()),
202    ///     Arc::new(ClaudeCodeProvider::new()),
203    /// );
204    /// engine.register(MyWorkflow)?;
205    /// # Ok::<(), ironflow_engine::error::EngineError>(())
206    /// ```
207    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
208        let name = handler.name().to_string();
209        if self.handlers.contains_key(&name) {
210            return Err(EngineError::InvalidWorkflow(format!(
211                "handler '{}' already registered",
212                name
213            )));
214        }
215        if let Some(category) = handler.category() {
216            validate_category(&name, category)?;
217        }
218        self.handlers.insert(name, Arc::new(handler));
219        Ok(())
220    }
221
222    /// Register a pre-boxed workflow handler.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
227    /// name is already registered or if its category is invalid.
228    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
229        let name = handler.name().to_string();
230        if self.handlers.contains_key(&name) {
231            return Err(EngineError::InvalidWorkflow(format!(
232                "handler '{}' already registered",
233                name
234            )));
235        }
236        if let Some(category) = handler.category() {
237            validate_category(&name, category)?;
238        }
239        self.handlers.insert(name, Arc::from(handler));
240        Ok(())
241    }
242
243    /// Get a registered handler by name.
244    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
245        self.handlers.get(name)
246    }
247
248    /// List registered handler names.
249    pub fn handler_names(&self) -> Vec<&str> {
250        self.handlers.keys().map(|s| s.as_str()).collect()
251    }
252
253    /// Get detailed info about a registered workflow handler.
254    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
255        self.handlers.get(name).map(|h| h.describe())
256    }
257
258    /// Register an event subscriber for domain events.
259    ///
260    /// The subscriber is called only for events whose type is in
261    /// `event_types`. Pass [`Event::ALL`] to receive every event.
262    ///
263    /// # Examples
264    ///
265    /// ```no_run
266    /// use ironflow_engine::engine::Engine;
267    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
268    /// use ironflow_store::memory::InMemoryStore;
269    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
270    /// use std::sync::Arc;
271    ///
272    /// let mut engine = Engine::new(
273    ///     Arc::new(InMemoryStore::new()),
274    ///     Arc::new(ClaudeCodeProvider::new()),
275    /// );
276    ///
277    /// engine.subscribe(
278    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
279    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
280    /// );
281    /// ```
282    pub fn subscribe(
283        &mut self,
284        subscriber: impl EventSubscriber + 'static,
285        event_types: &[&'static str],
286    ) {
287        self.event_publisher.subscribe(subscriber, event_types);
288    }
289
290    /// Returns a reference to the event publisher.
291    ///
292    /// Useful for publishing events from outside the engine (e.g. auth
293    /// routes in the API layer).
294    pub fn event_publisher(&self) -> &EventPublisher {
295        &self.event_publisher
296    }
297
298    // -----------------------------------------------------------------------
299    // Dynamic workflow execution (WorkflowHandler)
300    // -----------------------------------------------------------------------
301
302    /// Execute a registered handler inline.
303    ///
304    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
305    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
310    /// with that name. Returns [`EngineError`] if execution fails.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// use std::sync::Arc;
316    /// use ironflow_engine::engine::Engine;
317    /// use ironflow_store::memory::InMemoryStore;
318    /// use ironflow_store::models::TriggerKind;
319    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
320    /// use serde_json::json;
321    ///
322    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
323    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
328    pub async fn run_handler(
329        &self,
330        handler_name: &str,
331        trigger: TriggerKind,
332        payload: Value,
333    ) -> Result<Run, EngineError> {
334        let handler = self
335            .handlers
336            .get(handler_name)
337            .ok_or_else(|| {
338                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
339            })?
340            .clone();
341
342        let handler_version = handler.version().map(str::to_string);
343        let run = self
344            .store
345            .create_run(NewRun {
346                workflow_name: handler_name.to_string(),
347                trigger,
348                payload,
349                max_retries: 0,
350                handler_version,
351            })
352            .await?;
353
354        let run_id = run.id;
355        info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
356
357        self.store
358            .update_run_status(run_id, RunStatus::Running)
359            .await?;
360
361        #[cfg(feature = "prometheus")]
362        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
363
364        let run_start = Instant::now();
365        let mut ctx = self.build_context(run_id);
366
367        let result = handler.execute(&mut ctx).await;
368        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
369            .await
370    }
371
372    /// Enqueue a handler-based workflow for worker execution.
373    ///
374    /// The workflow name is stored in the run. The worker looks up the
375    /// handler by name when executing.
376    ///
377    /// # Errors
378    ///
379    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
380    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
381    pub async fn enqueue_handler(
382        &self,
383        handler_name: &str,
384        trigger: TriggerKind,
385        payload: Value,
386        max_retries: u32,
387    ) -> Result<Run, EngineError> {
388        let handler = self.handlers.get(handler_name).ok_or_else(|| {
389            EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
390        })?;
391
392        let handler_version = handler.version().map(str::to_string);
393        let run = self
394            .store
395            .create_run(NewRun {
396                workflow_name: handler_name.to_string(),
397                trigger,
398                payload,
399                max_retries,
400                handler_version,
401            })
402            .await?;
403
404        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
405        Ok(run)
406    }
407
408    /// Execute a handler-based run (used by the worker after pick_next_pending).
409    ///
410    /// Looks up the handler by the run's `workflow_name` and executes it
411    /// with a fresh [`WorkflowContext`].
412    ///
413    /// # Errors
414    ///
415    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
416    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
417    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
418        let run = self
419            .store
420            .get_run(run_id)
421            .await?
422            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
423
424        let handler = self
425            .handlers
426            .get(&run.workflow_name)
427            .ok_or_else(|| {
428                EngineError::InvalidWorkflow(format!(
429                    "no handler registered: {}",
430                    run.workflow_name
431                ))
432            })?
433            .clone();
434
435        #[cfg(feature = "prometheus")]
436        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
437
438        let run_start = Instant::now();
439        let mut ctx = self.build_context(run_id);
440
441        let result = handler.execute(&mut ctx).await;
442        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
443            .await
444    }
445
446    /// Execute a run by its ID (used by the worker after pick_next_pending).
447    ///
448    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
449    ///
450    /// # Errors
451    ///
452    /// Returns [`EngineError`] if the run is not found or execution fails.
453    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
454    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
455        self.execute_handler_run(run_id).await
456    }
457
458    /// Resume a run after human approval.
459    ///
460    /// Re-executes the handler with step replay: completed steps return
461    /// cached output, approved approval steps are skipped, and execution
462    /// continues from the first unexecuted step.
463    ///
464    /// Supports multiple approval gates -- each resume replays all prior
465    /// steps and stops at the next approval (or completes the run).
466    ///
467    /// # Errors
468    ///
469    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
470    /// Returns [`EngineError`] if execution fails or hits another approval.
471    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
472    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
473        let run = self
474            .store
475            .get_run(run_id)
476            .await?
477            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
478
479        let handler = self
480            .handlers
481            .get(&run.workflow_name)
482            .ok_or_else(|| {
483                EngineError::InvalidWorkflow(format!(
484                    "no handler registered: {}",
485                    run.workflow_name
486                ))
487            })?
488            .clone();
489
490        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
491
492        let run_start = Instant::now();
493        let mut ctx = self.build_context(run_id);
494        ctx.load_replay_steps().await?;
495
496        let result = handler.execute(&mut ctx).await;
497        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
498            .await
499    }
500
501    /// Finalize a run with the given result and context.
502    ///
503    /// On success: updates run to Completed with cost, duration, and completed_at.
504    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
505    /// Always: fetches and returns the final Run.
506    async fn finalize_run(
507        &self,
508        run_id: Uuid,
509        workflow_name: &str,
510        result: Result<(), EngineError>,
511        ctx: &WorkflowContext,
512        run_start: Instant,
513    ) -> Result<Run, EngineError> {
514        let total_duration = run_start.elapsed().as_millis() as u64;
515        let completed_at = Utc::now();
516
517        let final_status;
518        let final_run;
519
520        match result {
521            Ok(()) => {
522                final_status = RunStatus::Completed;
523                final_run = self
524                    .store
525                    .update_run_returning(
526                        run_id,
527                        RunUpdate {
528                            status: Some(RunStatus::Completed),
529                            cost_usd: Some(ctx.total_cost_usd()),
530                            duration_ms: Some(total_duration),
531                            completed_at: Some(completed_at),
532                            ..RunUpdate::default()
533                        },
534                    )
535                    .await?;
536
537                info!(
538                    run_id = %run_id,
539                    cost_usd = %ctx.total_cost_usd(),
540                    duration_ms = total_duration,
541                    "run completed"
542                );
543            }
544            Err(EngineError::ApprovalRequired {
545                run_id: approval_run_id,
546                step_id,
547                ref message,
548            }) => {
549                final_status = RunStatus::AwaitingApproval;
550                final_run = self
551                    .store
552                    .update_run_returning(
553                        run_id,
554                        RunUpdate {
555                            status: Some(RunStatus::AwaitingApproval),
556                            cost_usd: Some(ctx.total_cost_usd()),
557                            duration_ms: Some(total_duration),
558                            ..RunUpdate::default()
559                        },
560                    )
561                    .await?;
562
563                info!(
564                    run_id = %approval_run_id,
565                    step_id = %step_id,
566                    message = %message,
567                    "run awaiting approval"
568                );
569            }
570            Err(err) => {
571                final_status = RunStatus::Failed;
572                if let Err(store_err) = self
573                    .store
574                    .update_run(
575                        run_id,
576                        RunUpdate {
577                            status: Some(RunStatus::Failed),
578                            error: Some(err.to_string()),
579                            cost_usd: Some(ctx.total_cost_usd()),
580                            duration_ms: Some(total_duration),
581                            completed_at: Some(completed_at),
582                            ..RunUpdate::default()
583                        },
584                    )
585                    .await
586                {
587                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
588                }
589
590                error!(run_id = %run_id, error = %err, "run failed");
591
592                self.publish_run_status_changed(
593                    workflow_name,
594                    run_id,
595                    final_status,
596                    Some(err.to_string()),
597                    ctx,
598                    total_duration,
599                );
600
601                #[cfg(feature = "prometheus")]
602                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
603
604                return Err(err);
605            }
606        }
607
608        self.publish_run_status_changed(
609            workflow_name,
610            run_id,
611            final_status,
612            None,
613            ctx,
614            total_duration,
615        );
616
617        #[cfg(feature = "prometheus")]
618        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
619
620        Ok(final_run)
621    }
622
623    /// Emit Prometheus metrics for a completed run.
624    #[cfg(feature = "prometheus")]
625    fn emit_run_metrics(
626        &self,
627        workflow_name: &str,
628        status: RunStatus,
629        duration_ms: u64,
630        ctx: &WorkflowContext,
631    ) {
632        let status_str = status.to_string();
633        let wf = workflow_name.to_string();
634
635        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
636        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
637            .record(duration_ms as f64 / 1000.0);
638        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
639            ctx.total_cost_usd()
640                .to_string()
641                .parse::<f64>()
642                .unwrap_or(0.0),
643        );
644        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
645    }
646
647    /// Publish a run status changed event to all registered subscribers.
648    ///
649    /// `from` is always `Running` because `finalize_run` is only called
650    /// from a running state.
651    fn publish_run_status_changed(
652        &self,
653        workflow_name: &str,
654        run_id: Uuid,
655        to: RunStatus,
656        error: Option<String>,
657        ctx: &WorkflowContext,
658        duration_ms: u64,
659    ) {
660        let now = Utc::now();
661        let cost_usd = ctx.total_cost_usd();
662        let wf = workflow_name.to_string();
663
664        self.event_publisher.publish(Event::RunStatusChanged {
665            run_id,
666            workflow_name: wf.clone(),
667            from: RunStatus::Running,
668            to,
669            error: error.clone(),
670            cost_usd,
671            duration_ms,
672            at: now,
673        });
674
675        if to == RunStatus::Failed {
676            self.event_publisher.publish(Event::RunFailed {
677                run_id,
678                workflow_name: wf,
679                error,
680                cost_usd,
681                duration_ms,
682                at: now,
683            });
684        }
685    }
686}
687
688impl fmt::Debug for Engine {
689    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
690        f.debug_struct("Engine")
691            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
692            .finish_non_exhaustive()
693    }
694}
695
696#[cfg(test)]
697mod tests {
698    use super::*;
699    use crate::config::ShellConfig;
700    use crate::handler::{HandlerFuture, WorkflowHandler};
701    use ironflow_core::providers::claude::ClaudeCodeProvider;
702    use ironflow_core::providers::record_replay::RecordReplayProvider;
703    use ironflow_store::memory::InMemoryStore;
704    use ironflow_store::models::StepStatus;
705    use serde_json::json;
706
707    // Test handler that echoes a message via shell
708    struct EchoWorkflow;
709
710    impl WorkflowHandler for EchoWorkflow {
711        fn name(&self) -> &str {
712            "echo-workflow"
713        }
714
715        fn describe(&self) -> WorkflowInfo {
716            WorkflowInfo {
717                description: "A simple workflow that echoes hello".to_string(),
718                source_code: None,
719                sub_workflows: Vec::new(),
720                category: None,
721                version: self.version().map(str::to_string),
722            }
723        }
724
725        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
726            Box::pin(async move {
727                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
728                Ok(())
729            })
730        }
731    }
732
733    // Test handler that fails
734    struct FailingWorkflow;
735
736    impl WorkflowHandler for FailingWorkflow {
737        fn name(&self) -> &str {
738            "failing-workflow"
739        }
740
741        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
742            Box::pin(async move {
743                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
744                Ok(())
745            })
746        }
747    }
748
749    fn create_test_engine() -> Engine {
750        let store = Arc::new(InMemoryStore::new());
751        let inner = ClaudeCodeProvider::new();
752        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
753            inner,
754            "/tmp/ironflow-fixtures",
755        ));
756        Engine::new(store, provider)
757    }
758
759    #[test]
760    fn engine_new_creates_instance() {
761        let engine = create_test_engine();
762        assert_eq!(engine.handler_names().len(), 0);
763    }
764
765    #[test]
766    fn engine_register_handler() {
767        let mut engine = create_test_engine();
768        let result = engine.register(EchoWorkflow);
769        assert!(result.is_ok());
770        assert_eq!(engine.handler_names().len(), 1);
771        assert!(engine.handler_names().contains(&"echo-workflow"));
772    }
773
774    #[test]
775    fn engine_register_duplicate_returns_error() {
776        let mut engine = create_test_engine();
777        engine.register(EchoWorkflow).unwrap();
778        let result = engine.register(EchoWorkflow);
779        assert!(result.is_err());
780    }
781
782    #[test]
783    fn engine_get_handler_found() {
784        let mut engine = create_test_engine();
785        engine.register(EchoWorkflow).unwrap();
786        let handler = engine.get_handler("echo-workflow");
787        assert!(handler.is_some());
788    }
789
790    #[test]
791    fn engine_get_handler_not_found() {
792        let engine = create_test_engine();
793        let handler = engine.get_handler("nonexistent");
794        assert!(handler.is_none());
795    }
796
797    #[test]
798    fn engine_handler_names_lists_all() {
799        let mut engine = create_test_engine();
800        engine.register(EchoWorkflow).unwrap();
801        engine.register(FailingWorkflow).unwrap();
802        let names = engine.handler_names();
803        assert_eq!(names.len(), 2);
804        assert!(names.contains(&"echo-workflow"));
805        assert!(names.contains(&"failing-workflow"));
806    }
807
808    #[test]
809    fn engine_handler_info_returns_description() {
810        let mut engine = create_test_engine();
811        engine.register(EchoWorkflow).unwrap();
812        let info = engine.handler_info("echo-workflow");
813        assert!(info.is_some());
814        let info = info.unwrap();
815        assert_eq!(info.description, "A simple workflow that echoes hello");
816    }
817
818    struct CategorizedWorkflow;
819
820    impl WorkflowHandler for CategorizedWorkflow {
821        fn name(&self) -> &str {
822            "categorized"
823        }
824        fn category(&self) -> Option<&str> {
825            Some("data/etl")
826        }
827        fn execute<'a>(
828            &'a self,
829            _ctx: &'a mut WorkflowContext,
830        ) -> crate::handler::HandlerFuture<'a> {
831            Box::pin(async move { Ok(()) })
832        }
833    }
834
835    #[test]
836    fn engine_default_describe_propagates_category() {
837        let mut engine = create_test_engine();
838        engine.register(CategorizedWorkflow).unwrap();
839        let info = engine.handler_info("categorized").unwrap();
840        assert_eq!(info.category.as_deref(), Some("data/etl"));
841    }
842
843    #[test]
844    fn engine_default_describe_without_category() {
845        let mut engine = create_test_engine();
846        engine.register(EchoWorkflow).unwrap();
847        let info = engine.handler_info("echo-workflow").unwrap();
848        assert!(info.category.is_none());
849    }
850
851    struct BadCategoryWorkflow(&'static str);
852
853    impl WorkflowHandler for BadCategoryWorkflow {
854        fn name(&self) -> &str {
855            "bad-category"
856        }
857        fn category(&self) -> Option<&str> {
858            Some(self.0)
859        }
860        fn execute<'a>(
861            &'a self,
862            _ctx: &'a mut WorkflowContext,
863        ) -> crate::handler::HandlerFuture<'a> {
864            Box::pin(async move { Ok(()) })
865        }
866    }
867
868    #[test]
869    fn engine_register_rejects_empty_category() {
870        let mut engine = create_test_engine();
871        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
872        match err {
873            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
874            other => panic!("expected InvalidWorkflow, got {other:?}"),
875        }
876    }
877
878    #[test]
879    fn engine_register_rejects_leading_slash_category() {
880        let mut engine = create_test_engine();
881        let err = engine
882            .register(BadCategoryWorkflow("/data/etl"))
883            .unwrap_err();
884        match err {
885            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
886            other => panic!("expected InvalidWorkflow, got {other:?}"),
887        }
888    }
889
890    #[test]
891    fn engine_register_rejects_trailing_slash_category() {
892        let mut engine = create_test_engine();
893        let err = engine
894            .register(BadCategoryWorkflow("data/etl/"))
895            .unwrap_err();
896        match err {
897            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
898            other => panic!("expected InvalidWorkflow, got {other:?}"),
899        }
900    }
901
902    #[test]
903    fn engine_register_rejects_double_slash_category() {
904        let mut engine = create_test_engine();
905        let err = engine
906            .register(BadCategoryWorkflow("data//etl"))
907            .unwrap_err();
908        match err {
909            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
910            other => panic!("expected InvalidWorkflow, got {other:?}"),
911        }
912    }
913
914    #[test]
915    fn engine_register_rejects_whitespace_only_segment_category() {
916        let mut engine = create_test_engine();
917        let err = engine
918            .register(BadCategoryWorkflow("data/ /etl"))
919            .unwrap_err();
920        match err {
921            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
922            other => panic!("expected InvalidWorkflow, got {other:?}"),
923        }
924    }
925
926    #[test]
927    fn engine_register_accepts_valid_nested_category() {
928        let mut engine = create_test_engine();
929        assert!(engine.register(CategorizedWorkflow).is_ok());
930    }
931
932    #[tokio::test]
933    async fn engine_unknown_workflow_returns_error() {
934        let engine = create_test_engine();
935        let result = engine
936            .run_handler("unknown", TriggerKind::Manual, json!({}))
937            .await;
938        assert!(result.is_err());
939        match result {
940            Err(EngineError::InvalidWorkflow(msg)) => {
941                assert!(msg.contains("no handler registered"));
942            }
943            _ => panic!("expected InvalidWorkflow error"),
944        }
945    }
946
947    #[tokio::test]
948    async fn engine_enqueue_handler_creates_pending_run() {
949        let mut engine = create_test_engine();
950        engine.register(EchoWorkflow).unwrap();
951
952        let run = engine
953            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
954            .await
955            .unwrap();
956        assert_eq!(run.status.state, RunStatus::Pending);
957        assert_eq!(run.workflow_name, "echo-workflow");
958    }
959
960    #[tokio::test]
961    async fn engine_register_boxed() {
962        let mut engine = create_test_engine();
963        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
964        let result = engine.register_boxed(handler);
965        assert!(result.is_ok());
966        assert_eq!(engine.handler_names().len(), 1);
967    }
968
969    #[tokio::test]
970    async fn engine_store_and_provider_accessors() {
971        let store = Arc::new(InMemoryStore::new());
972        let inner = ClaudeCodeProvider::new();
973        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
974            inner,
975            "/tmp/ironflow-fixtures",
976        ));
977        let engine = Engine::new(store.clone(), provider.clone());
978
979        // Verify accessors return references
980        let _ = engine.store();
981        let _ = engine.provider();
982    }
983
984    // -----------------------------------------------------------------------
985    // Operation trait tests
986    // -----------------------------------------------------------------------
987
988    use crate::operation::Operation;
989    use ironflow_store::models::StepKind;
990    use std::future::Future;
991    use std::pin::Pin;
992
993    struct FakeGitlabOp {
994        project_id: u64,
995        title: String,
996    }
997
998    impl Operation for FakeGitlabOp {
999        fn kind(&self) -> &str {
1000            "gitlab"
1001        }
1002
1003        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1004            Box::pin(async move {
1005                Ok(json!({
1006                    "issue_id": 42,
1007                    "project_id": self.project_id,
1008                    "title": self.title,
1009                }))
1010            })
1011        }
1012
1013        fn input(&self) -> Option<Value> {
1014            Some(json!({
1015                "project_id": self.project_id,
1016                "title": self.title,
1017            }))
1018        }
1019    }
1020
1021    struct FailingOp;
1022
1023    impl Operation for FailingOp {
1024        fn kind(&self) -> &str {
1025            "broken-service"
1026        }
1027
1028        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1029            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1030        }
1031    }
1032
1033    struct OperationWorkflow;
1034
1035    impl WorkflowHandler for OperationWorkflow {
1036        fn name(&self) -> &str {
1037            "operation-workflow"
1038        }
1039
1040        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1041            Box::pin(async move {
1042                let op = FakeGitlabOp {
1043                    project_id: 123,
1044                    title: "Bug report".to_string(),
1045                };
1046                ctx.operation("create-issue", &op).await?;
1047                Ok(())
1048            })
1049        }
1050    }
1051
1052    struct FailingOperationWorkflow;
1053
1054    impl WorkflowHandler for FailingOperationWorkflow {
1055        fn name(&self) -> &str {
1056            "failing-operation-workflow"
1057        }
1058
1059        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1060            Box::pin(async move {
1061                ctx.operation("broken-call", &FailingOp).await?;
1062                Ok(())
1063            })
1064        }
1065    }
1066
1067    struct MixedWorkflow;
1068
1069    impl WorkflowHandler for MixedWorkflow {
1070        fn name(&self) -> &str {
1071            "mixed-workflow"
1072        }
1073
1074        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1075            Box::pin(async move {
1076                ctx.shell("build", ShellConfig::new("echo built")).await?;
1077                let op = FakeGitlabOp {
1078                    project_id: 456,
1079                    title: "Deploy done".to_string(),
1080                };
1081                let result = ctx.operation("notify-gitlab", &op).await?;
1082                assert_eq!(result.output["issue_id"], 42);
1083                Ok(())
1084            })
1085        }
1086    }
1087
1088    #[tokio::test]
1089    async fn operation_step_happy_path() {
1090        let mut engine = create_test_engine();
1091        engine.register(OperationWorkflow).unwrap();
1092
1093        let run = engine
1094            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1095            .await
1096            .unwrap();
1097
1098        assert_eq!(run.status.state, RunStatus::Completed);
1099
1100        let steps = engine.store().list_steps(run.id).await.unwrap();
1101
1102        assert_eq!(steps.len(), 1);
1103        assert_eq!(steps[0].name, "create-issue");
1104        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1105        assert_eq!(
1106            steps[0].status.state,
1107            ironflow_store::models::StepStatus::Completed
1108        );
1109
1110        let output = steps[0].output.as_ref().unwrap();
1111        assert_eq!(output["issue_id"], 42);
1112        assert_eq!(output["project_id"], 123);
1113
1114        let input = steps[0].input.as_ref().unwrap();
1115        assert_eq!(input["project_id"], 123);
1116        assert_eq!(input["title"], "Bug report");
1117    }
1118
1119    #[tokio::test]
1120    async fn operation_step_failure_marks_run_failed() {
1121        let mut engine = create_test_engine();
1122        engine.register(FailingOperationWorkflow).unwrap();
1123
1124        let result = engine
1125            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1126            .await;
1127
1128        assert!(result.is_err());
1129    }
1130
1131    #[tokio::test]
1132    async fn operation_mixed_with_shell_steps() {
1133        let mut engine = create_test_engine();
1134        engine.register(MixedWorkflow).unwrap();
1135
1136        let run = engine
1137            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1138            .await
1139            .unwrap();
1140
1141        assert_eq!(run.status.state, RunStatus::Completed);
1142
1143        let steps = engine.store().list_steps(run.id).await.unwrap();
1144
1145        assert_eq!(steps.len(), 2);
1146        assert_eq!(steps[0].kind, StepKind::Shell);
1147        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1148        assert_eq!(steps[0].position, 0);
1149        assert_eq!(steps[1].position, 1);
1150    }
1151
1152    // -----------------------------------------------------------------------
1153    // Approval + resume tests
1154    // -----------------------------------------------------------------------
1155
1156    use crate::config::ApprovalConfig;
1157
1158    struct SingleApprovalWorkflow;
1159
1160    impl WorkflowHandler for SingleApprovalWorkflow {
1161        fn name(&self) -> &str {
1162            "single-approval"
1163        }
1164
1165        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1166            Box::pin(async move {
1167                ctx.shell("build", ShellConfig::new("echo built")).await?;
1168                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1169                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1170                    .await?;
1171                Ok(())
1172            })
1173        }
1174    }
1175
1176    struct DoubleApprovalWorkflow;
1177
1178    impl WorkflowHandler for DoubleApprovalWorkflow {
1179        fn name(&self) -> &str {
1180            "double-approval"
1181        }
1182
1183        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1184            Box::pin(async move {
1185                ctx.shell("build", ShellConfig::new("echo built")).await?;
1186                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1187                    .await?;
1188                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1189                    .await?;
1190                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1191                    .await?;
1192                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1193                    .await?;
1194                Ok(())
1195            })
1196        }
1197    }
1198
1199    #[tokio::test]
1200    async fn approval_pauses_run() {
1201        let mut engine = create_test_engine();
1202        engine.register(SingleApprovalWorkflow).unwrap();
1203
1204        let run = engine
1205            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1206            .await
1207            .unwrap();
1208
1209        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1210
1211        let steps = engine.store().list_steps(run.id).await.unwrap();
1212        assert_eq!(steps.len(), 2); // build + approval gate
1213        assert_eq!(steps[0].kind, StepKind::Shell);
1214        assert_eq!(steps[0].status.state, StepStatus::Completed);
1215        assert_eq!(steps[1].kind, StepKind::Approval);
1216        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1217    }
1218
1219    #[tokio::test]
1220    async fn approval_resume_completes_run() {
1221        let mut engine = create_test_engine();
1222        engine.register(SingleApprovalWorkflow).unwrap();
1223
1224        // First execution: pauses at approval
1225        let run = engine
1226            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1227            .await
1228            .unwrap();
1229        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1230
1231        // Simulate approval: transition to Running
1232        engine
1233            .store()
1234            .update_run_status(run.id, RunStatus::Running)
1235            .await
1236            .unwrap();
1237
1238        // Resume: replays build, skips approval, executes deploy
1239        let resumed = engine.resume_run(run.id).await.unwrap();
1240        assert_eq!(resumed.status.state, RunStatus::Completed);
1241
1242        let steps = engine.store().list_steps(run.id).await.unwrap();
1243        assert_eq!(steps.len(), 3); // build + approval + deploy
1244        assert_eq!(steps[0].name, "build");
1245        assert_eq!(steps[0].status.state, StepStatus::Completed);
1246        assert_eq!(steps[1].name, "gate");
1247        assert_eq!(steps[1].kind, StepKind::Approval);
1248        assert_eq!(steps[1].status.state, StepStatus::Completed);
1249        assert_eq!(steps[2].name, "deploy");
1250        assert_eq!(steps[2].status.state, StepStatus::Completed);
1251    }
1252
1253    #[tokio::test]
1254    async fn double_approval_two_resumes() {
1255        let mut engine = create_test_engine();
1256        engine.register(DoubleApprovalWorkflow).unwrap();
1257
1258        // First execution: pauses at staging-gate
1259        let run = engine
1260            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1261            .await
1262            .unwrap();
1263        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1264
1265        let steps = engine.store().list_steps(run.id).await.unwrap();
1266        assert_eq!(steps.len(), 2); // build + staging-gate
1267
1268        // First approval
1269        engine
1270            .store()
1271            .update_run_status(run.id, RunStatus::Running)
1272            .await
1273            .unwrap();
1274
1275        let resumed = engine.resume_run(run.id).await.unwrap();
1276        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1277
1278        let steps = engine.store().list_steps(run.id).await.unwrap();
1279        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1280
1281        // Second approval
1282        engine
1283            .store()
1284            .update_run_status(run.id, RunStatus::Running)
1285            .await
1286            .unwrap();
1287
1288        let final_run = engine.resume_run(run.id).await.unwrap();
1289        assert_eq!(final_run.status.state, RunStatus::Completed);
1290
1291        let steps = engine.store().list_steps(run.id).await.unwrap();
1292        assert_eq!(steps.len(), 5);
1293        assert_eq!(steps[0].name, "build");
1294        assert_eq!(steps[1].name, "staging-gate");
1295        assert_eq!(steps[2].name, "deploy-staging");
1296        assert_eq!(steps[3].name, "prod-gate");
1297        assert_eq!(steps[4].name, "deploy-prod");
1298
1299        for step in &steps {
1300            assert_eq!(step.status.state, StepStatus::Completed);
1301        }
1302    }
1303}