Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Instant;
12
13use chrono::Utc;
14use serde_json::Value;
15use tracing::{error, info};
16use uuid::Uuid;
17
18use ironflow_core::provider::AgentProvider;
19use ironflow_store::error::StoreError;
20use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
21use ironflow_store::store::RunStore;
22
23use crate::context::WorkflowContext;
24use crate::error::EngineError;
25use crate::handler::{WorkflowHandler, WorkflowInfo};
26
27/// The workflow orchestration engine.
28///
29/// Holds references to the store, agent provider, and a registry of
30/// [`WorkflowHandler`]s.
31///
32/// # Examples
33///
34/// ```no_run
35/// use std::sync::Arc;
36/// use ironflow_engine::engine::Engine;
37/// use ironflow_engine::config::ShellConfig;
38/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
39/// use ironflow_engine::context::WorkflowContext;
40/// use ironflow_store::memory::InMemoryStore;
41/// use ironflow_store::models::TriggerKind;
42/// use ironflow_core::providers::claude::ClaudeCodeProvider;
43/// use serde_json::json;
44///
45/// struct CiWorkflow;
46/// impl WorkflowHandler for CiWorkflow {
47///     fn name(&self) -> &str { "ci" }
48///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
49///         Box::pin(async move {
50///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
51///             Ok(())
52///         })
53///     }
54/// }
55///
56/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
57/// let store = Arc::new(InMemoryStore::new());
58/// let provider = Arc::new(ClaudeCodeProvider::new());
59/// let mut engine = Engine::new(store, provider);
60/// engine.register(CiWorkflow)?;
61///
62/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
63/// println!("Run {} completed with status {:?}", run.id, run.status);
64/// # Ok(())
65/// # }
66/// ```
67pub struct Engine {
68    store: Arc<dyn RunStore>,
69    provider: Arc<dyn AgentProvider>,
70    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
71}
72
73impl Engine {
74    /// Create a new engine with the given store and agent provider.
75    ///
76    /// # Examples
77    ///
78    /// ```no_run
79    /// use std::sync::Arc;
80    /// use ironflow_engine::engine::Engine;
81    /// use ironflow_store::memory::InMemoryStore;
82    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
83    ///
84    /// let engine = Engine::new(
85    ///     Arc::new(InMemoryStore::new()),
86    ///     Arc::new(ClaudeCodeProvider::new()),
87    /// );
88    /// ```
89    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
90        Self {
91            store,
92            provider,
93            handlers: HashMap::new(),
94        }
95    }
96
97    /// Returns a reference to the backing store.
98    pub fn store(&self) -> &Arc<dyn RunStore> {
99        &self.store
100    }
101
102    /// Returns a reference to the agent provider.
103    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
104        &self.provider
105    }
106
107    /// Build a [`WorkflowContext`] with access to the handler registry.
108    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
109        let handlers = self.handlers.clone();
110        let resolver: crate::context::HandlerResolver =
111            Arc::new(move |name: &str| handlers.get(name).cloned());
112        WorkflowContext::with_handler_resolver(
113            run_id,
114            self.store.clone(),
115            self.provider.clone(),
116            resolver,
117        )
118    }
119
120    // -----------------------------------------------------------------------
121    // Handler registration
122    // -----------------------------------------------------------------------
123
124    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
125    ///
126    /// The handler is looked up by [`WorkflowHandler::name`] when executing
127    /// or enqueuing.
128    ///
129    /// # Errors
130    ///
131    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
132    /// name is already registered.
133    ///
134    /// # Examples
135    ///
136    /// ```no_run
137    /// use std::sync::Arc;
138    /// use ironflow_engine::engine::Engine;
139    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
140    /// use ironflow_engine::context::WorkflowContext;
141    /// use ironflow_engine::config::ShellConfig;
142    /// use ironflow_store::memory::InMemoryStore;
143    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
144    ///
145    /// struct MyWorkflow;
146    /// impl WorkflowHandler for MyWorkflow {
147    ///     fn name(&self) -> &str { "my-workflow" }
148    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
149    ///         Box::pin(async move {
150    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
151    ///             Ok(())
152    ///         })
153    ///     }
154    /// }
155    ///
156    /// let mut engine = Engine::new(
157    ///     Arc::new(InMemoryStore::new()),
158    ///     Arc::new(ClaudeCodeProvider::new()),
159    /// );
160    /// engine.register(MyWorkflow)?;
161    /// # Ok::<(), ironflow_engine::error::EngineError>(())
162    /// ```
163    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
164        let name = handler.name().to_string();
165        if self.handlers.contains_key(&name) {
166            return Err(EngineError::InvalidWorkflow(format!(
167                "handler '{}' already registered",
168                name
169            )));
170        }
171        self.handlers.insert(name, Arc::new(handler));
172        Ok(())
173    }
174
175    /// Register a pre-boxed workflow handler.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
180    /// name is already registered.
181    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
182        let name = handler.name().to_string();
183        if self.handlers.contains_key(&name) {
184            return Err(EngineError::InvalidWorkflow(format!(
185                "handler '{}' already registered",
186                name
187            )));
188        }
189        self.handlers.insert(name, Arc::from(handler));
190        Ok(())
191    }
192
193    /// Get a registered handler by name.
194    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
195        self.handlers.get(name)
196    }
197
198    /// List registered handler names.
199    pub fn handler_names(&self) -> Vec<&str> {
200        self.handlers.keys().map(|s| s.as_str()).collect()
201    }
202
203    /// Get detailed info about a registered workflow handler.
204    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
205        self.handlers.get(name).map(|h| h.describe())
206    }
207
208    // -----------------------------------------------------------------------
209    // Dynamic workflow execution (WorkflowHandler)
210    // -----------------------------------------------------------------------
211
212    /// Execute a registered handler inline.
213    ///
214    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
215    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
216    ///
217    /// # Errors
218    ///
219    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
220    /// with that name. Returns [`EngineError`] if execution fails.
221    ///
222    /// # Examples
223    ///
224    /// ```no_run
225    /// use std::sync::Arc;
226    /// use ironflow_engine::engine::Engine;
227    /// use ironflow_store::memory::InMemoryStore;
228    /// use ironflow_store::models::TriggerKind;
229    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
230    /// use serde_json::json;
231    ///
232    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
233    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
238    pub async fn run_handler(
239        &self,
240        handler_name: &str,
241        trigger: TriggerKind,
242        payload: Value,
243    ) -> Result<Run, EngineError> {
244        let handler = self
245            .handlers
246            .get(handler_name)
247            .ok_or_else(|| {
248                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
249            })?
250            .clone();
251
252        let run = self
253            .store
254            .create_run(NewRun {
255                workflow_name: handler_name.to_string(),
256                trigger,
257                payload,
258                max_retries: 0,
259            })
260            .await?;
261
262        let run_id = run.id;
263        info!(run_id = %run_id, "run created");
264
265        self.store
266            .update_run_status(run_id, RunStatus::Running)
267            .await?;
268
269        let run_start = Instant::now();
270        let mut ctx = self.build_context(run_id);
271
272        let result = handler.execute(&mut ctx).await;
273        self.finalize_run(run_id, result, &ctx, run_start).await
274    }
275
276    /// Enqueue a handler-based workflow for worker execution.
277    ///
278    /// The workflow name is stored in the run. The worker looks up the
279    /// handler by name when executing.
280    ///
281    /// # Errors
282    ///
283    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
284    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
285    pub async fn enqueue_handler(
286        &self,
287        handler_name: &str,
288        trigger: TriggerKind,
289        payload: Value,
290        max_retries: u32,
291    ) -> Result<Run, EngineError> {
292        if !self.handlers.contains_key(handler_name) {
293            return Err(EngineError::InvalidWorkflow(format!(
294                "no handler registered: {handler_name}"
295            )));
296        }
297
298        let run = self
299            .store
300            .create_run(NewRun {
301                workflow_name: handler_name.to_string(),
302                trigger,
303                payload,
304                max_retries,
305            })
306            .await?;
307
308        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
309        Ok(run)
310    }
311
312    /// Execute a handler-based run (used by the worker after pick_next_pending).
313    ///
314    /// Looks up the handler by the run's `workflow_name` and executes it
315    /// with a fresh [`WorkflowContext`].
316    ///
317    /// # Errors
318    ///
319    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
320    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
321    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
322        let run = self
323            .store
324            .get_run(run_id)
325            .await?
326            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
327
328        let handler = self
329            .handlers
330            .get(&run.workflow_name)
331            .ok_or_else(|| {
332                EngineError::InvalidWorkflow(format!(
333                    "no handler registered: {}",
334                    run.workflow_name
335                ))
336            })?
337            .clone();
338
339        let run_start = Instant::now();
340        let mut ctx = self.build_context(run_id);
341
342        let result = handler.execute(&mut ctx).await;
343        self.finalize_run(run_id, result, &ctx, run_start).await
344    }
345
346    /// Execute a run by its ID (used by the worker after pick_next_pending).
347    ///
348    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
349    ///
350    /// # Errors
351    ///
352    /// Returns [`EngineError`] if the run is not found or execution fails.
353    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
354    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
355        self.execute_handler_run(run_id).await
356    }
357
358    /// Finalize a run with the given result and context.
359    ///
360    /// On success: updates run to Completed with cost, duration, and completed_at.
361    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
362    /// Always: fetches and returns the final Run.
363    ///
364    /// TODO: `get_run` at the end could be optimized by using an `update_run_returning`
365    /// method if the store supports it.
366    async fn finalize_run(
367        &self,
368        run_id: Uuid,
369        result: Result<(), EngineError>,
370        ctx: &WorkflowContext,
371        run_start: Instant,
372    ) -> Result<Run, EngineError> {
373        let total_duration = run_start.elapsed().as_millis() as u64;
374        let completed_at = Utc::now();
375
376        match result {
377            Ok(()) => {
378                self.store
379                    .update_run(
380                        run_id,
381                        RunUpdate {
382                            status: Some(RunStatus::Completed),
383                            cost_usd: Some(ctx.total_cost_usd()),
384                            duration_ms: Some(total_duration),
385                            completed_at: Some(completed_at),
386                            ..RunUpdate::default()
387                        },
388                    )
389                    .await?;
390
391                info!(
392                    run_id = %run_id,
393                    cost_usd = %ctx.total_cost_usd(),
394                    duration_ms = total_duration,
395                    "run completed"
396                );
397            }
398            Err(err) => {
399                if let Err(store_err) = self
400                    .store
401                    .update_run(
402                        run_id,
403                        RunUpdate {
404                            status: Some(RunStatus::Failed),
405                            error: Some(err.to_string()),
406                            cost_usd: Some(ctx.total_cost_usd()),
407                            duration_ms: Some(total_duration),
408                            completed_at: Some(completed_at),
409                            ..RunUpdate::default()
410                        },
411                    )
412                    .await
413                {
414                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
415                }
416
417                error!(run_id = %run_id, error = %err, "run failed");
418                return Err(err);
419            }
420        }
421
422        self.store
423            .get_run(run_id)
424            .await?
425            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))
426    }
427}
428
429impl std::fmt::Debug for Engine {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        f.debug_struct("Engine")
432            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
433            .finish_non_exhaustive()
434    }
435}
436
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;

    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::{StepKind, StepStatus};
    use serde_json::json;

    use super::*;
    use crate::config::ShellConfig;
    use crate::handler::{HandlerFuture, WorkflowHandler};
    use crate::operation::Operation;

    /// Fixture directory handed to the replaying provider.
    const FIXTURE_DIR: &str = "/tmp/ironflow-fixtures";

    /// Boxed future returned by the test [`Operation`] implementations.
    type OperationFuture<'a> =
        Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + 'a>>;

    // -----------------------------------------------------------------------
    // Fixtures
    // -----------------------------------------------------------------------

    /// Handler with a single shell step that echoes a greeting.
    struct EchoWorkflow;

    impl WorkflowHandler for EchoWorkflow {
        fn name(&self) -> &str {
            "echo-workflow"
        }

        fn describe(&self) -> WorkflowInfo {
            WorkflowInfo {
                description: String::from("A simple workflow that echoes hello"),
                source_code: None,
                sub_workflows: Vec::new(),
            }
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
                Ok(())
            })
        }
    }

    /// Handler whose only shell step runs `exit 1`.
    struct FailingWorkflow;

    impl WorkflowHandler for FailingWorkflow {
        fn name(&self) -> &str {
            "failing-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
                Ok(())
            })
        }
    }

    /// Engine backed by an in-memory store and a replaying provider.
    fn create_test_engine() -> Engine {
        let store = Arc::new(InMemoryStore::new());
        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
            ClaudeCodeProvider::new(),
            FIXTURE_DIR,
        ));
        Engine::new(store, provider)
    }

    // -----------------------------------------------------------------------
    // Registry tests
    // -----------------------------------------------------------------------

    #[test]
    fn engine_new_creates_instance() {
        let engine = create_test_engine();
        assert!(engine.handler_names().is_empty());
    }

    #[test]
    fn engine_register_handler() {
        let mut engine = create_test_engine();
        assert!(engine.register(EchoWorkflow).is_ok());

        let names = engine.handler_names();
        assert_eq!(names.len(), 1);
        assert!(names.contains(&"echo-workflow"));
    }

    #[test]
    fn engine_register_duplicate_returns_error() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        assert!(engine.register(EchoWorkflow).is_err());
    }

    #[test]
    fn engine_get_handler_found() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        assert!(engine.get_handler("echo-workflow").is_some());
    }

    #[test]
    fn engine_get_handler_not_found() {
        let engine = create_test_engine();
        assert!(engine.get_handler("nonexistent").is_none());
    }

    #[test]
    fn engine_handler_names_lists_all() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();
        engine.register(FailingWorkflow).unwrap();

        let names = engine.handler_names();
        assert_eq!(names.len(), 2);
        for expected in ["echo-workflow", "failing-workflow"] {
            assert!(names.contains(&expected));
        }
    }

    #[test]
    fn engine_handler_info_returns_description() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let info = engine.handler_info("echo-workflow");
        assert!(info.is_some());
        assert_eq!(
            info.unwrap().description,
            "A simple workflow that echoes hello"
        );
    }

    #[tokio::test]
    async fn engine_unknown_workflow_returns_error() {
        let engine = create_test_engine();
        let outcome = engine
            .run_handler("unknown", TriggerKind::Manual, json!({}))
            .await;

        assert!(outcome.is_err());
        if let Err(EngineError::InvalidWorkflow(msg)) = outcome {
            assert!(msg.contains("no handler registered"));
        } else {
            panic!("expected InvalidWorkflow error");
        }
    }

    #[tokio::test]
    async fn engine_enqueue_handler_creates_pending_run() {
        let mut engine = create_test_engine();
        engine.register(EchoWorkflow).unwrap();

        let queued = engine
            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
            .await
            .unwrap();

        assert_eq!(queued.status.state, RunStatus::Pending);
        assert_eq!(queued.workflow_name, "echo-workflow");
    }

    #[tokio::test]
    async fn engine_register_boxed() {
        let mut engine = create_test_engine();
        let boxed: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);

        assert!(engine.register_boxed(boxed).is_ok());
        assert_eq!(engine.handler_names().len(), 1);
    }

    #[tokio::test]
    async fn engine_store_and_provider_accessors() {
        let engine = create_test_engine();

        // Smoke test: both accessors hand back references.
        let _ = engine.store();
        let _ = engine.provider();
    }

    // -----------------------------------------------------------------------
    // Operation trait tests
    // -----------------------------------------------------------------------

    /// Operation that pretends to create a GitLab issue and always succeeds.
    struct FakeGitlabOp {
        project_id: u64,
        title: String,
    }

    impl Operation for FakeGitlabOp {
        fn kind(&self) -> &str {
            "gitlab"
        }

        fn execute(&self) -> OperationFuture<'_> {
            Box::pin(async move {
                Ok(json!({
                    "issue_id": 42,
                    "project_id": self.project_id,
                    "title": self.title,
                }))
            })
        }

        fn input(&self) -> Option<Value> {
            let recorded = json!({
                "project_id": self.project_id,
                "title": self.title,
            });
            Some(recorded)
        }
    }

    /// Operation that always fails with a step-config error.
    struct FailingOp;

    impl Operation for FailingOp {
        fn kind(&self) -> &str {
            "broken-service"
        }

        fn execute(&self) -> OperationFuture<'_> {
            Box::pin(async move {
                Err(EngineError::StepConfig(
                    "service unavailable".to_string(),
                ))
            })
        }
    }

    /// Handler running a single successful custom operation.
    struct OperationWorkflow;

    impl WorkflowHandler for OperationWorkflow {
        fn name(&self) -> &str {
            "operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                let create_issue = FakeGitlabOp {
                    project_id: 123,
                    title: "Bug report".to_string(),
                };
                ctx.operation("create-issue", &create_issue).await?;
                Ok(())
            })
        }
    }

    /// Handler running a single failing custom operation.
    struct FailingOperationWorkflow;

    impl WorkflowHandler for FailingOperationWorkflow {
        fn name(&self) -> &str {
            "failing-operation-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.operation("broken-call", &FailingOp).await?;
                Ok(())
            })
        }
    }

    /// Handler mixing a shell step with a custom operation.
    struct MixedWorkflow;

    impl WorkflowHandler for MixedWorkflow {
        fn name(&self) -> &str {
            "mixed-workflow"
        }

        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
            Box::pin(async move {
                ctx.shell("build", ShellConfig::new("echo built")).await?;

                let notify = FakeGitlabOp {
                    project_id: 456,
                    title: "Deploy done".to_string(),
                };
                let notified = ctx.operation("notify-gitlab", &notify).await?;
                assert_eq!(notified.output["issue_id"], 42);
                Ok(())
            })
        }
    }

    #[tokio::test]
    async fn operation_step_happy_path() {
        let mut engine = create_test_engine();
        engine.register(OperationWorkflow).unwrap();

        let run = engine
            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 1);

        let step = &steps[0];
        assert_eq!(step.name, "create-issue");
        assert_eq!(step.kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(step.status.state, StepStatus::Completed);

        let output = step.output.as_ref().unwrap();
        assert_eq!(output["issue_id"], 42);
        assert_eq!(output["project_id"], 123);

        let input = step.input.as_ref().unwrap();
        assert_eq!(input["project_id"], 123);
        assert_eq!(input["title"], "Bug report");
    }

    // Only checks that the error surfaces to the caller; the persisted run
    // state is not inspected here.
    #[tokio::test]
    async fn operation_step_failure_marks_run_failed() {
        let mut engine = create_test_engine();
        engine.register(FailingOperationWorkflow).unwrap();

        let outcome = engine
            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
            .await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn operation_mixed_with_shell_steps() {
        let mut engine = create_test_engine();
        engine.register(MixedWorkflow).unwrap();

        let run = engine
            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
            .await
            .unwrap();
        assert_eq!(run.status.state, RunStatus::Completed);

        let steps = engine.store().list_steps(run.id).await.unwrap();
        assert_eq!(steps.len(), 2);

        let (shell_step, op_step) = (&steps[0], &steps[1]);
        assert_eq!(shell_step.kind, StepKind::Shell);
        assert_eq!(op_step.kind, StepKind::Custom("gitlab".to_string()));
        assert_eq!(shell_step.position, 0);
        assert_eq!(op_step.position, 1);
    }
}