Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33/// The workflow orchestration engine.
34///
35/// Holds references to the store, agent provider, and a registry of
36/// [`WorkflowHandler`]s.
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use ironflow_engine::engine::Engine;
43/// use ironflow_engine::config::ShellConfig;
44/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
45/// use ironflow_engine::context::WorkflowContext;
46/// use ironflow_store::memory::InMemoryStore;
47/// use ironflow_store::models::TriggerKind;
48/// use ironflow_core::providers::claude::ClaudeCodeProvider;
49/// use serde_json::json;
50///
51/// struct CiWorkflow;
52/// impl WorkflowHandler for CiWorkflow {
53///     fn name(&self) -> &str { "ci" }
54///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
55///         Box::pin(async move {
56///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
57///             Ok(())
58///         })
59///     }
60/// }
61///
62/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
63/// let store = Arc::new(InMemoryStore::new());
64/// let provider = Arc::new(ClaudeCodeProvider::new());
65/// let mut engine = Engine::new(store, provider);
66/// engine.register(CiWorkflow)?;
67///
68/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
69/// println!("Run {} completed with status {:?}", run.id, run.status);
70/// # Ok(())
71/// # }
72/// ```
pub struct Engine {
    /// Persistence backend; handed to every [`WorkflowContext`] the engine builds.
    store: Arc<dyn RunStore>,
    /// Agent provider; handed to every [`WorkflowContext`] the engine builds.
    provider: Arc<dyn AgentProvider>,
    /// Registered workflow handlers, keyed by [`WorkflowHandler::name`].
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publishes domain events (such as run status changes) to subscribers.
    event_publisher: EventPublisher,
}
79
80impl Engine {
81    /// Create a new engine with the given store and agent provider.
82    ///
83    /// # Examples
84    ///
85    /// ```no_run
86    /// use std::sync::Arc;
87    /// use ironflow_engine::engine::Engine;
88    /// use ironflow_store::memory::InMemoryStore;
89    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
90    ///
91    /// let engine = Engine::new(
92    ///     Arc::new(InMemoryStore::new()),
93    ///     Arc::new(ClaudeCodeProvider::new()),
94    /// );
95    /// ```
96    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97        Self {
98            store,
99            provider,
100            handlers: HashMap::new(),
101            event_publisher: EventPublisher::new(),
102        }
103    }
104
    /// Returns a reference to the backing store.
    ///
    /// The reference points at the engine's own [`Arc`]; clone it if an
    /// owned handle is needed.
    pub fn store(&self) -> &Arc<dyn RunStore> {
        &self.store
    }
109
    /// Returns a reference to the agent provider.
    ///
    /// The reference points at the engine's own [`Arc`]; clone it if an
    /// owned handle is needed.
    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
        &self.provider
    }
114
115    /// Build a [`WorkflowContext`] with access to the handler registry.
116    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
117        let handlers = self.handlers.clone();
118        let resolver: crate::context::HandlerResolver =
119            Arc::new(move |name: &str| handlers.get(name).cloned());
120        WorkflowContext::with_handler_resolver(
121            run_id,
122            self.store.clone(),
123            self.provider.clone(),
124            resolver,
125        )
126    }
127
128    // -----------------------------------------------------------------------
129    // Handler registration
130    // -----------------------------------------------------------------------
131
132    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
133    ///
134    /// The handler is looked up by [`WorkflowHandler::name`] when executing
135    /// or enqueuing.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
140    /// name is already registered.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// use std::sync::Arc;
146    /// use ironflow_engine::engine::Engine;
147    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
148    /// use ironflow_engine::context::WorkflowContext;
149    /// use ironflow_engine::config::ShellConfig;
150    /// use ironflow_store::memory::InMemoryStore;
151    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
152    ///
153    /// struct MyWorkflow;
154    /// impl WorkflowHandler for MyWorkflow {
155    ///     fn name(&self) -> &str { "my-workflow" }
156    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
157    ///         Box::pin(async move {
158    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
159    ///             Ok(())
160    ///         })
161    ///     }
162    /// }
163    ///
164    /// let mut engine = Engine::new(
165    ///     Arc::new(InMemoryStore::new()),
166    ///     Arc::new(ClaudeCodeProvider::new()),
167    /// );
168    /// engine.register(MyWorkflow)?;
169    /// # Ok::<(), ironflow_engine::error::EngineError>(())
170    /// ```
171    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
172        let name = handler.name().to_string();
173        if self.handlers.contains_key(&name) {
174            return Err(EngineError::InvalidWorkflow(format!(
175                "handler '{}' already registered",
176                name
177            )));
178        }
179        self.handlers.insert(name, Arc::new(handler));
180        Ok(())
181    }
182
183    /// Register a pre-boxed workflow handler.
184    ///
185    /// # Errors
186    ///
187    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
188    /// name is already registered.
189    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
190        let name = handler.name().to_string();
191        if self.handlers.contains_key(&name) {
192            return Err(EngineError::InvalidWorkflow(format!(
193                "handler '{}' already registered",
194                name
195            )));
196        }
197        self.handlers.insert(name, Arc::from(handler));
198        Ok(())
199    }
200
    /// Get a registered handler by name.
    ///
    /// Returns `None` when no handler has been registered under `name`.
    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
        self.handlers.get(name)
    }
