Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19use ironflow_core::provider::AgentProvider;
20use ironflow_store::error::StoreError;
21use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
22use ironflow_store::store::RunStore;
23
24use crate::context::WorkflowContext;
25use crate::error::EngineError;
26use crate::handler::{WorkflowHandler, WorkflowInfo};
27use crate::notify::{Event, EventPublisher, EventSubscriber};
28
29/// The workflow orchestration engine.
30///
31/// Holds references to the store, agent provider, and a registry of
32/// [`WorkflowHandler`]s.
33///
34/// # Examples
35///
36/// ```no_run
37/// use std::sync::Arc;
38/// use ironflow_engine::engine::Engine;
39/// use ironflow_engine::config::ShellConfig;
40/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
41/// use ironflow_engine::context::WorkflowContext;
42/// use ironflow_store::memory::InMemoryStore;
43/// use ironflow_store::models::TriggerKind;
44/// use ironflow_core::providers::claude::ClaudeCodeProvider;
45/// use serde_json::json;
46///
47/// struct CiWorkflow;
48/// impl WorkflowHandler for CiWorkflow {
49///     fn name(&self) -> &str { "ci" }
50///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
51///         Box::pin(async move {
52///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
53///             Ok(())
54///         })
55///     }
56/// }
57///
58/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
59/// let store = Arc::new(InMemoryStore::new());
60/// let provider = Arc::new(ClaudeCodeProvider::new());
61/// let mut engine = Engine::new(store, provider);
62/// engine.register(CiWorkflow)?;
63///
64/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
65/// println!("Run {} completed with status {:?}", run.id, run.status);
66/// # Ok(())
67/// # }
68/// ```
pub struct Engine {
    /// Persistence backend used to create, update, and fetch runs.
    store: Arc<dyn RunStore>,
    /// Agent provider handed to every [`WorkflowContext`] the engine builds.
    provider: Arc<dyn AgentProvider>,
    /// Registered workflow handlers, keyed by [`WorkflowHandler::name`].
    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
    /// Publisher that delivers events to subscribers added via `subscribe`.
    event_publisher: EventPublisher,
}
75
76impl Engine {
77    /// Create a new engine with the given store and agent provider.
78    ///
79    /// # Examples
80    ///
81    /// ```no_run
82    /// use std::sync::Arc;
83    /// use ironflow_engine::engine::Engine;
84    /// use ironflow_store::memory::InMemoryStore;
85    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
86    ///
87    /// let engine = Engine::new(
88    ///     Arc::new(InMemoryStore::new()),
89    ///     Arc::new(ClaudeCodeProvider::new()),
90    /// );
91    /// ```
92    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
93        Self {
94            store,
95            provider,
96            handlers: HashMap::new(),
97            event_publisher: EventPublisher::new(),
98        }
99    }
100
101    /// Returns a reference to the backing store.
102    pub fn store(&self) -> &Arc<dyn RunStore> {
103        &self.store
104    }
105
106    /// Returns a reference to the agent provider.
107    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
108        &self.provider
109    }
110
111    /// Build a [`WorkflowContext`] with access to the handler registry.
112    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
113        let handlers = self.handlers.clone();
114        let resolver: crate::context::HandlerResolver =
115            Arc::new(move |name: &str| handlers.get(name).cloned());
116        WorkflowContext::with_handler_resolver(
117            run_id,
118            self.store.clone(),
119            self.provider.clone(),
120            resolver,
121        )
122    }
123
124    // -----------------------------------------------------------------------
125    // Handler registration
126    // -----------------------------------------------------------------------
127
128    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
129    ///
130    /// The handler is looked up by [`WorkflowHandler::name`] when executing
131    /// or enqueuing.
132    ///
133    /// # Errors
134    ///
135    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
136    /// name is already registered.
137    ///
138    /// # Examples
139    ///
140    /// ```no_run
141    /// use std::sync::Arc;
142    /// use ironflow_engine::engine::Engine;
143    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
144    /// use ironflow_engine::context::WorkflowContext;
145    /// use ironflow_engine::config::ShellConfig;
146    /// use ironflow_store::memory::InMemoryStore;
147    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
148    ///
149    /// struct MyWorkflow;
150    /// impl WorkflowHandler for MyWorkflow {
151    ///     fn name(&self) -> &str { "my-workflow" }
152    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
153    ///         Box::pin(async move {
154    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
155    ///             Ok(())
156    ///         })
157    ///     }
158    /// }
159    ///
160    /// let mut engine = Engine::new(
161    ///     Arc::new(InMemoryStore::new()),
162    ///     Arc::new(ClaudeCodeProvider::new()),
163    /// );
164    /// engine.register(MyWorkflow)?;
165    /// # Ok::<(), ironflow_engine::error::EngineError>(())
166    /// ```
167    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
168        let name = handler.name().to_string();
169        if self.handlers.contains_key(&name) {
170            return Err(EngineError::InvalidWorkflow(format!(
171                "handler '{}' already registered",
172                name
173            )));
174        }
175        self.handlers.insert(name, Arc::new(handler));
176        Ok(())
177    }
178
179    /// Register a pre-boxed workflow handler.
180    ///
181    /// # Errors
182    ///
183    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
184    /// name is already registered.
185    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
186        let name = handler.name().to_string();
187        if self.handlers.contains_key(&name) {
188            return Err(EngineError::InvalidWorkflow(format!(
189                "handler '{}' already registered",
190                name
191            )));
192        }
193        self.handlers.insert(name, Arc::from(handler));
194        Ok(())
195    }
196
197    /// Get a registered handler by name.
198    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
199        self.handlers.get(name)
200    }
201
202    /// List registered handler names.
203    pub fn handler_names(&self) -> Vec<&str> {
204        self.handlers.keys().map(|s| s.as_str()).collect()
205    }
206
207    /// Get detailed info about a registered workflow handler.
208    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
209        self.handlers.get(name).map(|h| h.describe())
210    }
211
212    /// Register an event subscriber for domain events.
213    ///
214    /// The subscriber is called only for events whose type is in
215    /// `event_types`. Pass [`Event::ALL`] to receive every event.
216    ///
217    /// # Examples
218    ///
219    /// ```no_run
220    /// use ironflow_engine::engine::Engine;
221    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
222    /// use ironflow_store::memory::InMemoryStore;
223    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
224    /// use std::sync::Arc;
225    ///
226    /// let mut engine = Engine::new(
227    ///     Arc::new(InMemoryStore::new()),
228    ///     Arc::new(ClaudeCodeProvider::new()),
229    /// );
230    ///
231    /// engine.subscribe(
232    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
233    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
234    /// );
235    /// ```
236    pub fn subscribe(
237        &mut self,
238        subscriber: impl EventSubscriber + 'static,
239        event_types: &[&'static str],
240    ) {
241        self.event_publisher.subscribe(subscriber, event_types);
242    }
243
244    /// Returns a reference to the event publisher.
245    ///
246    /// Useful for publishing events from outside the engine (e.g. auth
247    /// routes in the API layer).
248    pub fn event_publisher(&self) -> &EventPublisher {
249        &self.event_publisher
250    }
251
252    // -----------------------------------------------------------------------
253    // Dynamic workflow execution (WorkflowHandler)
254    // -----------------------------------------------------------------------
255
256    /// Execute a registered handler inline.
257    ///
258    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
259    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
264    /// with that name. Returns [`EngineError`] if execution fails.
265    ///
266    /// # Examples
267    ///
268    /// ```no_run
269    /// use std::sync::Arc;
270    /// use ironflow_engine::engine::Engine;
271    /// use ironflow_store::memory::InMemoryStore;
272    /// use ironflow_store::models::TriggerKind;
273    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
274    /// use serde_json::json;
275    ///
276    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
277    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
278    /// # Ok(())
279    /// # }
280    /// ```
281    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
282    pub async fn run_handler(
283        &self,
284        handler_name: &str,
285        trigger: TriggerKind,
286        payload: Value,
287    ) -> Result<Run, EngineError> {
288        let handler = self
289            .handlers
290            .get(handler_name)
291            .ok_or_else(|| {
292                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
293            })?
294            .clone();
295
296        let run = self
297            .store
298            .create_run(NewRun {
299                workflow_name: handler_name.to_string(),
300                trigger,
301                payload,
302                max_retries: 0,
303            })
304            .await?;
305
306        let run_id = run.id;
307        info!(run_id = %run_id, "run created");
308
309        self.store
310            .update_run_status(run_id, RunStatus::Running)
311            .await?;
312
313        let run_start = Instant::now();
314        let mut ctx = self.build_context(run_id);
315
316        let result = handler.execute(&mut ctx).await;
317        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
318            .await
319    }
320
321    /// Enqueue a handler-based workflow for worker execution.
322    ///
323    /// The workflow name is stored in the run. The worker looks up the
324    /// handler by name when executing.
325    ///
326    /// # Errors
327    ///
328    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
329    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
330    pub async fn enqueue_handler(
331        &self,
332        handler_name: &str,
333        trigger: TriggerKind,
334        payload: Value,
335        max_retries: u32,
336    ) -> Result<Run, EngineError> {
337        if !self.handlers.contains_key(handler_name) {
338            return Err(EngineError::InvalidWorkflow(format!(
339                "no handler registered: {handler_name}"
340            )));
341        }
342
343        let run = self
344            .store
345            .create_run(NewRun {
346                workflow_name: handler_name.to_string(),
347                trigger,
348                payload,
349                max_retries,
350            })
351            .await?;
352
353        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
354        Ok(run)
355    }
356
357    /// Execute a handler-based run (used by the worker after pick_next_pending).
358    ///
359    /// Looks up the handler by the run's `workflow_name` and executes it
360    /// with a fresh [`WorkflowContext`].
361    ///
362    /// # Errors
363    ///
364    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
365    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
366    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
367        let run = self
368            .store
369            .get_run(run_id)
370            .await?
371            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
372
373        let handler = self
374            .handlers
375            .get(&run.workflow_name)
376            .ok_or_else(|| {
377                EngineError::InvalidWorkflow(format!(
378                    "no handler registered: {}",
379                    run.workflow_name
380                ))
381            })?
382            .clone();
383
384        let run_start = Instant::now();
385        let mut ctx = self.build_context(run_id);
386
387        let result = handler.execute(&mut ctx).await;
388        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
389            .await
390    }
391
392    /// Execute a run by its ID (used by the worker after pick_next_pending).
393    ///
394    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
395    ///
396    /// # Errors
397    ///
398    /// Returns [`EngineError`] if the run is not found or execution fails.
399    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
400    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
401        self.execute_handler_run(run_id).await
402    }
403
404    /// Resume a run after human approval.
405    ///
406    /// Re-executes the handler with step replay: completed steps return
407    /// cached output, approved approval steps are skipped, and execution
408    /// continues from the first unexecuted step.
409    ///
410    /// Supports multiple approval gates -- each resume replays all prior
411    /// steps and stops at the next approval (or completes the run).
412    ///
413    /// # Errors
414    ///
415    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
    /// Returns [`EngineError`] if execution fails. Reaching another approval
    /// gate is not an error: the run is returned with status `AwaitingApproval`.
