Skip to main content

ironflow_engine/
engine.rs

//! The core [`Engine`] — orchestrates workflow execution and persistence.
//!
//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
//! for AI operations, and the executor for running steps. It supports:
//!
//! - **Static workflows** ([`WorkflowDef`]): serializable step sequences without chaining.
//! - **Dynamic workflows** ([`WorkflowHandler`](crate::handler::WorkflowHandler)): Rust-native
//!   handlers where steps can reference previous outputs.
//!
//! Both can be executed inline or enqueued for a worker.
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15
16use chrono::Utc;
17use serde_json::Value;
18use tracing::{error, info};
19use uuid::Uuid;
20
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25
26use crate::config::StepConfig;
27use crate::context::WorkflowContext;
28use crate::error::EngineError;
29use crate::handler::{WorkflowHandler, WorkflowInfo};
30use crate::workflow::WorkflowDef;
31
32/// The workflow orchestration engine.
33///
34/// Holds references to the store, agent provider, and a registry of
35/// [`WorkflowHandler`]s for dynamic workflows.
36///
37/// # Examples
38///
39/// ```no_run
40/// use std::sync::Arc;
41/// use ironflow_engine::engine::Engine;
42/// use ironflow_engine::config::ShellConfig;
43/// use ironflow_engine::workflow::Workflow;
44/// use ironflow_store::memory::InMemoryStore;
45/// use ironflow_store::models::TriggerKind;
46/// use ironflow_core::providers::claude::ClaudeCodeProvider;
47/// use serde_json::json;
48///
49/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
50/// let store = Arc::new(InMemoryStore::new());
51/// let provider = Arc::new(ClaudeCodeProvider::new());
52/// let engine = Engine::new(store, provider);
53///
54/// let workflow = Workflow::new("ci")
55///     .shell("test", ShellConfig::new("cargo test"))
56///     .build()?;
57///
58/// let run = engine.run_inline(&workflow, TriggerKind::Manual, json!({})).await?;
59/// println!("Run {} completed with status {:?}", run.id, run.status);
60/// # Ok(())
61/// # }
62/// ```
63pub struct Engine {
64    store: Arc<dyn RunStore>,
65    provider: Arc<dyn AgentProvider>,
66    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
67}
68
69impl Engine {
70    /// Create a new engine with the given store and agent provider.
71    ///
72    /// # Examples
73    ///
74    /// ```no_run
75    /// use std::sync::Arc;
76    /// use ironflow_engine::engine::Engine;
77    /// use ironflow_store::memory::InMemoryStore;
78    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
79    ///
80    /// let engine = Engine::new(
81    ///     Arc::new(InMemoryStore::new()),
82    ///     Arc::new(ClaudeCodeProvider::new()),
83    /// );
84    /// ```
85    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
86        Self {
87            store,
88            provider,
89            handlers: HashMap::new(),
90        }
91    }
92
93    /// Returns a reference to the backing store.
94    pub fn store(&self) -> &Arc<dyn RunStore> {
95        &self.store
96    }
97
98    /// Returns a reference to the agent provider.
99    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
100        &self.provider
101    }
102
103    /// Build a [`WorkflowContext`] with access to the handler registry.
104    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
105        let handlers = self.handlers.clone();
106        let resolver: crate::context::HandlerResolver =
107            Arc::new(move |name: &str| handlers.get(name).cloned());
108        WorkflowContext::with_handler_resolver(
109            run_id,
110            self.store.clone(),
111            self.provider.clone(),
112            resolver,
113        )
114    }
115
116    // -----------------------------------------------------------------------
117    // Handler registration
118    // -----------------------------------------------------------------------
119
120    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
121    ///
122    /// The handler is looked up by [`WorkflowHandler::name`] when executing
123    /// or enqueuing.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
128    /// name is already registered.
129    ///
130    /// # Examples
131    ///
132    /// ```no_run
133    /// use std::sync::Arc;
134    /// use ironflow_engine::engine::Engine;
135    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
136    /// use ironflow_engine::context::WorkflowContext;
137    /// use ironflow_engine::config::ShellConfig;
138    /// use ironflow_store::memory::InMemoryStore;
139    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
140    ///
141    /// struct MyWorkflow;
142    /// impl WorkflowHandler for MyWorkflow {
143    ///     fn name(&self) -> &str { "my-workflow" }
144    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
145    ///         Box::pin(async move {
146    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
147    ///             Ok(())
148    ///         })
149    ///     }
150    /// }
151    ///
152    /// let mut engine = Engine::new(
153    ///     Arc::new(InMemoryStore::new()),
154    ///     Arc::new(ClaudeCodeProvider::new()),
155    /// );
156    /// engine.register(MyWorkflow)?;
157    /// # Ok::<(), ironflow_engine::error::EngineError>(())
158    /// ```
159    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
160        let name = handler.name().to_string();
161        if self.handlers.contains_key(&name) {
162            return Err(EngineError::InvalidWorkflow(format!(
163                "handler '{}' already registered",
164                name
165            )));
166        }
167        self.handlers.insert(name, Arc::new(handler));
168        Ok(())
169    }
170
171    /// Register a pre-boxed workflow handler.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
176    /// name is already registered.
177    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
178        let name = handler.name().to_string();
179        if self.handlers.contains_key(&name) {
180            return Err(EngineError::InvalidWorkflow(format!(
181                "handler '{}' already registered",
182                name
183            )));
184        }
185        self.handlers.insert(name, Arc::from(handler));
186        Ok(())
187    }
188
189    /// Get a registered handler by name.
190    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
191        self.handlers.get(name)
192    }
193
194    /// List registered handler names.
195    pub fn handler_names(&self) -> Vec<&str> {
196        self.handlers.keys().map(|s| s.as_str()).collect()
197    }
198
199    /// Get detailed info about a registered workflow handler.
200    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
201        self.handlers.get(name).map(|h| h.describe())
202    }
203
204    // -----------------------------------------------------------------------
205    // Dynamic workflow execution (WorkflowHandler)
206    // -----------------------------------------------------------------------
207
208    /// Execute a registered handler inline.
209    ///
210    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
211    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
216    /// with that name. Returns [`EngineError`] if execution fails.
217    ///
218    /// # Examples
219    ///
220    /// ```no_run
221    /// use std::sync::Arc;
222    /// use ironflow_engine::engine::Engine;
223    /// use ironflow_store::memory::InMemoryStore;
224    /// use ironflow_store::models::TriggerKind;
225    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
226    /// use serde_json::json;
227    ///
228    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
229    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
230    /// # Ok(())
231    /// # }
232    /// ```
233    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
234    pub async fn run_handler(
235        &self,
236        handler_name: &str,
237        trigger: TriggerKind,
238        payload: Value,
239    ) -> Result<Run, EngineError> {
240        let handler = self
241            .handlers
242            .get(handler_name)
243            .ok_or_else(|| {
244                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
245            })?
246            .clone();
247
248        let run = self
249            .store
250            .create_run(NewRun {
251                workflow_name: handler_name.to_string(),
252                trigger,
253                payload,
254                max_retries: 0,
255            })
256            .await?;
257
258        let run_id = run.id;
259        info!(run_id = %run_id, "run created");
260
261        self.store
262            .update_run_status(run_id, RunStatus::Running)
263            .await?;
264
265        let run_start = Instant::now();
266        let mut ctx = self.build_context(run_id);
267
268        let result = handler.execute(&mut ctx).await;
269        self.finalize_run(run_id, result, &ctx, run_start).await
270    }
271
272    /// Enqueue a handler-based workflow for worker execution.
273    ///
274    /// The workflow name is stored in the run. The worker looks up the
275    /// handler by name when executing.
276    ///
277    /// # Errors
278    ///
279    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
280    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
281    pub async fn enqueue_handler(
282        &self,
283        handler_name: &str,
284        trigger: TriggerKind,
285        payload: Value,
286        max_retries: u32,
287    ) -> Result<Run, EngineError> {
288        if !self.handlers.contains_key(handler_name) {
289            return Err(EngineError::InvalidWorkflow(format!(
290                "no handler registered: {handler_name}"
291            )));
292        }
293
294        let run = self
295            .store
296            .create_run(NewRun {
297                workflow_name: handler_name.to_string(),
298                trigger,
299                payload,
300                max_retries,
301            })
302            .await?;
303
304        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
305        Ok(run)
306    }
307
308    /// Execute a handler-based run (used by the worker after pick_next_pending).
309    ///
310    /// Looks up the handler by the run's `workflow_name` and executes it
311    /// with a fresh [`WorkflowContext`].
312    ///
313    /// # Errors
314    ///
315    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
316    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
317    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
318        let run = self
319            .store
320            .get_run(run_id)
321            .await?
322            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
323
324        let handler = self
325            .handlers
326            .get(&run.workflow_name)
327            .ok_or_else(|| {
328                EngineError::InvalidWorkflow(format!(
329                    "no handler registered: {}",
330                    run.workflow_name
331                ))
332            })?
333            .clone();
334
335        let run_start = Instant::now();
336        let mut ctx = self.build_context(run_id);
337
338        let result = handler.execute(&mut ctx).await;
339        self.finalize_run(run_id, result, &ctx, run_start).await
340    }
341
342    // -----------------------------------------------------------------------
343    // Static workflow execution (WorkflowDef) — backward compatible
344    // -----------------------------------------------------------------------
345
346    /// Execute a static workflow inline.
347    ///
348    /// For workflows without step chaining. Each step config is fixed at
349    /// definition time.
350    ///
351    /// # Errors
352    ///
353    /// Returns [`EngineError`] if any step fails.
354    #[tracing::instrument(name = "engine.run_inline", skip_all, fields(workflow = %workflow.name))]
355    pub async fn run_inline(
356        &self,
357        workflow: &WorkflowDef,
358        trigger: TriggerKind,
359        payload: Value,
360    ) -> Result<Run, EngineError> {
361        let run = self
362            .store
363            .create_run(NewRun {
364                workflow_name: workflow.name.clone(),
365                trigger,
366                payload,
367                max_retries: 0,
368            })
369            .await?;
370
371        let run_id = run.id;
372        info!(run_id = %run_id, "run created");
373
374        self.store
375            .update_run_status(run_id, RunStatus::Running)
376            .await?;
377
378        let run_start = Instant::now();
379        let mut ctx = self.build_context(run_id);
380
381        // Execute each step via WorkflowContext for consistent lifecycle management.
382        let result = async {
383            for step_def in &workflow.steps {
384                match &step_def.config {
385                    StepConfig::Shell(cfg) => {
386                        ctx.shell(&step_def.name, cfg.clone()).await?;
387                    }
388                    StepConfig::Http(cfg) => {
389                        ctx.http(&step_def.name, cfg.clone()).await?;
390                    }
391                    StepConfig::Agent(cfg) => {
392                        ctx.agent(&step_def.name, cfg.clone()).await?;
393                    }
394                    StepConfig::Workflow(cfg) => {
395                        let handler = self.handlers.get(&cfg.workflow_name).ok_or_else(|| {
396                            EngineError::InvalidWorkflow(format!(
397                                "no handler registered: {}",
398                                cfg.workflow_name
399                            ))
400                        })?;
401                        ctx.workflow(handler.as_ref(), cfg.payload.clone()).await?;
402                    }
403                }
404            }
405            Ok::<(), EngineError>(())
406        }
407        .await;
408
409        self.finalize_run(run_id, result, &ctx, run_start).await
410    }
411
412    /// Enqueue a static workflow for worker execution.
413    ///
414    /// Serializes the workflow definition into the payload.
415    #[tracing::instrument(name = "engine.enqueue", skip_all, fields(workflow = %workflow.name))]
416    pub async fn enqueue(
417        &self,
418        workflow: &WorkflowDef,
419        trigger: TriggerKind,
420        payload: Value,
421        max_retries: u32,
422    ) -> Result<Run, EngineError> {
423        let enriched_payload = serde_json::json!({
424            "workflow": serde_json::to_value(workflow)?,
425            "original_payload": payload,
426        });
427
428        let run = self
429            .store
430            .create_run(NewRun {
431                workflow_name: workflow.name.clone(),
432                trigger,
433                payload: enriched_payload,
434                max_retries,
435            })
436            .await?;
437
438        info!(run_id = %run.id, workflow = %workflow.name, "run enqueued");
439        Ok(run)
440    }
441
442    /// Execute a static workflow run (used by the worker).
443    ///
444    /// Extracts the workflow definition from the run's payload.
445    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
446    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
447        // Try handler-based execution first.
448        let run = self
449            .store
450            .get_run(run_id)
451            .await?
452            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
453
454        if self.handlers.contains_key(&run.workflow_name) {
455            return self.execute_handler_run(run_id).await;
456        }
457
458        // Fall back to static workflow from payload.
459        let workflow: WorkflowDef = serde_json::from_value(
460            run.payload
461                .get("workflow")
462                .cloned()
463                .ok_or_else(|| EngineError::StepConfig("missing 'workflow' in payload".into()))?,
464        )
465        .map_err(|e| EngineError::StepConfig(format!("invalid workflow in payload: {e}")))?;
466
467        let run_start = Instant::now();
468        let mut ctx = self.build_context(run_id);
469
470        // Execute static steps via context for consistency.
471        let result = async {
472            for step_def in &workflow.steps {
473                match &step_def.config {
474                    StepConfig::Shell(cfg) => {
475                        ctx.shell(&step_def.name, cfg.clone()).await?;
476                    }
477                    StepConfig::Http(cfg) => {
478                        ctx.http(&step_def.name, cfg.clone()).await?;
479                    }
480                    StepConfig::Agent(cfg) => {
481                        ctx.agent(&step_def.name, cfg.clone()).await?;
482                    }
483                    StepConfig::Workflow(cfg) => {
484                        let handler = self.handlers.get(&cfg.workflow_name).ok_or_else(|| {
485                            EngineError::InvalidWorkflow(format!(
486                                "no handler registered: {}",
487                                cfg.workflow_name
488                            ))
489                        })?;
490                        ctx.workflow(handler.as_ref(), cfg.payload.clone()).await?;
491                    }
492                }
493            }
494            Ok::<(), EngineError>(())
495        }
496        .await;
497
498        self.finalize_run(run_id, result, &ctx, run_start).await
499    }
500
501    /// Finalize a run with the given result and context.
502    ///
503    /// On success: updates run to Completed with cost, duration, and completed_at.
504    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
505    /// Always: fetches and returns the final Run.
506    ///
507    /// TODO: `get_run` at the end could be optimized by using an `update_run_returning`
508    /// method if the store supports it.
509    async fn finalize_run(
510        &self,
511        run_id: Uuid,
512        result: Result<(), EngineError>,
513        ctx: &WorkflowContext,
514        run_start: Instant,
515    ) -> Result<Run, EngineError> {
516        let total_duration = run_start.elapsed().as_millis() as u64;
517        let completed_at = Utc::now();
518
519        match result {
520            Ok(()) => {
521                self.store
522                    .update_run(
523                        run_id,
524                        RunUpdate {
525                            status: Some(RunStatus::Completed),
526                            cost_usd: Some(ctx.total_cost_usd()),
527                            duration_ms: Some(total_duration),
528                            completed_at: Some(completed_at),
529                            ..RunUpdate::default()
530                        },
531                    )
532                    .await?;
533
534                info!(
535                    run_id = %run_id,
536                    cost_usd = %ctx.total_cost_usd(),
537                    duration_ms = total_duration,
538                    "run completed"
539                );
540            }
541            Err(err) => {
542                if let Err(store_err) = self
543                    .store
544                    .update_run(
545                        run_id,
546                        RunUpdate {
547                            status: Some(RunStatus::Failed),
548                            error: Some(err.to_string()),
549                            cost_usd: Some(ctx.total_cost_usd()),
550                            duration_ms: Some(total_duration),
551                            completed_at: Some(completed_at),
552                            ..RunUpdate::default()
553                        },
554                    )
555                    .await
556                {
557                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
558                }
559
560                error!(run_id = %run_id, error = %err, "run failed");
561                return Err(err);
562            }
563        }
564
565        self.store
566            .get_run(run_id)
567            .await?
568            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
569    }
570}
571
572impl std::fmt::Debug for Engine {
573    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
574        f.debug_struct("Engine")
575            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
576            .finish_non_exhaustive()
577    }
578}
579
#[cfg(test)]
mod tests {
    //! Unit tests for [`Engine`]: handler registration and lookup, inline
    //! execution of static workflows, and enqueueing.