205
206    /// List registered handler names.
207    pub fn handler_names(&self) -> Vec<&str> {
208        self.handlers.keys().map(|s| s.as_str()).collect()
209    }
210
211    /// Get detailed info about a registered workflow handler.
212    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
213        self.handlers.get(name).map(|h| h.describe())
214    }
215
216    /// Register an event subscriber for domain events.
217    ///
218    /// The subscriber is called only for events whose type is in
219    /// `event_types`. Pass [`Event::ALL`] to receive every event.
220    ///
221    /// # Examples
222    ///
223    /// ```no_run
224    /// use ironflow_engine::engine::Engine;
225    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
226    /// use ironflow_store::memory::InMemoryStore;
227    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
228    /// use std::sync::Arc;
229    ///
230    /// let mut engine = Engine::new(
231    ///     Arc::new(InMemoryStore::new()),
232    ///     Arc::new(ClaudeCodeProvider::new()),
233    /// );
234    ///
235    /// engine.subscribe(
236    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
237    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
238    /// );
239    /// ```
    pub fn subscribe(
        &mut self,
        subscriber: impl EventSubscriber + 'static,
        event_types: &[&'static str],
    ) {
        // The engine keeps no subscriber list of its own: both arguments are
        // forwarded unchanged to the event publisher.
        self.event_publisher.subscribe(subscriber, event_types);
    }
247
    /// Returns a reference to the event publisher.
    ///
    /// Useful for publishing events from outside the engine (e.g. auth
    /// routes in the API layer). This is the same publisher that
    /// [`subscribe`](Self::subscribe) registers subscribers on.
    pub fn event_publisher(&self) -> &EventPublisher {
        &self.event_publisher
    }
255
256    // -----------------------------------------------------------------------
257    // Dynamic workflow execution (WorkflowHandler)
258    // -----------------------------------------------------------------------
259
260    /// Execute a registered handler inline.
261    ///
262    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
263    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
264    ///
265    /// # Errors
266    ///
267    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
268    /// with that name. Returns [`EngineError`] if execution fails.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// use std::sync::Arc;
274    /// use ironflow_engine::engine::Engine;
275    /// use ironflow_store::memory::InMemoryStore;
276    /// use ironflow_store::models::TriggerKind;
277    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
278    /// use serde_json::json;
279    ///
280    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
281    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
282    /// # Ok(())
283    /// # }
284    /// ```
285    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
286    pub async fn run_handler(
287        &self,
288        handler_name: &str,
289        trigger: TriggerKind,
290        payload: Value,
291    ) -> Result<Run, EngineError> {
292        let handler = self
293            .handlers
294            .get(handler_name)
295            .ok_or_else(|| {
296                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
297            })?
298            .clone();
299
300        let run = self
301            .store
302            .create_run(NewRun {
303                workflow_name: handler_name.to_string(),
304                trigger,
305                payload,
306                max_retries: 0,
307            })
308            .await?;
309
310        let run_id = run.id;
311        info!(run_id = %run_id, "run created");
312
313        self.store
314            .update_run_status(run_id, RunStatus::Running)
315            .await?;
316
317        #[cfg(feature = "prometheus")]
318        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
319
320        let run_start = Instant::now();
321        let mut ctx = self.build_context(run_id);
322
323        let result = handler.execute(&mut ctx).await;
324        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
325            .await
326    }
327
328    /// Enqueue a handler-based workflow for worker execution.
329    ///
330    /// The workflow name is stored in the run. The worker looks up the
331    /// handler by name when executing.
332    ///
333    /// # Errors
334    ///
335    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
336    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
337    pub async fn enqueue_handler(
338        &self,
339        handler_name: &str,
340        trigger: TriggerKind,
341        payload: Value,
342        max_retries: u32,
343    ) -> Result<Run, EngineError> {
344        if !self.handlers.contains_key(handler_name) {
345            return Err(EngineError::InvalidWorkflow(format!(
346                "no handler registered: {handler_name}"
347            )));
348        }
349
350        let run = self
351            .store
352            .create_run(NewRun {
353                workflow_name: handler_name.to_string(),
354                trigger,
355                payload,
356                max_retries,
357            })
358            .await?;
359
360        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
361        Ok(run)
362    }
363
364    /// Execute a handler-based run (used by the worker after pick_next_pending).
365    ///
366    /// Looks up the handler by the run's `workflow_name` and executes it
367    /// with a fresh [`WorkflowContext`].
368    ///
369    /// # Errors
370    ///
371    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
372    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
373    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
374        let run = self
375            .store
376            .get_run(run_id)
377            .await?
378            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
379
380        let handler = self
381            .handlers
382            .get(&run.workflow_name)
383            .ok_or_else(|| {
384                EngineError::InvalidWorkflow(format!(
385                    "no handler registered: {}",
386                    run.workflow_name
387                ))
388            })?
389            .clone();
390
391        #[cfg(feature = "prometheus")]
392        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
393
394        let run_start = Instant::now();
395        let mut ctx = self.build_context(run_id);
396
397        let result = handler.execute(&mut ctx).await;
398        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
399            .await
400    }
401
402    /// Execute a run by its ID (used by the worker after pick_next_pending).
403    ///
404    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
405    ///
406    /// # Errors
407    ///
408    /// Returns [`EngineError`] if the run is not found or execution fails.
409    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
        // Thin alias: lookup, execution, and finalization all happen in
        // `execute_handler_run`.
        self.execute_handler_run(run_id).await
    }
