Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33/// The workflow orchestration engine.
34///
35/// Holds references to the store, agent provider, and a registry of
36/// [`WorkflowHandler`]s.
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use ironflow_engine::engine::Engine;
43/// use ironflow_engine::config::ShellConfig;
44/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
45/// use ironflow_engine::context::WorkflowContext;
46/// use ironflow_store::memory::InMemoryStore;
47/// use ironflow_store::models::TriggerKind;
48/// use ironflow_core::providers::claude::ClaudeCodeProvider;
49/// use serde_json::json;
50///
51/// struct CiWorkflow;
52/// impl WorkflowHandler for CiWorkflow {
53///     fn name(&self) -> &str { "ci" }
54///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
55///         Box::pin(async move {
56///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
57///             Ok(())
58///         })
59///     }
60/// }
61///
62/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
63/// let store = Arc::new(InMemoryStore::new());
64/// let provider = Arc::new(ClaudeCodeProvider::new());
65/// let mut engine = Engine::new(store, provider);
66/// engine.register(CiWorkflow)?;
67///
68/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
69/// println!("Run {} completed with status {:?}", run.id, run.status);
70/// # Ok(())
71/// # }
72/// ```
73pub struct Engine {
74    store: Arc<dyn RunStore>,
75    provider: Arc<dyn AgentProvider>,
76    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77    event_publisher: EventPublisher,
78}
79
80impl Engine {
81    /// Create a new engine with the given store and agent provider.
82    ///
83    /// # Examples
84    ///
85    /// ```no_run
86    /// use std::sync::Arc;
87    /// use ironflow_engine::engine::Engine;
88    /// use ironflow_store::memory::InMemoryStore;
89    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
90    ///
91    /// let engine = Engine::new(
92    ///     Arc::new(InMemoryStore::new()),
93    ///     Arc::new(ClaudeCodeProvider::new()),
94    /// );
95    /// ```
96    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97        Self {
98            store,
99            provider,
100            handlers: HashMap::new(),
101            event_publisher: EventPublisher::new(),
102        }
103    }
104
105    /// Returns a reference to the backing store.
106    pub fn store(&self) -> &Arc<dyn RunStore> {
107        &self.store
108    }
109
110    /// Returns a reference to the agent provider.
111    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
112        &self.provider
113    }
114
115    /// Build a [`WorkflowContext`] with access to the handler registry.
116    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
117        let handlers = self.handlers.clone();
118        let resolver: crate::context::HandlerResolver =
119            Arc::new(move |name: &str| handlers.get(name).cloned());
120        WorkflowContext::with_handler_resolver(
121            run_id,
122            self.store.clone(),
123            self.provider.clone(),
124            resolver,
125        )
126    }
127
128    // -----------------------------------------------------------------------
129    // Handler registration
130    // -----------------------------------------------------------------------
131
132    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
133    ///
134    /// The handler is looked up by [`WorkflowHandler::name`] when executing
135    /// or enqueuing.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
140    /// name is already registered.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// use std::sync::Arc;
146    /// use ironflow_engine::engine::Engine;
147    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
148    /// use ironflow_engine::context::WorkflowContext;
149    /// use ironflow_engine::config::ShellConfig;
150    /// use ironflow_store::memory::InMemoryStore;
151    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
152    ///
153    /// struct MyWorkflow;
154    /// impl WorkflowHandler for MyWorkflow {
155    ///     fn name(&self) -> &str { "my-workflow" }
156    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
157    ///         Box::pin(async move {
158    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
159    ///             Ok(())
160    ///         })
161    ///     }
162    /// }
163    ///
164    /// let mut engine = Engine::new(
165    ///     Arc::new(InMemoryStore::new()),
166    ///     Arc::new(ClaudeCodeProvider::new()),
167    /// );
168    /// engine.register(MyWorkflow)?;
169    /// # Ok::<(), ironflow_engine::error::EngineError>(())
170    /// ```
171    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
172        let name = handler.name().to_string();
173        if self.handlers.contains_key(&name) {
174            return Err(EngineError::InvalidWorkflow(format!(
175                "handler '{}' already registered",
176                name
177            )));
178        }
179        self.handlers.insert(name, Arc::new(handler));
180        Ok(())
181    }
182
183    /// Register a pre-boxed workflow handler.
184    ///
185    /// # Errors
186    ///
187    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
188    /// name is already registered.
189    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
190        let name = handler.name().to_string();
191        if self.handlers.contains_key(&name) {
192            return Err(EngineError::InvalidWorkflow(format!(
193                "handler '{}' already registered",
194                name
195            )));
196        }
197        self.handlers.insert(name, Arc::from(handler));
198        Ok(())
199    }
200
201    /// Get a registered handler by name.
202    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
203        self.handlers.get(name)
204    }
205
206    /// List registered handler names.
207    pub fn handler_names(&self) -> Vec<&str> {
208        self.handlers.keys().map(|s| s.as_str()).collect()
209    }
210
211    /// Get detailed info about a registered workflow handler.
212    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
213        self.handlers.get(name).map(|h| h.describe())
214    }
215
216    /// Register an event subscriber for domain events.
217    ///
218    /// The subscriber is called only for events whose type is in
219    /// `event_types`. Pass [`Event::ALL`] to receive every event.
220    ///
221    /// # Examples
222    ///
223    /// ```no_run
224    /// use ironflow_engine::engine::Engine;
225    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
226    /// use ironflow_store::memory::InMemoryStore;
227    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
228    /// use std::sync::Arc;
229    ///
230    /// let mut engine = Engine::new(
231    ///     Arc::new(InMemoryStore::new()),
232    ///     Arc::new(ClaudeCodeProvider::new()),
233    /// );
234    ///
235    /// engine.subscribe(
236    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
237    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
238    /// );
239    /// ```
240    pub fn subscribe(
241        &mut self,
242        subscriber: impl EventSubscriber + 'static,
243        event_types: &[&'static str],
244    ) {
245        self.event_publisher.subscribe(subscriber, event_types);
246    }
247
248    /// Returns a reference to the event publisher.
249    ///
250    /// Useful for publishing events from outside the engine (e.g. auth
251    /// routes in the API layer).
252    pub fn event_publisher(&self) -> &EventPublisher {
253        &self.event_publisher
254    }
255
256    // -----------------------------------------------------------------------
257    // Dynamic workflow execution (WorkflowHandler)
258    // -----------------------------------------------------------------------
259
260    /// Execute a registered handler inline.
261    ///
262    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
263    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
264    ///
265    /// # Errors
266    ///
267    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
268    /// with that name. Returns [`EngineError`] if execution fails.
269    ///
270    /// # Examples
271    ///
272    /// ```no_run
273    /// use std::sync::Arc;
274    /// use ironflow_engine::engine::Engine;
275    /// use ironflow_store::memory::InMemoryStore;
276    /// use ironflow_store::models::TriggerKind;
277    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
278    /// use serde_json::json;
279    ///
280    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
281    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
282    /// # Ok(())
283    /// # }
284    /// ```
285    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
286    pub async fn run_handler(
287        &self,
288        handler_name: &str,
289        trigger: TriggerKind,
290        payload: Value,
291    ) -> Result<Run, EngineError> {
292        let handler = self
293            .handlers
294            .get(handler_name)
295            .ok_or_else(|| {
296                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
297            })?
298            .clone();
299
300        let run = self
301            .store
302            .create_run(NewRun {
303                workflow_name: handler_name.to_string(),
304                trigger,
305                payload,
306                max_retries: 0,
307            })
308            .await?;
309
310        let run_id = run.id;
311        info!(run_id = %run_id, "run created");
312
313        self.store
314            .update_run_status(run_id, RunStatus::Running)
315            .await?;
316
317        #[cfg(feature = "prometheus")]
318        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
319
320        let run_start = Instant::now();
321        let mut ctx = self.build_context(run_id);
322
323        let result = handler.execute(&mut ctx).await;
324        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
325            .await
326    }
327
328    /// Enqueue a handler-based workflow for worker execution.
329    ///
330    /// The workflow name is stored in the run. The worker looks up the
331    /// handler by name when executing.
332    ///
333    /// # Errors
334    ///
335    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
336    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
337    pub async fn enqueue_handler(
338        &self,
339        handler_name: &str,
340        trigger: TriggerKind,
341        payload: Value,
342        max_retries: u32,
343    ) -> Result<Run, EngineError> {
344        if !self.handlers.contains_key(handler_name) {
345            return Err(EngineError::InvalidWorkflow(format!(
346                "no handler registered: {handler_name}"
347            )));
348        }
349
350        let run = self
351            .store
352            .create_run(NewRun {
353                workflow_name: handler_name.to_string(),
354                trigger,
355                payload,
356                max_retries,
357            })
358            .await?;
359
360        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
361        Ok(run)
362    }
363
364    /// Execute a handler-based run (used by the worker after pick_next_pending).
365    ///
366    /// Looks up the handler by the run's `workflow_name` and executes it
367    /// with a fresh [`WorkflowContext`].
368    ///
369    /// # Errors
370    ///
371    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
372    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
373    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
374        let run = self
375            .store
376            .get_run(run_id)
377            .await?
378            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
379
380        let handler = self
381            .handlers
382            .get(&run.workflow_name)
383            .ok_or_else(|| {
384                EngineError::InvalidWorkflow(format!(
385                    "no handler registered: {}",
386                    run.workflow_name
387                ))
388            })?
389            .clone();
390
391        #[cfg(feature = "prometheus")]
392        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
393
394        let run_start = Instant::now();
395        let mut ctx = self.build_context(run_id);
396
397        let result = handler.execute(&mut ctx).await;
398        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
399            .await
400    }
401
402    /// Execute a run by its ID (used by the worker after pick_next_pending).
403    ///
404    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
405    ///
406    /// # Errors
407    ///
408    /// Returns [`EngineError`] if the run is not found or execution fails.
409    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
410    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
411        self.execute_handler_run(run_id).await
412    }
413
414    /// Resume a run after human approval.
415    ///
416    /// Re-executes the handler with step replay: completed steps return
417    /// cached output, approved approval steps are skipped, and execution
418    /// continues from the first unexecuted step.
419    ///
420    /// Supports multiple approval gates -- each resume replays all prior
421    /// steps and stops at the next approval (or completes the run).
422    ///
423    /// # Errors
424    ///
425    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
426    /// Returns [`EngineError`] if execution fails or hits another approval.
427    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
428    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
429        let run = self
430            .store
431            .get_run(run_id)
432            .await?
433            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
434
435        let handler = self
436            .handlers
437            .get(&run.workflow_name)
438            .ok_or_else(|| {
439                EngineError::InvalidWorkflow(format!(
440                    "no handler registered: {}",
441                    run.workflow_name
442                ))
443            })?
444            .clone();
445
446        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
447
448        let run_start = Instant::now();
449        let mut ctx = self.build_context(run_id);
450        ctx.load_replay_steps().await?;
451
452        let result = handler.execute(&mut ctx).await;
453        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
454            .await
455    }
456
457    /// Finalize a run with the given result and context.
458    ///
459    /// On success: updates run to Completed with cost, duration, and completed_at.
460    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
461    /// Always: fetches and returns the final Run.
462    ///
463    /// TODO: `get_run` at the end could be optimized by using an `update_run_returning`
464    /// method if the store supports it.
465    async fn finalize_run(
466        &self,
467        run_id: Uuid,
468        workflow_name: &str,
469        result: Result<(), EngineError>,
470        ctx: &WorkflowContext,
471        run_start: Instant,
472    ) -> Result<Run, EngineError> {
473        let total_duration = run_start.elapsed().as_millis() as u64;
474        let completed_at = Utc::now();
475
476        let final_status;
477
478        match result {
479            Ok(()) => {
480                final_status = RunStatus::Completed;
481                self.store
482                    .update_run(
483                        run_id,
484                        RunUpdate {
485                            status: Some(RunStatus::Completed),
486                            cost_usd: Some(ctx.total_cost_usd()),
487                            duration_ms: Some(total_duration),
488                            completed_at: Some(completed_at),
489                            ..RunUpdate::default()
490                        },
491                    )
492                    .await?;
493
494                info!(
495                    run_id = %run_id,
496                    cost_usd = %ctx.total_cost_usd(),
497                    duration_ms = total_duration,
498                    "run completed"
499                );
500            }
501            Err(EngineError::ApprovalRequired {
502                run_id: approval_run_id,
503                step_id,
504                ref message,
505            }) => {
506                final_status = RunStatus::AwaitingApproval;
507                self.store
508                    .update_run(
509                        run_id,
510                        RunUpdate {
511                            status: Some(RunStatus::AwaitingApproval),
512                            cost_usd: Some(ctx.total_cost_usd()),
513                            duration_ms: Some(total_duration),
514                            ..RunUpdate::default()
515                        },
516                    )
517                    .await?;
518
519                info!(
520                    run_id = %approval_run_id,
521                    step_id = %step_id,
522                    message = %message,
523                    "run awaiting approval"
524                );
525            }
526            Err(err) => {
527                final_status = RunStatus::Failed;
528                if let Err(store_err) = self
529                    .store
530                    .update_run(
531                        run_id,
532                        RunUpdate {
533                            status: Some(RunStatus::Failed),
534                            error: Some(err.to_string()),
535                            cost_usd: Some(ctx.total_cost_usd()),
536                            duration_ms: Some(total_duration),
537                            completed_at: Some(completed_at),
538                            ..RunUpdate::default()
539                        },
540                    )
541                    .await
542                {
543                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
544                }
545
546                error!(run_id = %run_id, error = %err, "run failed");
547
548                self.publish_run_status_changed(
549                    workflow_name,
550                    run_id,
551                    final_status,
552                    Some(err.to_string()),
553                    ctx,
554                    total_duration,
555                );
556
557                #[cfg(feature = "prometheus")]
558                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
559
560                return Err(err);
561            }
562        }
563
564        self.publish_run_status_changed(
565            workflow_name,
566            run_id,
567            final_status,
568            None,
569            ctx,
570            total_duration,
571        );
572
573        #[cfg(feature = "prometheus")]
574        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
575
576        self.store
577            .get_run(run_id)
578            .await?
579            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
580    }
581
582    /// Emit Prometheus metrics for a completed run.
583    #[cfg(feature = "prometheus")]
584    fn emit_run_metrics(
585        &self,
586        workflow_name: &str,
587        status: RunStatus,
588        duration_ms: u64,
589        ctx: &WorkflowContext,
590    ) {
591        let status_str = status.to_string();
592        let wf = workflow_name.to_string();
593
594        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
595        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
596            .record(duration_ms as f64 / 1000.0);
597        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
598            ctx.total_cost_usd()
599                .to_string()
600                .parse::<f64>()
601                .unwrap_or(0.0),
602        );
603        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
604    }
605
606    /// Publish a run status changed event to all registered subscribers.
607    ///
608    /// `from` is always `Running` because `finalize_run` is only called
609    /// from a running state.
610    fn publish_run_status_changed(
611        &self,
612        workflow_name: &str,
613        run_id: Uuid,
614        to: RunStatus,
615        error: Option<String>,
616        ctx: &WorkflowContext,
617        duration_ms: u64,
618    ) {
619        self.event_publisher.publish(Event::RunStatusChanged {
620            run_id,
621            workflow_name: workflow_name.to_string(),
622            from: RunStatus::Running,
623            to,
624            error,
625            cost_usd: ctx.total_cost_usd(),
626            duration_ms,
627            at: Utc::now(),
628        });
629    }
630}
631
632impl fmt::Debug for Engine {
633    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634        f.debug_struct("Engine")
635            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
636            .finish_non_exhaustive()
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643    use crate::config::ShellConfig;
644    use crate::handler::{HandlerFuture, WorkflowHandler};
645    use ironflow_core::providers::claude::ClaudeCodeProvider;
646    use ironflow_core::providers::record_replay::RecordReplayProvider;
647    use ironflow_store::memory::InMemoryStore;
648    use ironflow_store::models::StepStatus;
649    use serde_json::json;
650
651    // Test handler that echoes a message via shell
652    struct EchoWorkflow;
653
654    impl WorkflowHandler for EchoWorkflow {
655        fn name(&self) -> &str {
656            "echo-workflow"
657        }
658
659        fn describe(&self) -> WorkflowInfo {
660            WorkflowInfo {
661                description: "A simple workflow that echoes hello".to_string(),
662                source_code: None,
663                sub_workflows: Vec::new(),
664            }
665        }
666
667        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
668            Box::pin(async move {
669                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
670                Ok(())
671            })
672        }
673    }
674
675    // Test handler that fails
676    struct FailingWorkflow;
677
678    impl WorkflowHandler for FailingWorkflow {
679        fn name(&self) -> &str {
680            "failing-workflow"
681        }
682
683        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
684            Box::pin(async move {
685                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
686                Ok(())
687            })
688        }
689    }
690
691    fn create_test_engine() -> Engine {
692        let store = Arc::new(InMemoryStore::new());
693        let inner = ClaudeCodeProvider::new();
694        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
695            inner,
696            "/tmp/ironflow-fixtures",
697        ));
698        Engine::new(store, provider)
699    }
700
701    #[test]
702    fn engine_new_creates_instance() {
703        let engine = create_test_engine();
704        assert_eq!(engine.handler_names().len(), 0);
705    }
706
707    #[test]
708    fn engine_register_handler() {
709        let mut engine = create_test_engine();
710        let result = engine.register(EchoWorkflow);
711        assert!(result.is_ok());
712        assert_eq!(engine.handler_names().len(), 1);
713        assert!(engine.handler_names().contains(&"echo-workflow"));
714    }
715
716    #[test]
717    fn engine_register_duplicate_returns_error() {
718        let mut engine = create_test_engine();
719        engine.register(EchoWorkflow).unwrap();
720        let result = engine.register(EchoWorkflow);
721        assert!(result.is_err());
722    }
723
724    #[test]
725    fn engine_get_handler_found() {
726        let mut engine = create_test_engine();
727        engine.register(EchoWorkflow).unwrap();
728        let handler = engine.get_handler("echo-workflow");
729        assert!(handler.is_some());
730    }
731
732    #[test]
733    fn engine_get_handler_not_found() {
734        let engine = create_test_engine();
735        let handler = engine.get_handler("nonexistent");
736        assert!(handler.is_none());
737    }
738
739    #[test]
740    fn engine_handler_names_lists_all() {
741        let mut engine = create_test_engine();
742        engine.register(EchoWorkflow).unwrap();
743        engine.register(FailingWorkflow).unwrap();
744        let names = engine.handler_names();
745        assert_eq!(names.len(), 2);
746        assert!(names.contains(&"echo-workflow"));
747        assert!(names.contains(&"failing-workflow"));
748    }
749
750    #[test]
751    fn engine_handler_info_returns_description() {
752        let mut engine = create_test_engine();
753        engine.register(EchoWorkflow).unwrap();
754        let info = engine.handler_info("echo-workflow");
755        assert!(info.is_some());
756        let info = info.unwrap();
757        assert_eq!(info.description, "A simple workflow that echoes hello");
758    }
759
760    #[tokio::test]
761    async fn engine_unknown_workflow_returns_error() {
762        let engine = create_test_engine();
763        let result = engine
764            .run_handler("unknown", TriggerKind::Manual, json!({}))
765            .await;
766        assert!(result.is_err());
767        match result {
768            Err(EngineError::InvalidWorkflow(msg)) => {
769                assert!(msg.contains("no handler registered"));
770            }
771            _ => panic!("expected InvalidWorkflow error"),
772        }
773    }
774
775    #[tokio::test]
776    async fn engine_enqueue_handler_creates_pending_run() {
777        let mut engine = create_test_engine();
778        engine.register(EchoWorkflow).unwrap();
779
780        let run = engine
781            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
782            .await
783            .unwrap();
784        assert_eq!(run.status.state, RunStatus::Pending);
785        assert_eq!(run.workflow_name, "echo-workflow");
786    }
787
788    #[tokio::test]
789    async fn engine_register_boxed() {
790        let mut engine = create_test_engine();
791        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
792        let result = engine.register_boxed(handler);
793        assert!(result.is_ok());
794        assert_eq!(engine.handler_names().len(), 1);
795    }
796
797    #[tokio::test]
798    async fn engine_store_and_provider_accessors() {
799        let store = Arc::new(InMemoryStore::new());
800        let inner = ClaudeCodeProvider::new();
801        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
802            inner,
803            "/tmp/ironflow-fixtures",
804        ));
805        let engine = Engine::new(store.clone(), provider.clone());
806
807        // Verify accessors return references
808        let _ = engine.store();
809        let _ = engine.provider();
810    }
811
812    // -----------------------------------------------------------------------
813    // Operation trait tests
814    // -----------------------------------------------------------------------
815
816    use crate::operation::Operation;
817    use ironflow_store::models::StepKind;
818    use std::future::Future;
819    use std::pin::Pin;
820
821    struct FakeGitlabOp {
822        project_id: u64,
823        title: String,
824    }
825
826    impl Operation for FakeGitlabOp {
827        fn kind(&self) -> &str {
828            "gitlab"
829        }
830
831        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
832            Box::pin(async move {
833                Ok(json!({
834                    "issue_id": 42,
835                    "project_id": self.project_id,
836                    "title": self.title,
837                }))
838            })
839        }
840
841        fn input(&self) -> Option<Value> {
842            Some(json!({
843                "project_id": self.project_id,
844                "title": self.title,
845            }))
846        }
847    }
848
849    struct FailingOp;
850
851    impl Operation for FailingOp {
852        fn kind(&self) -> &str {
853            "broken-service"
854        }
855
856        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
857            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
858        }
859    }
860
861    struct OperationWorkflow;
862
863    impl WorkflowHandler for OperationWorkflow {
864        fn name(&self) -> &str {
865            "operation-workflow"
866        }
867
868        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
869            Box::pin(async move {
870                let op = FakeGitlabOp {
871                    project_id: 123,
872                    title: "Bug report".to_string(),
873                };
874                ctx.operation("create-issue", &op).await?;
875                Ok(())
876            })
877        }
878    }
879
880    struct FailingOperationWorkflow;
881
882    impl WorkflowHandler for FailingOperationWorkflow {
883        fn name(&self) -> &str {
884            "failing-operation-workflow"
885        }
886
887        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
888            Box::pin(async move {
889                ctx.operation("broken-call", &FailingOp).await?;
890                Ok(())
891            })
892        }
893    }
894
895    struct MixedWorkflow;
896
897    impl WorkflowHandler for MixedWorkflow {
898        fn name(&self) -> &str {
899            "mixed-workflow"
900        }
901
902        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
903            Box::pin(async move {
904                ctx.shell("build", ShellConfig::new("echo built")).await?;
905                let op = FakeGitlabOp {
906                    project_id: 456,
907                    title: "Deploy done".to_string(),
908                };
909                let result = ctx.operation("notify-gitlab", &op).await?;
910                assert_eq!(result.output["issue_id"], 42);
911                Ok(())
912            })
913        }
914    }
915
916    #[tokio::test]
917    async fn operation_step_happy_path() {
918        let mut engine = create_test_engine();
919        engine.register(OperationWorkflow).unwrap();
920
921        let run = engine
922            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
923            .await
924            .unwrap();
925
926        assert_eq!(run.status.state, RunStatus::Completed);
927
928        let steps = engine.store().list_steps(run.id).await.unwrap();
929
930        assert_eq!(steps.len(), 1);
931        assert_eq!(steps[0].name, "create-issue");
932        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
933        assert_eq!(
934            steps[0].status.state,
935            ironflow_store::models::StepStatus::Completed
936        );
937
938        let output = steps[0].output.as_ref().unwrap();
939        assert_eq!(output["issue_id"], 42);
940        assert_eq!(output["project_id"], 123);
941
942        let input = steps[0].input.as_ref().unwrap();
943        assert_eq!(input["project_id"], 123);
944        assert_eq!(input["title"], "Bug report");
945    }
946
947    #[tokio::test]
948    async fn operation_step_failure_marks_run_failed() {
949        let mut engine = create_test_engine();
950        engine.register(FailingOperationWorkflow).unwrap();
951
952        let result = engine
953            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
954            .await;
955
956        assert!(result.is_err());
957    }
958
959    #[tokio::test]
960    async fn operation_mixed_with_shell_steps() {
961        let mut engine = create_test_engine();
962        engine.register(MixedWorkflow).unwrap();
963
964        let run = engine
965            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
966            .await
967            .unwrap();
968
969        assert_eq!(run.status.state, RunStatus::Completed);
970
971        let steps = engine.store().list_steps(run.id).await.unwrap();
972
973        assert_eq!(steps.len(), 2);
974        assert_eq!(steps[0].kind, StepKind::Shell);
975        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
976        assert_eq!(steps[0].position, 0);
977        assert_eq!(steps[1].position, 1);
978    }
979
980    // -----------------------------------------------------------------------
981    // Approval + resume tests
982    // -----------------------------------------------------------------------
983
984    use crate::config::ApprovalConfig;
985
986    struct SingleApprovalWorkflow;
987
988    impl WorkflowHandler for SingleApprovalWorkflow {
989        fn name(&self) -> &str {
990            "single-approval"
991        }
992
993        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
994            Box::pin(async move {
995                ctx.shell("build", ShellConfig::new("echo built")).await?;
996                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
997                ctx.shell("deploy", ShellConfig::new("echo deployed"))
998                    .await?;
999                Ok(())
1000            })
1001        }
1002    }
1003
1004    struct DoubleApprovalWorkflow;
1005
1006    impl WorkflowHandler for DoubleApprovalWorkflow {
1007        fn name(&self) -> &str {
1008            "double-approval"
1009        }
1010
1011        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1012            Box::pin(async move {
1013                ctx.shell("build", ShellConfig::new("echo built")).await?;
1014                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1015                    .await?;
1016                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1017                    .await?;
1018                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1019                    .await?;
1020                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1021                    .await?;
1022                Ok(())
1023            })
1024        }
1025    }
1026
1027    #[tokio::test]
1028    async fn approval_pauses_run() {
1029        let mut engine = create_test_engine();
1030        engine.register(SingleApprovalWorkflow).unwrap();
1031
1032        let run = engine
1033            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1034            .await
1035            .unwrap();
1036
1037        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1038
1039        let steps = engine.store().list_steps(run.id).await.unwrap();
1040        assert_eq!(steps.len(), 2); // build + approval gate
1041        assert_eq!(steps[0].kind, StepKind::Shell);
1042        assert_eq!(steps[0].status.state, StepStatus::Completed);
1043        assert_eq!(steps[1].kind, StepKind::Approval);
1044        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1045    }
1046
1047    #[tokio::test]
1048    async fn approval_resume_completes_run() {
1049        let mut engine = create_test_engine();
1050        engine.register(SingleApprovalWorkflow).unwrap();
1051
1052        // First execution: pauses at approval
1053        let run = engine
1054            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1055            .await
1056            .unwrap();
1057        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1058
1059        // Simulate approval: transition to Running
1060        engine
1061            .store()
1062            .update_run_status(run.id, RunStatus::Running)
1063            .await
1064            .unwrap();
1065
1066        // Resume: replays build, skips approval, executes deploy
1067        let resumed = engine.resume_run(run.id).await.unwrap();
1068        assert_eq!(resumed.status.state, RunStatus::Completed);
1069
1070        let steps = engine.store().list_steps(run.id).await.unwrap();
1071        assert_eq!(steps.len(), 3); // build + approval + deploy
1072        assert_eq!(steps[0].name, "build");
1073        assert_eq!(steps[0].status.state, StepStatus::Completed);
1074        assert_eq!(steps[1].name, "gate");
1075        assert_eq!(steps[1].kind, StepKind::Approval);
1076        assert_eq!(steps[1].status.state, StepStatus::Completed);
1077        assert_eq!(steps[2].name, "deploy");
1078        assert_eq!(steps[2].status.state, StepStatus::Completed);
1079    }
1080
1081    #[tokio::test]
1082    async fn double_approval_two_resumes() {
1083        let mut engine = create_test_engine();
1084        engine.register(DoubleApprovalWorkflow).unwrap();
1085
1086        // First execution: pauses at staging-gate
1087        let run = engine
1088            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1089            .await
1090            .unwrap();
1091        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1092
1093        let steps = engine.store().list_steps(run.id).await.unwrap();
1094        assert_eq!(steps.len(), 2); // build + staging-gate
1095
1096        // First approval
1097        engine
1098            .store()
1099            .update_run_status(run.id, RunStatus::Running)
1100            .await
1101            .unwrap();
1102
1103        let resumed = engine.resume_run(run.id).await.unwrap();
1104        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1105
1106        let steps = engine.store().list_steps(run.id).await.unwrap();
1107        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1108
1109        // Second approval
1110        engine
1111            .store()
1112            .update_run_status(run.id, RunStatus::Running)
1113            .await
1114            .unwrap();
1115
1116        let final_run = engine.resume_run(run.id).await.unwrap();
1117        assert_eq!(final_run.status.state, RunStatus::Completed);
1118
1119        let steps = engine.store().list_steps(run.id).await.unwrap();
1120        assert_eq!(steps.len(), 5);
1121        assert_eq!(steps[0].name, "build");
1122        assert_eq!(steps[1].name, "staging-gate");
1123        assert_eq!(steps[2].name, "deploy-staging");
1124        assert_eq!(steps[3].name, "prod-gate");
1125        assert_eq!(steps[4].name, "deploy-prod");
1126
1127        for step in &steps {
1128            assert_eq!(step.status.state, StepStatus::Completed);
1129        }
1130    }
1131}