417    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
418    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
419        let run = self
420            .store
421            .get_run(run_id)
422            .await?
423            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
424
425        let handler = self
426            .handlers
427            .get(&run.workflow_name)
428            .ok_or_else(|| {
429                EngineError::InvalidWorkflow(format!(
430                    "no handler registered: {}",
431                    run.workflow_name
432                ))
433            })?
434            .clone();
435
436        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
437
438        let run_start = Instant::now();
439        let mut ctx = self.build_context(run_id);
440        ctx.load_replay_steps().await?;
441
442        let result = handler.execute(&mut ctx).await;
443        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
444            .await
445    }
446
447    /// Finalize a run with the given result and context.
448    ///
449    /// On success: updates run to Completed with cost, duration, and completed_at.
450    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
451    /// Always: fetches and returns the final Run.
452    ///
453    /// TODO: `get_run` at the end could be optimized by using an `update_run_returning`
454    /// method if the store supports it.
455    async fn finalize_run(
456        &self,
457        run_id: Uuid,
458        workflow_name: &str,
459        result: Result<(), EngineError>,
460        ctx: &WorkflowContext,
461        run_start: Instant,
462    ) -> Result<Run, EngineError> {
463        let total_duration = run_start.elapsed().as_millis() as u64;
464        let completed_at = Utc::now();
465
466        let final_status;
467
468        match result {
469            Ok(()) => {
470                final_status = RunStatus::Completed;
471                self.store
472                    .update_run(
473                        run_id,
474                        RunUpdate {
475                            status: Some(RunStatus::Completed),
476                            cost_usd: Some(ctx.total_cost_usd()),
477                            duration_ms: Some(total_duration),
478                            completed_at: Some(completed_at),
479                            ..RunUpdate::default()
480                        },
481                    )
482                    .await?;
483
484                info!(
485                    run_id = %run_id,
486                    cost_usd = %ctx.total_cost_usd(),
487                    duration_ms = total_duration,
488                    "run completed"
489                );
490            }
491            Err(EngineError::ApprovalRequired {
492                run_id: approval_run_id,
493                step_id,
494                ref message,
495            }) => {
496                final_status = RunStatus::AwaitingApproval;
497                self.store
498                    .update_run(
499                        run_id,
500                        RunUpdate {
501                            status: Some(RunStatus::AwaitingApproval),
502                            cost_usd: Some(ctx.total_cost_usd()),
503                            duration_ms: Some(total_duration),
504                            ..RunUpdate::default()
505                        },
506                    )
507                    .await?;
508
509                info!(
510                    run_id = %approval_run_id,
511                    step_id = %step_id,
512                    message = %message,
513                    "run awaiting approval"
514                );
515            }
516            Err(err) => {
517                final_status = RunStatus::Failed;
518                if let Err(store_err) = self
519                    .store
520                    .update_run(
521                        run_id,
522                        RunUpdate {
523                            status: Some(RunStatus::Failed),
524                            error: Some(err.to_string()),
525                            cost_usd: Some(ctx.total_cost_usd()),
526                            duration_ms: Some(total_duration),
527                            completed_at: Some(completed_at),
528                            ..RunUpdate::default()
529                        },
530                    )
531                    .await
532                {
533                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
534                }
535
536                error!(run_id = %run_id, error = %err, "run failed");
537
538                self.publish_run_status_changed(
539                    workflow_name,
540                    run_id,
541                    final_status,
542                    Some(err.to_string()),
543                    ctx,
544                    total_duration,
545                );
546                return Err(err);
547            }
548        }
549
550        self.publish_run_status_changed(
551            workflow_name,
552            run_id,
553            final_status,
554            None,
555            ctx,
556            total_duration,
557        );
558
559        self.store
560            .get_run(run_id)
561            .await?
562            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
563    }
564
565    /// Publish a run status changed event to all registered subscribers.
566    ///
567    /// `from` is always `Running` because `finalize_run` is only called
568    /// from a running state.
569    fn publish_run_status_changed(
570        &self,
571        workflow_name: &str,
572        run_id: Uuid,
573        to: RunStatus,
574        error: Option<String>,
575        ctx: &WorkflowContext,
576        duration_ms: u64,
577    ) {
578        self.event_publisher.publish(Event::RunStatusChanged {
579            run_id,
580            workflow_name: workflow_name.to_string(),
581            from: RunStatus::Running,
582            to,
583            error,
584            cost_usd: ctx.total_cost_usd(),
585            duration_ms,
586            at: Utc::now(),
587        });
588    }
589}
590
591impl fmt::Debug for Engine {
592    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
593        f.debug_struct("Engine")
594            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
595            .finish_non_exhaustive()
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use super::*;
602    use crate::config::ShellConfig;
603    use crate::handler::{HandlerFuture, WorkflowHandler};
604    use ironflow_core::providers::claude::ClaudeCodeProvider;
605    use ironflow_core::providers::record_replay::RecordReplayProvider;
606    use ironflow_store::memory::InMemoryStore;
607    use ironflow_store::models::StepStatus;
608    use serde_json::json;
609
610    // Test handler that echoes a message via shell
611    struct EchoWorkflow;
612
613    impl WorkflowHandler for EchoWorkflow {
614        fn name(&self) -> &str {
615            "echo-workflow"
616        }
617
618        fn describe(&self) -> WorkflowInfo {
619            WorkflowInfo {
620                description: "A simple workflow that echoes hello".to_string(),
621                source_code: None,
622                sub_workflows: Vec::new(),
623            }
624        }
625
626        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
627            Box::pin(async move {
628                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
629                Ok(())
630            })
631        }
632    }
633
634    // Test handler that fails
635    struct FailingWorkflow;
636
637    impl WorkflowHandler for FailingWorkflow {
638        fn name(&self) -> &str {
639            "failing-workflow"
640        }
641
642        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
643            Box::pin(async move {
644                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
645                Ok(())
646            })
647        }
648    }
649
650    fn create_test_engine() -> Engine {
651        let store = Arc::new(InMemoryStore::new());
652        let inner = ClaudeCodeProvider::new();
653        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
654            inner,
655            "/tmp/ironflow-fixtures",
656        ));
657        Engine::new(store, provider)
658    }
659
660    #[test]
661    fn engine_new_creates_instance() {
662        let engine = create_test_engine();
663        assert_eq!(engine.handler_names().len(), 0);
664    }
665
666    #[test]
667    fn engine_register_handler() {
668        let mut engine = create_test_engine();
669        let result = engine.register(EchoWorkflow);
670        assert!(result.is_ok());
671        assert_eq!(engine.handler_names().len(), 1);
672        assert!(engine.handler_names().contains(&"echo-workflow"));
673    }
674
675    #[test]
676    fn engine_register_duplicate_returns_error() {
677        let mut engine = create_test_engine();
678        engine.register(EchoWorkflow).unwrap();
679        let result = engine.register(EchoWorkflow);
680        assert!(result.is_err());
681    }
682
683    #[test]
684    fn engine_get_handler_found() {
685        let mut engine = create_test_engine();
686        engine.register(EchoWorkflow).unwrap();
687        let handler = engine.get_handler("echo-workflow");
688        assert!(handler.is_some());
689    }
690
691    #[test]
692    fn engine_get_handler_not_found() {
693        let engine = create_test_engine();
694        let handler = engine.get_handler("nonexistent");
695        assert!(handler.is_none());
696    }
697
698    #[test]
699    fn engine_handler_names_lists_all() {
700        let mut engine = create_test_engine();
701        engine.register(EchoWorkflow).unwrap();
702        engine.register(FailingWorkflow).unwrap();
703        let names = engine.handler_names();
704        assert_eq!(names.len(), 2);
705        assert!(names.contains(&"echo-workflow"));
706        assert!(names.contains(&"failing-workflow"));
707    }
708
709    #[test]
710    fn engine_handler_info_returns_description() {
711        let mut engine = create_test_engine();
712        engine.register(EchoWorkflow).unwrap();
713        let info = engine.handler_info("echo-workflow");
714        assert!(info.is_some());
715        let info = info.unwrap();
716        assert_eq!(info.description, "A simple workflow that echoes hello");
717    }
718
719    #[tokio::test]
720    async fn engine_unknown_workflow_returns_error() {
721        let engine = create_test_engine();
722        let result = engine
723            .run_handler("unknown", TriggerKind::Manual, json!({}))
724            .await;
725        assert!(result.is_err());
726        match result {
727            Err(EngineError::InvalidWorkflow(msg)) => {
728                assert!(msg.contains("no handler registered"));
729            }
730            _ => panic!("expected InvalidWorkflow error"),
731        }
732    }
733
734    #[tokio::test]
735    async fn engine_enqueue_handler_creates_pending_run() {
736        let mut engine = create_test_engine();
737        engine.register(EchoWorkflow).unwrap();
738
739        let run = engine
740            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
741            .await
742            .unwrap();
743        assert_eq!(run.status.state, RunStatus::Pending);
744        assert_eq!(run.workflow_name, "echo-workflow");
745    }
746
747    #[tokio::test]
748    async fn engine_register_boxed() {
749        let mut engine = create_test_engine();
750        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
751        let result = engine.register_boxed(handler);
752        assert!(result.is_ok());
753        assert_eq!(engine.handler_names().len(), 1);
754    }
755
756    #[tokio::test]
757    async fn engine_store_and_provider_accessors() {
758        let store = Arc::new(InMemoryStore::new());
759        let inner = ClaudeCodeProvider::new();
760        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
761            inner,
762            "/tmp/ironflow-fixtures",
763        ));
764        let engine = Engine::new(store.clone(), provider.clone());
765
766        // Verify accessors return references
767        let _ = engine.store();
768        let _ = engine.provider();
769    }
770
771    // -----------------------------------------------------------------------
772    // Operation trait tests
773    // -----------------------------------------------------------------------
774
775    use crate::operation::Operation;
776    use ironflow_store::models::StepKind;
777    use std::future::Future;
778    use std::pin::Pin;
779
780    struct FakeGitlabOp {
781        project_id: u64,
782        title: String,
783    }
784
785    impl Operation for FakeGitlabOp {
786        fn kind(&self) -> &str {
787            "gitlab"
788        }
789
790        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
791            Box::pin(async move {
792                Ok(json!({
793                    "issue_id": 42,
794                    "project_id": self.project_id,
795                    "title": self.title,
796                }))
797            })
798        }
799
800        fn input(&self) -> Option<Value> {
801            Some(json!({
802                "project_id": self.project_id,
803                "title": self.title,
804            }))
805        }
806    }
807
808    struct FailingOp;
809
810    impl Operation for FailingOp {
811        fn kind(&self) -> &str {
812            "broken-service"
813        }
814
815        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
816            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
817        }
818    }
819
820    struct OperationWorkflow;
821
822    impl WorkflowHandler for OperationWorkflow {
823        fn name(&self) -> &str {
824            "operation-workflow"
825        }
826
827        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
828            Box::pin(async move {
829                let op = FakeGitlabOp {
830                    project_id: 123,
831                    title: "Bug report".to_string(),
832                };
833                ctx.operation("create-issue", &op).await?;
834                Ok(())
835            })
836        }
837    }
838
839    struct FailingOperationWorkflow;
840
841    impl WorkflowHandler for FailingOperationWorkflow {
842        fn name(&self) -> &str {
843            "failing-operation-workflow"
844        }
845
846        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
847            Box::pin(async move {
848                ctx.operation("broken-call", &FailingOp).await?;
849                Ok(())
850            })
851        }
852    }
853
854    struct MixedWorkflow;
855
856    impl WorkflowHandler for MixedWorkflow {
857        fn name(&self) -> &str {
858            "mixed-workflow"
859        }
860
861        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
862            Box::pin(async move {
863                ctx.shell("build", ShellConfig::new("echo built")).await?;
864                let op = FakeGitlabOp {
865                    project_id: 456,
866                    title: "Deploy done".to_string(),
867                };
868                let result = ctx.operation("notify-gitlab", &op).await?;
869                assert_eq!(result.output["issue_id"], 42);
870                Ok(())
871            })
872        }
873    }
874
875    #[tokio::test]
876    async fn operation_step_happy_path() {
877        let mut engine = create_test_engine();
878        engine.register(OperationWorkflow).unwrap();
879
880        let run = engine
881            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
882            .await
883            .unwrap();
884
885        assert_eq!(run.status.state, RunStatus::Completed);
886
887        let steps = engine.store().list_steps(run.id).await.unwrap();
888
889        assert_eq!(steps.len(), 1);
890        assert_eq!(steps[0].name, "create-issue");
891        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
892        assert_eq!(
893            steps[0].status.state,
894            ironflow_store::models::StepStatus::Completed
895        );
896
897        let output = steps[0].output.as_ref().unwrap();
898        assert_eq!(output["issue_id"], 42);
899        assert_eq!(output["project_id"], 123);
900
901        let input = steps[0].input.as_ref().unwrap();
902        assert_eq!(input["project_id"], 123);
903        assert_eq!(input["title"], "Bug report");
904    }
905
906    #[tokio::test]
907    async fn operation_step_failure_marks_run_failed() {
908        let mut engine = create_test_engine();
909        engine.register(FailingOperationWorkflow).unwrap();
910
911        let result = engine
912            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
913            .await;
914
915        assert!(result.is_err());
916    }
917
918    #[tokio::test]
919    async fn operation_mixed_with_shell_steps() {
920        let mut engine = create_test_engine();
921        engine.register(MixedWorkflow).unwrap();
922
923        let run = engine
924            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
925            .await
926            .unwrap();
927
928        assert_eq!(run.status.state, RunStatus::Completed);
929
930        let steps = engine.store().list_steps(run.id).await.unwrap();
931
932        assert_eq!(steps.len(), 2);
933        assert_eq!(steps[0].kind, StepKind::Shell);
934        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
935        assert_eq!(steps[0].position, 0);
936        assert_eq!(steps[1].position, 1);
937    }
938
939    // -----------------------------------------------------------------------
940    // Approval + resume tests
941    // -----------------------------------------------------------------------
942
943    use crate::config::ApprovalConfig;
944
945    struct SingleApprovalWorkflow;
946
947    impl WorkflowHandler for SingleApprovalWorkflow {
948        fn name(&self) -> &str {
949            "single-approval"
950        }
951
952        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
953            Box::pin(async move {
954                ctx.shell("build", ShellConfig::new("echo built")).await?;
955                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
956                ctx.shell("deploy", ShellConfig::new("echo deployed"))
957                    .await?;
958                Ok(())
959            })
960        }
961    }
962
963    struct DoubleApprovalWorkflow;
964
965    impl WorkflowHandler for DoubleApprovalWorkflow {
966        fn name(&self) -> &str {
967            "double-approval"
968        }
969
970        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
971            Box::pin(async move {
972                ctx.shell("build", ShellConfig::new("echo built")).await?;
973                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
974                    .await?;
975                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
976                    .await?;
977                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
978                    .await?;
979                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
980                    .await?;
981                Ok(())
982            })
983        }
984    }
985
986    #[tokio::test]
987    async fn approval_pauses_run() {
988        let mut engine = create_test_engine();
989        engine.register(SingleApprovalWorkflow).unwrap();
990
991        let run = engine
992            .run_handler("single-approval", TriggerKind::Manual, json!({}))
993            .await
994            .unwrap();
995
996        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
997
998        let steps = engine.store().list_steps(run.id).await.unwrap();
999        assert_eq!(steps.len(), 2); // build + approval gate
1000        assert_eq!(steps[0].kind, StepKind::Shell);
1001        assert_eq!(steps[0].status.state, StepStatus::Completed);
1002        assert_eq!(steps[1].kind, StepKind::Approval);
1003        assert_eq!(steps[1].status.state, StepStatus::Running);
1004    }
1005
1006    #[tokio::test]
1007    async fn approval_resume_completes_run() {
1008        let mut engine = create_test_engine();
1009        engine.register(SingleApprovalWorkflow).unwrap();
1010
1011        // First execution: pauses at approval
1012        let run = engine
1013            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1014            .await
1015            .unwrap();
1016        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1017
1018        // Simulate approval: transition to Running
1019        engine
1020            .store()
1021            .update_run_status(run.id, RunStatus::Running)
1022            .await
1023            .unwrap();
1024
1025        // Resume: replays build, skips approval, executes deploy
1026        let resumed = engine.resume_run(run.id).await.unwrap();
1027        assert_eq!(resumed.status.state, RunStatus::Completed);
1028
1029        let steps = engine.store().list_steps(run.id).await.unwrap();
1030        assert_eq!(steps.len(), 3); // build + approval + deploy
1031        assert_eq!(steps[0].name, "build");
1032        assert_eq!(steps[0].status.state, StepStatus::Completed);
1033        assert_eq!(steps[1].name, "gate");
1034        assert_eq!(steps[1].kind, StepKind::Approval);
1035        assert_eq!(steps[1].status.state, StepStatus::Completed);
1036        assert_eq!(steps[2].name, "deploy");
1037        assert_eq!(steps[2].status.state, StepStatus::Completed);
1038    }
1039
1040    #[tokio::test]
1041    async fn double_approval_two_resumes() {
1042        let mut engine = create_test_engine();
1043        engine.register(DoubleApprovalWorkflow).unwrap();
1044
1045        // First execution: pauses at staging-gate
1046        let run = engine
1047            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1048            .await
1049            .unwrap();
1050        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1051
1052        let steps = engine.store().list_steps(run.id).await.unwrap();
1053        assert_eq!(steps.len(), 2); // build + staging-gate
1054
1055        // First approval
1056        engine
1057            .store()
1058            .update_run_status(run.id, RunStatus::Running)
1059            .await
1060            .unwrap();
1061
1062        let resumed = engine.resume_run(run.id).await.unwrap();
1063        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1064
1065        let steps = engine.store().list_steps(run.id).await.unwrap();
1066        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1067
1068        // Second approval
1069        engine
1070            .store()
1071            .update_run_status(run.id, RunStatus::Running)
1072            .await
1073            .unwrap();
1074
1075        let final_run = engine.resume_run(run.id).await.unwrap();
1076        assert_eq!(final_run.status.state, RunStatus::Completed);
1077
1078        let steps = engine.store().list_steps(run.id).await.unwrap();
1079        assert_eq!(steps.len(), 5);
1080        assert_eq!(steps[0].name, "build");
1081        assert_eq!(steps[1].name, "staging-gate");
1082        assert_eq!(steps[2].name, "deploy-staging");
1083        assert_eq!(steps[3].name, "prod-gate");
1084        assert_eq!(steps[4].name, "deploy-prod");
1085
1086        for step in &steps {
1087            assert_eq!(step.status.state, StepStatus::Completed);
1088        }
1089    }
1090}