413
414    /// Resume a run after human approval.
415    ///
416    /// Re-executes the handler with step replay: completed steps return
417    /// cached output, approved approval steps are skipped, and execution
418    /// continues from the first unexecuted step.
419    ///
420    /// Supports multiple approval gates -- each resume replays all prior
421    /// steps and stops at the next approval (or completes the run).
422    ///
423    /// # Errors
424    ///
425    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
426    /// Returns [`EngineError`] if execution fails or hits another approval.
427    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
428    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
429        let run = self
430            .store
431            .get_run(run_id)
432            .await?
433            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
434
435        let handler = self
436            .handlers
437            .get(&run.workflow_name)
438            .ok_or_else(|| {
439                EngineError::InvalidWorkflow(format!(
440                    "no handler registered: {}",
441                    run.workflow_name
442                ))
443            })?
444            .clone();
445
446        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
447
448        let run_start = Instant::now();
449        let mut ctx = self.build_context(run_id);
450        ctx.load_replay_steps().await?;
451
452        let result = handler.execute(&mut ctx).await;
453        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
454            .await
455    }
456
457    /// Finalize a run with the given result and context.
458    ///
459    /// On success: updates run to Completed with cost, duration, and completed_at.
460    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
461    /// Always: fetches and returns the final Run.
462    async fn finalize_run(
463        &self,
464        run_id: Uuid,
465        workflow_name: &str,
466        result: Result<(), EngineError>,
467        ctx: &WorkflowContext,
468        run_start: Instant,
469    ) -> Result<Run, EngineError> {
470        let total_duration = run_start.elapsed().as_millis() as u64;
471        let completed_at = Utc::now();
472
473        let final_status;
474        let final_run;
475
476        match result {
477            Ok(()) => {
478                final_status = RunStatus::Completed;
479                final_run = self
480                    .store
481                    .update_run_returning(
482                        run_id,
483                        RunUpdate {
484                            status: Some(RunStatus::Completed),
485                            cost_usd: Some(ctx.total_cost_usd()),
486                            duration_ms: Some(total_duration),
487                            completed_at: Some(completed_at),
488                            ..RunUpdate::default()
489                        },
490                    )
491                    .await?;
492
493                info!(
494                    run_id = %run_id,
495                    cost_usd = %ctx.total_cost_usd(),
496                    duration_ms = total_duration,
497                    "run completed"
498                );
499            }
500            Err(EngineError::ApprovalRequired {
501                run_id: approval_run_id,
502                step_id,
503                ref message,
504            }) => {
505                final_status = RunStatus::AwaitingApproval;
506                final_run = self
507                    .store
508                    .update_run_returning(
509                        run_id,
510                        RunUpdate {
511                            status: Some(RunStatus::AwaitingApproval),
512                            cost_usd: Some(ctx.total_cost_usd()),
513                            duration_ms: Some(total_duration),
514                            ..RunUpdate::default()
515                        },
516                    )
517                    .await?;
518
519                info!(
520                    run_id = %approval_run_id,
521                    step_id = %step_id,
522                    message = %message,
523                    "run awaiting approval"
524                );
525            }
526            Err(err) => {
527                final_status = RunStatus::Failed;
528                if let Err(store_err) = self
529                    .store
530                    .update_run(
531                        run_id,
532                        RunUpdate {
533                            status: Some(RunStatus::Failed),
534                            error: Some(err.to_string()),
535                            cost_usd: Some(ctx.total_cost_usd()),
536                            duration_ms: Some(total_duration),
537                            completed_at: Some(completed_at),
538                            ..RunUpdate::default()
539                        },
540                    )
541                    .await
542                {
543                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
544                }
545
546                error!(run_id = %run_id, error = %err, "run failed");
547
548                self.publish_run_status_changed(
549                    workflow_name,
550                    run_id,
551                    final_status,
552                    Some(err.to_string()),
553                    ctx,
554                    total_duration,
555                );
556
557                #[cfg(feature = "prometheus")]
558                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
559
560                return Err(err);
561            }
562        }
563
564        self.publish_run_status_changed(
565            workflow_name,
566            run_id,
567            final_status,
568            None,
569            ctx,
570            total_duration,
571        );
572
573        #[cfg(feature = "prometheus")]
574        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
575
576        Ok(final_run)
577    }
578
579    /// Emit Prometheus metrics for a completed run.
580    #[cfg(feature = "prometheus")]
581    fn emit_run_metrics(
582        &self,
583        workflow_name: &str,
584        status: RunStatus,
585        duration_ms: u64,
586        ctx: &WorkflowContext,
587    ) {
588        let status_str = status.to_string();
589        let wf = workflow_name.to_string();
590
591        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
592        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
593            .record(duration_ms as f64 / 1000.0);
594        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
595            ctx.total_cost_usd()
596                .to_string()
597                .parse::<f64>()
598                .unwrap_or(0.0),
599        );
600        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
601    }
602
603    /// Publish a run status changed event to all registered subscribers.
604    ///
605    /// `from` is always `Running` because `finalize_run` is only called
606    /// from a running state.
607    fn publish_run_status_changed(
608        &self,
609        workflow_name: &str,
610        run_id: Uuid,
611        to: RunStatus,
612        error: Option<String>,
613        ctx: &WorkflowContext,
614        duration_ms: u64,
615    ) {
616        self.event_publisher.publish(Event::RunStatusChanged {
617            run_id,
618            workflow_name: workflow_name.to_string(),
619            from: RunStatus::Running,
620            to,
621            error,
622            cost_usd: ctx.total_cost_usd(),
623            duration_ms,
624            at: Utc::now(),
625        });
626    }
627}
628
629impl fmt::Debug for Engine {
630    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
631        f.debug_struct("Engine")
632            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
633            .finish_non_exhaustive()
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use crate::config::ShellConfig;
641    use crate::handler::{HandlerFuture, WorkflowHandler};
642    use ironflow_core::providers::claude::ClaudeCodeProvider;
643    use ironflow_core::providers::record_replay::RecordReplayProvider;
644    use ironflow_store::memory::InMemoryStore;
645    use ironflow_store::models::StepStatus;
646    use serde_json::json;
647
    // Test handler that echoes a message via shell
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        // Name under which the tests register and look up this handler.
        fn name(&self) -> &str {
            "echo-workflow"
        }

        // Overridden so the `handler_info` test can assert on a known
        // description string.
        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
            }
        }

        // Runs a single shell step named "greet" and propagates its error, if any.
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }
671
    // Test handler that fails
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        // Name under which the tests register and look up this handler.
        fn name(&self) -> &str {
            "failing-workflow"
        }

        // Runs a single shell step named "fail" whose command is `exit 1`;
        // any error from the step is propagated via `?`.
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }
687
688    fn create_test_engine() -> Engine {
689        let store = Arc::new(InMemoryStore::new());
690        let inner = ClaudeCodeProvider::new();
691        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
692            inner,
693            "/tmp/ironflow-fixtures",
694        ));
695        Engine::new(store, provider)
696    }
697
698    #[test]
699    fn engine_new_creates_instance() {
700        let engine = create_test_engine();
701        assert_eq!(engine.handler_names().len(), 0);
702    }
703
704    #[test]
705    fn engine_register_handler() {
706        let mut engine = create_test_engine();
707        let result = engine.register(EchoWorkflow);
708        assert!(result.is_ok());
709        assert_eq!(engine.handler_names().len(), 1);
710        assert!(engine.handler_names().contains(&"echo-workflow"));
711    }
712
713    #[test]
714    fn engine_register_duplicate_returns_error() {
715        let mut engine = create_test_engine();
716        engine.register(EchoWorkflow).unwrap();
717        let result = engine.register(EchoWorkflow);
718        assert!(result.is_err());
719    }
720
721    #[test]
722    fn engine_get_handler_found() {
723        let mut engine = create_test_engine();
724        engine.register(EchoWorkflow).unwrap();
725        let handler = engine.get_handler("echo-workflow");
726        assert!(handler.is_some());
727    }
728
729    #[test]
730    fn engine_get_handler_not_found() {
731        let engine = create_test_engine();
732        let handler = engine.get_handler("nonexistent");
733        assert!(handler.is_none());
734    }
735
736    #[test]
737    fn engine_handler_names_lists_all() {
738        let mut engine = create_test_engine();
739        engine.register(EchoWorkflow).unwrap();
740        engine.register(FailingWorkflow).unwrap();
741        let names = engine.handler_names();
742        assert_eq!(names.len(), 2);
743        assert!(names.contains(&"echo-workflow"));
744        assert!(names.contains(&"failing-workflow"));
745    }
746
747    #[test]
748    fn engine_handler_info_returns_description() {
749        let mut engine = create_test_engine();
750        engine.register(EchoWorkflow).unwrap();
751        let info = engine.handler_info("echo-workflow");
752        assert!(info.is_some());
753        let info = info.unwrap();
754        assert_eq!(info.description, "A simple workflow that echoes hello");
755    }
756
757    #[tokio::test]
758    async fn engine_unknown_workflow_returns_error() {
759        let engine = create_test_engine();
760        let result = engine
761            .run_handler("unknown", TriggerKind::Manual, json!({}))
762            .await;
763        assert!(result.is_err());
764        match result {
765            Err(EngineError::InvalidWorkflow(msg)) => {
766                assert!(msg.contains("no handler registered"));
767            }
768            _ => panic!("expected InvalidWorkflow error"),
769        }
770    }
771
772    #[tokio::test]
773    async fn engine_enqueue_handler_creates_pending_run() {
774        let mut engine = create_test_engine();
775        engine.register(EchoWorkflow).unwrap();
776
777        let run = engine
778            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
779            .await
780            .unwrap();
781        assert_eq!(run.status.state, RunStatus::Pending);
782        assert_eq!(run.workflow_name, "echo-workflow");
783    }
784
785    #[tokio::test]
786    async fn engine_register_boxed() {
787        let mut engine = create_test_engine();
788        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
789        let result = engine.register_boxed(handler);
790        assert!(result.is_ok());
791        assert_eq!(engine.handler_names().len(), 1);
792    }
793
794    #[tokio::test]
795    async fn engine_store_and_provider_accessors() {
796        let store = Arc::new(InMemoryStore::new());
797        let inner = ClaudeCodeProvider::new();
798        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
799            inner,
800            "/tmp/ironflow-fixtures",
801        ));
802        let engine = Engine::new(store.clone(), provider.clone());
803
804        // Verify accessors return references
805        let _ = engine.store();
806        let _ = engine.provider();
807    }
808
809    // -----------------------------------------------------------------------
810    // Operation trait tests
811    // -----------------------------------------------------------------------
812
813    use crate::operation::Operation;
814    use ironflow_store::models::StepKind;
815    use std::future::Future;
816    use std::pin::Pin;
817
    // Fake operation used to exercise `ctx.operation`; it performs no I/O and
    // only echoes its fields back.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }

    impl Operation for FakeGitlabOp {
        // Kind string; the happy-path test expects it to surface as
        // `StepKind::Custom("gitlab")` on the recorded step.
        fn kind(&self) -> &str {
            "gitlab"
        }

        // Always succeeds with a fixed `issue_id` of 42 plus the op's own fields.
        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move {
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }

        // Input payload reported for the step: the op's own fields.
        fn input(&self) -> Option<Value> {
            Some(json!({
                "project_id": self.project_id,
                "title": self.title,
            }))
        }
    }