    use super::*;
    use crate::config::ShellConfig;
    use crate::handler::{HandlerFuture, WorkflowHandler};
    use crate::workflow::Workflow;
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use ironflow_store::memory::InMemoryStore;
    use serde_json::json;

    /// Test handler that echoes a message via a single shell step.
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }

        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: "A simple workflow that echoes hello".to_string(),
                source_code: None,
                sub_workflows: Vec::new(),
            }
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    /// Test handler whose single shell step exits with status 1.
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }

    /// Replay-mode provider shared by all tests.
    fn replay_provider() -> Arc<dyn AgentProvider> {
        Arc::new(RecordReplayProvider::replay(
            ClaudeCodeProvider::new(),
            "/tmp/ironflow-fixtures",
        ))
    }

    /// Engine backed by an in-memory store and the replay provider.
    fn create_test_engine() -> Engine {
        Engine::new(Arc::new(InMemoryStore::new()), replay_provider())
    }

    #[test]
    fn engine_new_creates_instance() {
        let engine = create_test_engine();
        assert!(engine.handler_names().is_empty());
    }

    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        assert!(engine.register(EchoWorkflow).is_ok());

        let names = engine.handler_names();
        assert_eq!(names.len(), 1);
        assert!(names.contains(&"echo-workflow"));
    }

    #[test]
    fn engine_register_duplicate_returns_error() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let second = engine.register(EchoWorkflow);
        assert!(matches!(second, Err(EngineError::InvalidWorkflow(_))));
        // The first registration must survive the rejected duplicate.
        assert_eq!(engine.handler_names().len(), 1);
    }

    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        assert!(engine.get_handler("echo-workflow").is_some());
    }

    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        assert!(engine.get_handler("nonexistent").is_none());
    }

    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();

        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"echo-workflow"));
        assert!(names.contains(&"failing-workflow"));
    }

    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let info = engine
            .handler_info("echo-workflow")
            .expect("registered handler should have info");
        assert_eq!(info.description, "A simple workflow that echoes hello");
    }

    #[tokio::test]
    async fn engine_unknown_workflow_returns_error() {
        let engine = create_test_engine();
        let result = engine
            .run_handler("unknown", TriggerKind::Manual, json!({}))
            .await;

        match result {
            Err(EngineError::InvalidWorkflow(msg)) => {
                assert!(msg.contains("no handler registered"));
            }
            _ => panic!("expected InvalidWorkflow error"),
        }
    }

    #[tokio::test]
    async fn engine_run_inline_happy_path() {
        let engine = create_test_engine();
        let workflow = Workflow::new("simple")
            .shell("test", ShellConfig::new("echo hello"))
            .build()
            .unwrap();

        let run = engine
            .run_inline(&workflow, TriggerKind::Manual, json!({}))
            .await
            .expect("inline run should succeed");
        assert_eq!(run.status.state, RunStatus::Completed);
    }

    #[tokio::test]
    async fn engine_run_inline_step_failure_marks_failed() {
        let engine = create_test_engine();
        let workflow = Workflow::new("failing")
            .shell("fail", ShellConfig::new("exit 1"))
            .build()
            .unwrap();

        // NOTE: this only checks that the error is surfaced to the caller;
        // the persisted run status is not inspected here.
        let result = engine
            .run_inline(&workflow, TriggerKind::Manual, json!({}))
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn engine_enqueue_creates_pending_run() {
        let engine = create_test_engine();
        let workflow = Workflow::new("queued")
            .shell("test", ShellConfig::new("echo queued"))
            .build()
            .unwrap();

        let run = engine
            .enqueue(&workflow, TriggerKind::Manual, json!({}), 3)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.max_retries, 3);
    }

    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let run = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Pending);
        assert_eq!(run.workflow_name, "echo-workflow");
    }

    // Registration is synchronous, so no async runtime is needed.
    #[test]
    fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);

        assert!(engine.register_boxed(handler).is_ok());
        assert_eq!(engine.handler_names().len(), 1);
        assert!(engine.get_handler("echo-workflow").is_some());
    }

    // The accessors are synchronous, so no async runtime is needed.
    #[test]
    fn engine_store_and_provider_accessors() {
        let store: Arc<dyn RunStore> = Arc::new(InMemoryStore::new());
        let provider = replay_provider();
        let engine = Engine::new(Arc::clone(&store), Arc::clone(&provider));

        // The accessors must hand back the very handles given to `Engine::new`.
        assert!(Arc::ptr_eq(engine.store(), &store));
        assert!(Arc::ptr_eq(engine.provider(), &provider));
    }
}