Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33/// The workflow orchestration engine.
34///
35/// Holds references to the store, agent provider, and a registry of
36/// [`WorkflowHandler`]s.
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use ironflow_engine::engine::Engine;
43/// use ironflow_engine::config::ShellConfig;
44/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
45/// use ironflow_engine::context::WorkflowContext;
46/// use ironflow_store::memory::InMemoryStore;
47/// use ironflow_store::models::TriggerKind;
48/// use ironflow_core::providers::claude::ClaudeCodeProvider;
49/// use serde_json::json;
50///
51/// struct CiWorkflow;
52/// impl WorkflowHandler for CiWorkflow {
53///     fn name(&self) -> &str { "ci" }
54///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
55///         Box::pin(async move {
56///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
57///             Ok(())
58///         })
59///     }
60/// }
61///
62/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
63/// let store = Arc::new(InMemoryStore::new());
64/// let provider = Arc::new(ClaudeCodeProvider::new());
65/// let mut engine = Engine::new(store, provider);
66/// engine.register(CiWorkflow)?;
67///
68/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
69/// println!("Run {} completed with status {:?}", run.id, run.status);
70/// # Ok(())
71/// # }
72/// ```
73pub struct Engine {
74    store: Arc<dyn RunStore>,
75    provider: Arc<dyn AgentProvider>,
76    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77    event_publisher: EventPublisher,
78}
79
80impl Engine {
81    /// Create a new engine with the given store and agent provider.
82    ///
83    /// # Examples
84    ///
85    /// ```no_run
86    /// use std::sync::Arc;
87    /// use ironflow_engine::engine::Engine;
88    /// use ironflow_store::memory::InMemoryStore;
89    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
90    ///
91    /// let engine = Engine::new(
92    ///     Arc::new(InMemoryStore::new()),
93    ///     Arc::new(ClaudeCodeProvider::new()),
94    /// );
95    /// ```
96    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97        Self {
98            store,
99            provider,
100            handlers: HashMap::new(),
101            event_publisher: EventPublisher::new(),
102        }
103    }
104
105    /// Returns a reference to the backing store.
106    pub fn store(&self) -> &Arc<dyn RunStore> {
107        &self.store
108    }
109
110    /// Returns a reference to the agent provider.
111    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
112        &self.provider
113    }
114
115    /// Build a [`WorkflowContext`] with access to the handler registry.
116    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
117        let handlers = self.handlers.clone();
118        let resolver: crate::context::HandlerResolver =
119            Arc::new(move |name: &str| handlers.get(name).cloned());
120        WorkflowContext::with_handler_resolver(
121            run_id,
122            self.store.clone(),
123            self.provider.clone(),
124            resolver,
125        )
126    }
127
128    // -----------------------------------------------------------------------
129    // Handler registration
130    // -----------------------------------------------------------------------
131
132    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
133    ///
134    /// The handler is looked up by [`WorkflowHandler::name`] when executing
135    /// or enqueuing.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
140    /// name is already registered.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// use std::sync::Arc;
146    /// use ironflow_engine::engine::Engine;
147    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
148    /// use ironflow_engine::context::WorkflowContext;
149    /// use ironflow_engine::config::ShellConfig;
150    /// use ironflow_store::memory::InMemoryStore;
151    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
152    ///
153    /// struct MyWorkflow;
154    /// impl WorkflowHandler for MyWorkflow {
155    ///     fn name(&self) -> &str { "my-workflow" }
156    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
157    ///         Box::pin(async move {
158    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
159    ///             Ok(())
160    ///         })
161    ///     }
162    /// }
163    ///
164    /// let mut engine = Engine::new(
165    ///     Arc::new(InMemoryStore::new()),
166    ///     Arc::new(ClaudeCodeProvider::new()),
167    /// );
168    /// engine.register(MyWorkflow)?;
169    /// # Ok::<(), ironflow_engine::error::EngineError>(())
170    /// ```
171    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
172        let name = handler.name().to_string();
173        if self.handlers.contains_key(&name) {
174            return Err(EngineError::InvalidWorkflow(format!(
175                "handler '{}' already registered",
176                name
177            )));
178        }
179        self.handlers.insert(name, Arc::new(handler));
180        Ok(())
181    }
182
183    /// Register a pre-boxed workflow handler.
184    ///
185    /// # Errors
186    ///
187    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
188    /// name is already registered.
189    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
190        let name = handler.name().to_string();
191        if self.handlers.contains_key(&name) {
192            return Err(EngineError::InvalidWorkflow(format!(
193                "handler '{}' already registered",
194                name
195            )));
196        }
197        self.handlers.insert(name, Arc::from(handler));
198        Ok(())
199    }
200
201    /// Get a registered handler by name.
202    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
203        self.handlers.get(name)
204    }
205
206    /// List registered handler names.
207    pub fn handler_names(&self) -> Vec<&str> {
208        self.handlers.keys().map(|s| s.as_str()).collect()
209    }
210
211    /// Get detailed info about a registered workflow handler.
212    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
213        self.handlers.get(name).map(|h| h.describe())
214    }
215
216    /// Register an event subscriber for domain events.
217    ///
218    /// The subscriber is called only for events whose type is in
219    /// `event_types`. Pass [`Event::ALL`] to receive every event.
220    ///
221    /// # Examples
222    ///
223    /// ```no_run
224    /// use ironflow_engine::engine::Engine;
225    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
226    /// use ironflow_store::memory::InMemoryStore;
227    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
228    /// use std::sync::Arc;
229    ///
230    /// let mut engine = Engine::new(
231    ///     Arc::new(InMemoryStore::new()),
232    ///     Arc::new(ClaudeCodeProvider::new()),
233    /// );
234    ///
235    /// engine.subscribe(
236    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
237    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
238    /// );
239    /// ```
240    pub fn subscribe(
241        &mut self,
242        subscriber: impl EventSubscriber + 'static,
243        event_types: &[&'static str],
244    ) {
245        self.event_publisher.subscribe(subscriber, event_types);
246    }
247
248    /// Returns a reference to the event publisher.
249    ///
250    /// Useful for publishing events from outside the engine (e.g. auth
251    /// routes in the API layer).
252    pub fn event_publisher(&self) -> &EventPublisher {
253        &self.event_publisher
254    }
255
256    // -----------------------------------------------------------------------
257    // Dynamic workflow execution (WorkflowHandler)
258    // -----------------------------------------------------------------------
259
260    /// Execute a registered handler inline.
261    ///
262    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
263    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
264    ///
265    /// # Errors
266    ///
267    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
268    /// with that name. Returns [`EngineError`] if execution fails.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// use std::sync::Arc;
274    /// use ironflow_engine::engine::Engine;
275    /// use ironflow_store::memory::InMemoryStore;
276    /// use ironflow_store::models::TriggerKind;
277    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
278    /// use serde_json::json;
279    ///
280    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
281    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
282    /// # Ok(())
283    /// # }
284    /// ```
285    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
286    pub async fn run_handler(
287        &self,
288        handler_name: &str,
289        trigger: TriggerKind,
290        payload: Value,
291    ) -> Result<Run, EngineError> {
292        let handler = self
293            .handlers
294            .get(handler_name)
295            .ok_or_else(|| {
296                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
297            })?
298            .clone();
299
300        let run = self
301            .store
302            .create_run(NewRun {
303                workflow_name: handler_name.to_string(),
304                trigger,
305                payload,
306                max_retries: 0,
307            })
308            .await?;
309
310        let run_id = run.id;
311        info!(run_id = %run_id, "run created");
312
313        self.store
314            .update_run_status(run_id, RunStatus::Running)
315            .await?;
316
317        #[cfg(feature = "prometheus")]
318        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
319
320        let run_start = Instant::now();
321        let mut ctx = self.build_context(run_id);
322
323        let result = handler.execute(&mut ctx).await;
324        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
325            .await
326    }
327
328    /// Enqueue a handler-based workflow for worker execution.
329    ///
330    /// The workflow name is stored in the run. The worker looks up the
331    /// handler by name when executing.
332    ///
333    /// # Errors
334    ///
335    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
336    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
337    pub async fn enqueue_handler(
338        &self,
339        handler_name: &str,
340        trigger: TriggerKind,
341        payload: Value,
342        max_retries: u32,
343    ) -> Result<Run, EngineError> {
344        if !self.handlers.contains_key(handler_name) {
345            return Err(EngineError::InvalidWorkflow(format!(
346                "no handler registered: {handler_name}"
347            )));
348        }
349
350        let run = self
351            .store
352            .create_run(NewRun {
353                workflow_name: handler_name.to_string(),
354                trigger,
355                payload,
356                max_retries,
357            })
358            .await?;
359
360        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
361        Ok(run)
362    }
363
364    /// Execute a handler-based run (used by the worker after pick_next_pending).
365    ///
366    /// Looks up the handler by the run's `workflow_name` and executes it
367    /// with a fresh [`WorkflowContext`].
368    ///
369    /// # Errors
370    ///
371    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
372    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
373    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
374        let run = self
375            .store
376            .get_run(run_id)
377            .await?
378            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
379
380        let handler = self
381            .handlers
382            .get(&run.workflow_name)
383            .ok_or_else(|| {
384                EngineError::InvalidWorkflow(format!(
385                    "no handler registered: {}",
386                    run.workflow_name
387                ))
388            })?
389            .clone();
390
391        #[cfg(feature = "prometheus")]
392        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
393
394        let run_start = Instant::now();
395        let mut ctx = self.build_context(run_id);
396
397        let result = handler.execute(&mut ctx).await;
398        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
399            .await
400    }
401
402    /// Execute a run by its ID (used by the worker after pick_next_pending).
403    ///
404    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
405    ///
406    /// # Errors
407    ///
408    /// Returns [`EngineError`] if the run is not found or execution fails.
409    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
410    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
411        self.execute_handler_run(run_id).await
412    }
413
414    /// Resume a run after human approval.
415    ///
416    /// Re-executes the handler with step replay: completed steps return
417    /// cached output, approved approval steps are skipped, and execution
418    /// continues from the first unexecuted step.
419    ///
420    /// Supports multiple approval gates -- each resume replays all prior
421    /// steps and stops at the next approval (or completes the run).
422    ///
423    /// # Errors
424    ///
425    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
426    /// Returns [`EngineError`] if execution fails or hits another approval.
427    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
428    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
429        let run = self
430            .store
431            .get_run(run_id)
432            .await?
433            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
434
435        let handler = self
436            .handlers
437            .get(&run.workflow_name)
438            .ok_or_else(|| {
439                EngineError::InvalidWorkflow(format!(
440                    "no handler registered: {}",
441                    run.workflow_name
442                ))
443            })?
444            .clone();
445
446        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
447
448        let run_start = Instant::now();
449        let mut ctx = self.build_context(run_id);
450        ctx.load_replay_steps().await?;
451
452        let result = handler.execute(&mut ctx).await;
453        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
454            .await
455    }
456
457    /// Finalize a run with the given result and context.
458    ///
459    /// On success: updates run to Completed with cost, duration, and completed_at.
460    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
461    /// Always: fetches and returns the final Run.
462    async fn finalize_run(
463        &self,
464        run_id: Uuid,
465        workflow_name: &str,
466        result: Result<(), EngineError>,
467        ctx: &WorkflowContext,
468        run_start: Instant,
469    ) -> Result<Run, EngineError> {
470        let total_duration = run_start.elapsed().as_millis() as u64;
471        let completed_at = Utc::now();
472
473        let final_status;
474        let final_run;
475
476        match result {
477            Ok(()) => {
478                final_status = RunStatus::Completed;
479                final_run = self
480                    .store
481                    .update_run_returning(
482                        run_id,
483                        RunUpdate {
484                            status: Some(RunStatus::Completed),
485                            cost_usd: Some(ctx.total_cost_usd()),
486                            duration_ms: Some(total_duration),
487                            completed_at: Some(completed_at),
488                            ..RunUpdate::default()
489                        },
490                    )
491                    .await?;
492
493                info!(
494                    run_id = %run_id,
495                    cost_usd = %ctx.total_cost_usd(),
496                    duration_ms = total_duration,
497                    "run completed"
498                );
499            }
500            Err(EngineError::ApprovalRequired {
501                run_id: approval_run_id,
502                step_id,
503                ref message,
504            }) => {
505                final_status = RunStatus::AwaitingApproval;
506                final_run = self
507                    .store
508                    .update_run_returning(
509                        run_id,
510                        RunUpdate {
511                            status: Some(RunStatus::AwaitingApproval),
512                            cost_usd: Some(ctx.total_cost_usd()),
513                            duration_ms: Some(total_duration),
514                            ..RunUpdate::default()
515                        },
516                    )
517                    .await?;
518
519                info!(
520                    run_id = %approval_run_id,
521                    step_id = %step_id,
522                    message = %message,
523                    "run awaiting approval"
524                );
525            }
526            Err(err) => {
527                final_status = RunStatus::Failed;
528                if let Err(store_err) = self
529                    .store
530                    .update_run(
531                        run_id,
532                        RunUpdate {
533                            status: Some(RunStatus::Failed),
534                            error: Some(err.to_string()),
535                            cost_usd: Some(ctx.total_cost_usd()),
536                            duration_ms: Some(total_duration),
537                            completed_at: Some(completed_at),
538                            ..RunUpdate::default()
539                        },
540                    )
541                    .await
542                {
543                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
544                }
545
546                error!(run_id = %run_id, error = %err, "run failed");
547
548                self.publish_run_status_changed(
549                    workflow_name,
550                    run_id,
551                    final_status,
552                    Some(err.to_string()),
553                    ctx,
554                    total_duration,
555                );
556
557                #[cfg(feature = "prometheus")]
558                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
559
560                return Err(err);
561            }
562        }
563
564        self.publish_run_status_changed(
565            workflow_name,
566            run_id,
567            final_status,
568            None,
569            ctx,
570            total_duration,
571        );
572
573        #[cfg(feature = "prometheus")]
574        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
575
576        Ok(final_run)
577    }
578
579    /// Emit Prometheus metrics for a completed run.
580    #[cfg(feature = "prometheus")]
581    fn emit_run_metrics(
582        &self,
583        workflow_name: &str,
584        status: RunStatus,
585        duration_ms: u64,
586        ctx: &WorkflowContext,
587    ) {
588        let status_str = status.to_string();
589        let wf = workflow_name.to_string();
590
591        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
592        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
593            .record(duration_ms as f64 / 1000.0);
594        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
595            ctx.total_cost_usd()
596                .to_string()
597                .parse::<f64>()
598                .unwrap_or(0.0),
599        );
600        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
601    }
602
603    /// Publish a run status changed event to all registered subscribers.
604    ///
605    /// `from` is always `Running` because `finalize_run` is only called
606    /// from a running state.
607    fn publish_run_status_changed(
608        &self,
609        workflow_name: &str,
610        run_id: Uuid,
611        to: RunStatus,
612        error: Option<String>,
613        ctx: &WorkflowContext,
614        duration_ms: u64,
615    ) {
616        let now = Utc::now();
617        let cost_usd = ctx.total_cost_usd();
618        let wf = workflow_name.to_string();
619
620        self.event_publisher.publish(Event::RunStatusChanged {
621            run_id,
622            workflow_name: wf.clone(),
623            from: RunStatus::Running,
624            to,
625            error: error.clone(),
626            cost_usd,
627            duration_ms,
628            at: now,
629        });
630
631        if to == RunStatus::Failed {
632            self.event_publisher.publish(Event::RunFailed {
633                run_id,
634                workflow_name: wf,
635                error,
636                cost_usd,
637                duration_ms,
638                at: now,
639            });
640        }
641    }
642}
643
644impl fmt::Debug for Engine {
645    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
646        f.debug_struct("Engine")
647            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
648            .finish_non_exhaustive()
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655    use crate::config::ShellConfig;
656    use crate::handler::{HandlerFuture, WorkflowHandler};
657    use ironflow_core::providers::claude::ClaudeCodeProvider;
658    use ironflow_core::providers::record_replay::RecordReplayProvider;
659    use ironflow_store::memory::InMemoryStore;
660    use ironflow_store::models::StepStatus;
661    use serde_json::json;
662
663    // Test handler that echoes a message via shell
664    struct EchoWorkflow;
665
666    impl WorkflowHandler for EchoWorkflow {
667        fn name(&self) -> &str {
668            "echo-workflow"
669        }
670
671        fn describe(&self) -> WorkflowInfo {
672            WorkflowInfo {
673                description: "A simple workflow that echoes hello".to_string(),
674                source_code: None,
675                sub_workflows: Vec::new(),
676            }
677        }
678
679        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
680            Box::pin(async move {
681                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
682                Ok(())
683            })
684        }
685    }
686
687    // Test handler that fails
688    struct FailingWorkflow;
689
690    impl WorkflowHandler for FailingWorkflow {
691        fn name(&self) -> &str {
692            "failing-workflow"
693        }
694
695        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
696            Box::pin(async move {
697                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
698                Ok(())
699            })
700        }
701    }
702
703    fn create_test_engine() -> Engine {
704        let store = Arc::new(InMemoryStore::new());
705        let inner = ClaudeCodeProvider::new();
706        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
707            inner,
708            "/tmp/ironflow-fixtures",
709        ));
710        Engine::new(store, provider)
711    }
712
713    #[test]
714    fn engine_new_creates_instance() {
715        let engine = create_test_engine();
716        assert_eq!(engine.handler_names().len(), 0);
717    }
718
719    #[test]
720    fn engine_register_handler() {
721        let mut engine = create_test_engine();
722        let result = engine.register(EchoWorkflow);
723        assert!(result.is_ok());
724        assert_eq!(engine.handler_names().len(), 1);
725        assert!(engine.handler_names().contains(&"echo-workflow"));
726    }
727
728    #[test]
729    fn engine_register_duplicate_returns_error() {
730        let mut engine = create_test_engine();
731        engine.register(EchoWorkflow).unwrap();
732        let result = engine.register(EchoWorkflow);
733        assert!(result.is_err());
734    }
735
736    #[test]
737    fn engine_get_handler_found() {
738        let mut engine = create_test_engine();
739        engine.register(EchoWorkflow).unwrap();
740        let handler = engine.get_handler("echo-workflow");
741        assert!(handler.is_some());
742    }
743
744    #[test]
745    fn engine_get_handler_not_found() {
746        let engine = create_test_engine();
747        let handler = engine.get_handler("nonexistent");
748        assert!(handler.is_none());
749    }
750
751    #[test]
752    fn engine_handler_names_lists_all() {
753        let mut engine = create_test_engine();
754        engine.register(EchoWorkflow).unwrap();
755        engine.register(FailingWorkflow).unwrap();
756        let names = engine.handler_names();
757        assert_eq!(names.len(), 2);
758        assert!(names.contains(&"echo-workflow"));
759        assert!(names.contains(&"failing-workflow"));
760    }
761
762    #[test]
763    fn engine_handler_info_returns_description() {
764        let mut engine = create_test_engine();
765        engine.register(EchoWorkflow).unwrap();
766        let info = engine.handler_info("echo-workflow");
767        assert!(info.is_some());
768        let info = info.unwrap();
769        assert_eq!(info.description, "A simple workflow that echoes hello");
770    }
771
772    #[tokio::test]
773    async fn engine_unknown_workflow_returns_error() {
774        let engine = create_test_engine();
775        let result = engine
776            .run_handler("unknown", TriggerKind::Manual, json!({}))
777            .await;
778        assert!(result.is_err());
779        match result {
780            Err(EngineError::InvalidWorkflow(msg)) => {
781                assert!(msg.contains("no handler registered"));
782            }
783            _ => panic!("expected InvalidWorkflow error"),
784        }
785    }
786
787    #[tokio::test]
788    async fn engine_enqueue_handler_creates_pending_run() {
789        let mut engine = create_test_engine();
790        engine.register(EchoWorkflow).unwrap();
791
792        let run = engine
793            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
794            .await
795            .unwrap();
796        assert_eq!(run.status.state, RunStatus::Pending);
797        assert_eq!(run.workflow_name, "echo-workflow");
798    }
799
800    #[tokio::test]
801    async fn engine_register_boxed() {
802        let mut engine = create_test_engine();
803        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
804        let result = engine.register_boxed(handler);
805        assert!(result.is_ok());
806        assert_eq!(engine.handler_names().len(), 1);
807    }
808
809    #[tokio::test]
810    async fn engine_store_and_provider_accessors() {
811        let store = Arc::new(InMemoryStore::new());
812        let inner = ClaudeCodeProvider::new();
813        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
814            inner,
815            "/tmp/ironflow-fixtures",
816        ));
817        let engine = Engine::new(store.clone(), provider.clone());
818
819        // Verify accessors return references
820        let _ = engine.store();
821        let _ = engine.provider();
822    }
823
824    // -----------------------------------------------------------------------
825    // Operation trait tests
826    // -----------------------------------------------------------------------
827
828    use crate::operation::Operation;
829    use ironflow_store::models::StepKind;
830    use std::future::Future;
831    use std::pin::Pin;
832
833    struct FakeGitlabOp {
834        project_id: u64,
835        title: String,
836    }
837
838    impl Operation for FakeGitlabOp {
839        fn kind(&self) -> &str {
840            "gitlab"
841        }
842
843        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
844            Box::pin(async move {
845                Ok(json!({
846                    "issue_id": 42,
847                    "project_id": self.project_id,
848                    "title": self.title,
849                }))
850            })
851        }
852
853        fn input(&self) -> Option<Value> {
854            Some(json!({
855                "project_id": self.project_id,
856                "title": self.title,
857            }))
858        }
859    }
860
861    struct FailingOp;
862
863    impl Operation for FailingOp {
864        fn kind(&self) -> &str {
865            "broken-service"
866        }
867
868        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
869            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
870        }
871    }
872
873    struct OperationWorkflow;
874
875    impl WorkflowHandler for OperationWorkflow {
876        fn name(&self) -> &str {
877            "operation-workflow"
878        }
879
880        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
881            Box::pin(async move {
882                let op = FakeGitlabOp {
883                    project_id: 123,
884                    title: "Bug report".to_string(),
885                };
886                ctx.operation("create-issue", &op).await?;
887                Ok(())
888            })
889        }
890    }
891
892    struct FailingOperationWorkflow;
893
894    impl WorkflowHandler for FailingOperationWorkflow {
895        fn name(&self) -> &str {
896            "failing-operation-workflow"
897        }
898
899        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
900            Box::pin(async move {
901                ctx.operation("broken-call", &FailingOp).await?;
902                Ok(())
903            })
904        }
905    }
906
907    struct MixedWorkflow;
908
909    impl WorkflowHandler for MixedWorkflow {
910        fn name(&self) -> &str {
911            "mixed-workflow"
912        }
913
914        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
915            Box::pin(async move {
916                ctx.shell("build", ShellConfig::new("echo built")).await?;
917                let op = FakeGitlabOp {
918                    project_id: 456,
919                    title: "Deploy done".to_string(),
920                };
921                let result = ctx.operation("notify-gitlab", &op).await?;
922                assert_eq!(result.output["issue_id"], 42);
923                Ok(())
924            })
925        }
926    }
927
928    #[tokio::test]
929    async fn operation_step_happy_path() {
930        let mut engine = create_test_engine();
931        engine.register(OperationWorkflow).unwrap();
932
933        let run = engine
934            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
935            .await
936            .unwrap();
937
938        assert_eq!(run.status.state, RunStatus::Completed);
939
940        let steps = engine.store().list_steps(run.id).await.unwrap();
941
942        assert_eq!(steps.len(), 1);
943        assert_eq!(steps[0].name, "create-issue");
944        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
945        assert_eq!(
946            steps[0].status.state,
947            ironflow_store::models::StepStatus::Completed
948        );
949
950        let output = steps[0].output.as_ref().unwrap();
951        assert_eq!(output["issue_id"], 42);
952        assert_eq!(output["project_id"], 123);
953
954        let input = steps[0].input.as_ref().unwrap();
955        assert_eq!(input["project_id"], 123);
956        assert_eq!(input["title"], "Bug report");
957    }
958
959    #[tokio::test]
960    async fn operation_step_failure_marks_run_failed() {
961        let mut engine = create_test_engine();
962        engine.register(FailingOperationWorkflow).unwrap();
963
964        let result = engine
965            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
966            .await;
967
968        assert!(result.is_err());
969    }
970
971    #[tokio::test]
972    async fn operation_mixed_with_shell_steps() {
973        let mut engine = create_test_engine();
974        engine.register(MixedWorkflow).unwrap();
975
976        let run = engine
977            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
978            .await
979            .unwrap();
980
981        assert_eq!(run.status.state, RunStatus::Completed);
982
983        let steps = engine.store().list_steps(run.id).await.unwrap();
984
985        assert_eq!(steps.len(), 2);
986        assert_eq!(steps[0].kind, StepKind::Shell);
987        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
988        assert_eq!(steps[0].position, 0);
989        assert_eq!(steps[1].position, 1);
990    }
991
992    // -----------------------------------------------------------------------
993    // Approval + resume tests
994    // -----------------------------------------------------------------------
995
996    use crate::config::ApprovalConfig;
997
998    struct SingleApprovalWorkflow;
999
1000    impl WorkflowHandler for SingleApprovalWorkflow {
1001        fn name(&self) -> &str {
1002            "single-approval"
1003        }
1004
1005        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1006            Box::pin(async move {
1007                ctx.shell("build", ShellConfig::new("echo built")).await?;
1008                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1009                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1010                    .await?;
1011                Ok(())
1012            })
1013        }
1014    }
1015
1016    struct DoubleApprovalWorkflow;
1017
1018    impl WorkflowHandler for DoubleApprovalWorkflow {
1019        fn name(&self) -> &str {
1020            "double-approval"
1021        }
1022
1023        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1024            Box::pin(async move {
1025                ctx.shell("build", ShellConfig::new("echo built")).await?;
1026                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1027                    .await?;
1028                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1029                    .await?;
1030                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1031                    .await?;
1032                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1033                    .await?;
1034                Ok(())
1035            })
1036        }
1037    }
1038
1039    #[tokio::test]
1040    async fn approval_pauses_run() {
1041        let mut engine = create_test_engine();
1042        engine.register(SingleApprovalWorkflow).unwrap();
1043
1044        let run = engine
1045            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1046            .await
1047            .unwrap();
1048
1049        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1050
1051        let steps = engine.store().list_steps(run.id).await.unwrap();
1052        assert_eq!(steps.len(), 2); // build + approval gate
1053        assert_eq!(steps[0].kind, StepKind::Shell);
1054        assert_eq!(steps[0].status.state, StepStatus::Completed);
1055        assert_eq!(steps[1].kind, StepKind::Approval);
1056        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1057    }
1058
1059    #[tokio::test]
1060    async fn approval_resume_completes_run() {
1061        let mut engine = create_test_engine();
1062        engine.register(SingleApprovalWorkflow).unwrap();
1063
1064        // First execution: pauses at approval
1065        let run = engine
1066            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1067            .await
1068            .unwrap();
1069        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1070
1071        // Simulate approval: transition to Running
1072        engine
1073            .store()
1074            .update_run_status(run.id, RunStatus::Running)
1075            .await
1076            .unwrap();
1077
1078        // Resume: replays build, skips approval, executes deploy
1079        let resumed = engine.resume_run(run.id).await.unwrap();
1080        assert_eq!(resumed.status.state, RunStatus::Completed);
1081
1082        let steps = engine.store().list_steps(run.id).await.unwrap();
1083        assert_eq!(steps.len(), 3); // build + approval + deploy
1084        assert_eq!(steps[0].name, "build");
1085        assert_eq!(steps[0].status.state, StepStatus::Completed);
1086        assert_eq!(steps[1].name, "gate");
1087        assert_eq!(steps[1].kind, StepKind::Approval);
1088        assert_eq!(steps[1].status.state, StepStatus::Completed);
1089        assert_eq!(steps[2].name, "deploy");
1090        assert_eq!(steps[2].status.state, StepStatus::Completed);
1091    }
1092
1093    #[tokio::test]
1094    async fn double_approval_two_resumes() {
1095        let mut engine = create_test_engine();
1096        engine.register(DoubleApprovalWorkflow).unwrap();
1097
1098        // First execution: pauses at staging-gate
1099        let run = engine
1100            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1101            .await
1102            .unwrap();
1103        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1104
1105        let steps = engine.store().list_steps(run.id).await.unwrap();
1106        assert_eq!(steps.len(), 2); // build + staging-gate
1107
1108        // First approval
1109        engine
1110            .store()
1111            .update_run_status(run.id, RunStatus::Running)
1112            .await
1113            .unwrap();
1114
1115        let resumed = engine.resume_run(run.id).await.unwrap();
1116        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1117
1118        let steps = engine.store().list_steps(run.id).await.unwrap();
1119        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1120
1121        // Second approval
1122        engine
1123            .store()
1124            .update_run_status(run.id, RunStatus::Running)
1125            .await
1126            .unwrap();
1127
1128        let final_run = engine.resume_run(run.id).await.unwrap();
1129        assert_eq!(final_run.status.state, RunStatus::Completed);
1130
1131        let steps = engine.store().list_steps(run.id).await.unwrap();
1132        assert_eq!(steps.len(), 5);
1133        assert_eq!(steps[0].name, "build");
1134        assert_eq!(steps[1].name, "staging-gate");
1135        assert_eq!(steps[2].name, "deploy-staging");
1136        assert_eq!(steps[3].name, "prod-gate");
1137        assert_eq!(steps[4].name, "deploy-prod");
1138
1139        for step in &steps {
1140            assert_eq!(step.status.state, StepStatus::Completed);
1141        }
1142    }
1143}