845
    // Fake operation whose `execute` always returns an error.
    struct FailingOp;

    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }

        // Always fails with `EngineError::StepConfig("service unavailable")`.
        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
        }
    }
857
    // Test handler that runs a single custom operation step.
    struct OperationWorkflow;

    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }

        // Runs `FakeGitlabOp` as a step named "create-issue" and propagates
        // any error from it.
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let op = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &op).await?;
                Ok(())
            })
        }
    }
876
    // Test handler whose only step is an operation that always fails.
    struct FailingOperationWorkflow;

    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }

        // Runs `FailingOp` as a step named "broken-call"; the `?` propagates
        // whatever error `ctx.operation` returns.
        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }
891
892    struct MixedWorkflow;
893
894    impl WorkflowHandler for MixedWorkflow {
895        fn name(&self) -> &str {
896            "mixed-workflow"
897        }
898
899        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
900            Box::pin(async move {
901                ctx.shell("build", ShellConfig::new("echo built")).await?;
902                let op = FakeGitlabOp {
903                    project_id: 456,
904                    title: "Deploy done".to_string(),
905                };
906                let result = ctx.operation("notify-gitlab", &op).await?;
907                assert_eq!(result.output["issue_id"], 42);
908                Ok(())
909            })
910        }
911    }
912
913    #[tokio::test]
914    async fn operation_step_happy_path() {
915        let mut engine = create_test_engine();
916        engine.register(OperationWorkflow).unwrap();
917
918        let run = engine
919            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
920            .await
921            .unwrap();
922
923        assert_eq!(run.status.state, RunStatus::Completed);
924
925        let steps = engine.store().list_steps(run.id).await.unwrap();
926
927        assert_eq!(steps.len(), 1);
928        assert_eq!(steps[0].name, "create-issue");
929        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
930        assert_eq!(
931            steps[0].status.state,
932            ironflow_store::models::StepStatus::Completed
933        );
934
935        let output = steps[0].output.as_ref().unwrap();
936        assert_eq!(output["issue_id"], 42);
937        assert_eq!(output["project_id"], 123);
938
939        let input = steps[0].input.as_ref().unwrap();
940        assert_eq!(input["project_id"], 123);
941        assert_eq!(input["title"], "Bug report");
942    }
943
944    #[tokio::test]
945    async fn operation_step_failure_marks_run_failed() {
946        let mut engine = create_test_engine();
947        engine.register(FailingOperationWorkflow).unwrap();
948
949        let result = engine
950            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
951            .await;
952
953        assert!(result.is_err());
954    }
955
956    #[tokio::test]
957    async fn operation_mixed_with_shell_steps() {
958        let mut engine = create_test_engine();
959        engine.register(MixedWorkflow).unwrap();
960
961        let run = engine
962            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
963            .await
964            .unwrap();
965
966        assert_eq!(run.status.state, RunStatus::Completed);
967
968        let steps = engine.store().list_steps(run.id).await.unwrap();
969
970        assert_eq!(steps.len(), 2);
971        assert_eq!(steps[0].kind, StepKind::Shell);
972        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
973        assert_eq!(steps[0].position, 0);
974        assert_eq!(steps[1].position, 1);
975    }
976
977    // -----------------------------------------------------------------------
978    // Approval + resume tests
979    // -----------------------------------------------------------------------
980
981    use crate::config::ApprovalConfig;
982
983    struct SingleApprovalWorkflow;
984
985    impl WorkflowHandler for SingleApprovalWorkflow {
986        fn name(&self) -> &str {
987            "single-approval"
988        }
989
990        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
991            Box::pin(async move {
992                ctx.shell("build", ShellConfig::new("echo built")).await?;
993                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
994                ctx.shell("deploy", ShellConfig::new("echo deployed"))
995                    .await?;
996                Ok(())
997            })
998        }
999    }
1000
1001    struct DoubleApprovalWorkflow;
1002
1003    impl WorkflowHandler for DoubleApprovalWorkflow {
1004        fn name(&self) -> &str {
1005            "double-approval"
1006        }
1007
1008        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1009            Box::pin(async move {
1010                ctx.shell("build", ShellConfig::new("echo built")).await?;
1011                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1012                    .await?;
1013                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1014                    .await?;
1015                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1016                    .await?;
1017                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1018                    .await?;
1019                Ok(())
1020            })
1021        }
1022    }
1023
1024    #[tokio::test]
1025    async fn approval_pauses_run() {
1026        let mut engine = create_test_engine();
1027        engine.register(SingleApprovalWorkflow).unwrap();
1028
1029        let run = engine
1030            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1031            .await
1032            .unwrap();
1033
1034        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1035
1036        let steps = engine.store().list_steps(run.id).await.unwrap();
1037        assert_eq!(steps.len(), 2); // build + approval gate
1038        assert_eq!(steps[0].kind, StepKind::Shell);
1039        assert_eq!(steps[0].status.state, StepStatus::Completed);
1040        assert_eq!(steps[1].kind, StepKind::Approval);
1041        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1042    }
1043
1044    #[tokio::test]
1045    async fn approval_resume_completes_run() {
1046        let mut engine = create_test_engine();
1047        engine.register(SingleApprovalWorkflow).unwrap();
1048
1049        // First execution: pauses at approval
1050        let run = engine
1051            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1052            .await
1053            .unwrap();
1054        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1055
1056        // Simulate approval: transition to Running
1057        engine
1058            .store()
1059            .update_run_status(run.id, RunStatus::Running)
1060            .await
1061            .unwrap();
1062
1063        // Resume: replays build, skips approval, executes deploy
1064        let resumed = engine.resume_run(run.id).await.unwrap();
1065        assert_eq!(resumed.status.state, RunStatus::Completed);
1066
1067        let steps = engine.store().list_steps(run.id).await.unwrap();
1068        assert_eq!(steps.len(), 3); // build + approval + deploy
1069        assert_eq!(steps[0].name, "build");
1070        assert_eq!(steps[0].status.state, StepStatus::Completed);
1071        assert_eq!(steps[1].name, "gate");
1072        assert_eq!(steps[1].kind, StepKind::Approval);
1073        assert_eq!(steps[1].status.state, StepStatus::Completed);
1074        assert_eq!(steps[2].name, "deploy");
1075        assert_eq!(steps[2].status.state, StepStatus::Completed);
1076    }
1077
1078    #[tokio::test]
1079    async fn double_approval_two_resumes() {
1080        let mut engine = create_test_engine();
1081        engine.register(DoubleApprovalWorkflow).unwrap();
1082
1083        // First execution: pauses at staging-gate
1084        let run = engine
1085            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1086            .await
1087            .unwrap();
1088        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1089
1090        let steps = engine.store().list_steps(run.id).await.unwrap();
1091        assert_eq!(steps.len(), 2); // build + staging-gate
1092
1093        // First approval
1094        engine
1095            .store()
1096            .update_run_status(run.id, RunStatus::Running)
1097            .await
1098            .unwrap();
1099
1100        let resumed = engine.resume_run(run.id).await.unwrap();
1101        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1102
1103        let steps = engine.store().list_steps(run.id).await.unwrap();
1104        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1105
1106        // Second approval
1107        engine
1108            .store()
1109            .update_run_status(run.id, RunStatus::Running)
1110            .await
1111            .unwrap();
1112
1113        let final_run = engine.resume_run(run.id).await.unwrap();
1114        assert_eq!(final_run.status.state, RunStatus::Completed);
1115
1116        let steps = engine.store().list_steps(run.id).await.unwrap();
1117        assert_eq!(steps.len(), 5);
1118        assert_eq!(steps[0].name, "build");
1119        assert_eq!(steps[1].name, "staging-gate");
1120        assert_eq!(steps[2].name, "deploy-staging");
1121        assert_eq!(steps[3].name, "prod-gate");
1122        assert_eq!(steps[4].name, "deploy-prod");
1123
1124        for step in &steps {
1125            assert_eq!(step.status.state, StepStatus::Completed);
1126        }
1127    }
1128}