Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::Utc;
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::RunStore;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33/// The workflow orchestration engine.
34///
35/// Holds references to the store, agent provider, and a registry of
36/// [`WorkflowHandler`]s.
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use ironflow_engine::engine::Engine;
43/// use ironflow_engine::config::ShellConfig;
44/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
45/// use ironflow_engine::context::WorkflowContext;
46/// use ironflow_store::memory::InMemoryStore;
47/// use ironflow_store::models::TriggerKind;
48/// use ironflow_core::providers::claude::ClaudeCodeProvider;
49/// use serde_json::json;
50///
51/// struct CiWorkflow;
52/// impl WorkflowHandler for CiWorkflow {
53///     fn name(&self) -> &str { "ci" }
54///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
55///         Box::pin(async move {
56///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
57///             Ok(())
58///         })
59///     }
60/// }
61///
62/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
63/// let store = Arc::new(InMemoryStore::new());
64/// let provider = Arc::new(ClaudeCodeProvider::new());
65/// let mut engine = Engine::new(store, provider);
66/// engine.register(CiWorkflow)?;
67///
68/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
69/// println!("Run {} completed with status {:?}", run.id, run.status);
70/// # Ok(())
71/// # }
72/// ```
73pub struct Engine {
74    store: Arc<dyn RunStore>,
75    provider: Arc<dyn AgentProvider>,
76    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77    event_publisher: EventPublisher,
78}
79
80/// Validate a workflow category path.
81///
82/// A category is a `/`-separated list of non-empty segments. This function
83/// rejects empty paths, leading or trailing `/`, consecutive `/`, and
84/// segments containing only whitespace.
85///
86/// # Errors
87///
88/// Returns [`EngineError::InvalidWorkflow`] when the category is malformed.
89fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
90    let reject = |reason: &str| {
91        Err(EngineError::InvalidWorkflow(format!(
92            "handler '{handler_name}' has invalid category '{category}': {reason}"
93        )))
94    };
95
96    if category.is_empty() {
97        return reject("empty category");
98    }
99    if category.starts_with('/') {
100        return reject("leading '/'");
101    }
102    if category.ends_with('/') {
103        return reject("trailing '/'");
104    }
105    for segment in category.split('/') {
106        if segment.is_empty() {
107            return reject("empty segment (double '/')");
108        }
109        if segment.trim().is_empty() {
110            return reject("whitespace-only segment");
111        }
112    }
113    Ok(())
114}
115
116impl Engine {
117    /// Create a new engine with the given store and agent provider.
118    ///
119    /// # Examples
120    ///
121    /// ```no_run
122    /// use std::sync::Arc;
123    /// use ironflow_engine::engine::Engine;
124    /// use ironflow_store::memory::InMemoryStore;
125    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
126    ///
127    /// let engine = Engine::new(
128    ///     Arc::new(InMemoryStore::new()),
129    ///     Arc::new(ClaudeCodeProvider::new()),
130    /// );
131    /// ```
132    pub fn new(store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
133        Self {
134            store,
135            provider,
136            handlers: HashMap::new(),
137            event_publisher: EventPublisher::new(),
138        }
139    }
140
141    /// Returns a reference to the backing store.
142    pub fn store(&self) -> &Arc<dyn RunStore> {
143        &self.store
144    }
145
146    /// Returns a reference to the agent provider.
147    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
148        &self.provider
149    }
150
151    /// Build a [`WorkflowContext`] with access to the handler registry.
152    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
153        let handlers = self.handlers.clone();
154        let resolver: crate::context::HandlerResolver =
155            Arc::new(move |name: &str| handlers.get(name).cloned());
156        WorkflowContext::with_handler_resolver(
157            run_id,
158            self.store.clone(),
159            self.provider.clone(),
160            resolver,
161        )
162    }
163
164    // -----------------------------------------------------------------------
165    // Handler registration
166    // -----------------------------------------------------------------------
167
168    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
169    ///
170    /// The handler is looked up by [`WorkflowHandler::name`] when executing
171    /// or enqueuing.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
176    /// name is already registered.
177    ///
178    /// # Examples
179    ///
180    /// ```no_run
181    /// use std::sync::Arc;
182    /// use ironflow_engine::engine::Engine;
183    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
184    /// use ironflow_engine::context::WorkflowContext;
185    /// use ironflow_engine::config::ShellConfig;
186    /// use ironflow_store::memory::InMemoryStore;
187    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
188    ///
189    /// struct MyWorkflow;
190    /// impl WorkflowHandler for MyWorkflow {
191    ///     fn name(&self) -> &str { "my-workflow" }
192    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
193    ///         Box::pin(async move {
194    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
195    ///             Ok(())
196    ///         })
197    ///     }
198    /// }
199    ///
200    /// let mut engine = Engine::new(
201    ///     Arc::new(InMemoryStore::new()),
202    ///     Arc::new(ClaudeCodeProvider::new()),
203    /// );
204    /// engine.register(MyWorkflow)?;
205    /// # Ok::<(), ironflow_engine::error::EngineError>(())
206    /// ```
207    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
208        let name = handler.name().to_string();
209        if self.handlers.contains_key(&name) {
210            return Err(EngineError::InvalidWorkflow(format!(
211                "handler '{}' already registered",
212                name
213            )));
214        }
215        if let Some(category) = handler.category() {
216            validate_category(&name, category)?;
217        }
218        self.handlers.insert(name, Arc::new(handler));
219        Ok(())
220    }
221
222    /// Register a pre-boxed workflow handler.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
227    /// name is already registered or if its category is invalid.
228    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
229        let name = handler.name().to_string();
230        if self.handlers.contains_key(&name) {
231            return Err(EngineError::InvalidWorkflow(format!(
232                "handler '{}' already registered",
233                name
234            )));
235        }
236        if let Some(category) = handler.category() {
237            validate_category(&name, category)?;
238        }
239        self.handlers.insert(name, Arc::from(handler));
240        Ok(())
241    }
242
243    /// Get a registered handler by name.
244    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
245        self.handlers.get(name)
246    }
247
248    /// List registered handler names.
249    pub fn handler_names(&self) -> Vec<&str> {
250        self.handlers.keys().map(|s| s.as_str()).collect()
251    }
252
253    /// Get detailed info about a registered workflow handler.
254    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
255        self.handlers.get(name).map(|h| h.describe())
256    }
257
258    /// Register an event subscriber for domain events.
259    ///
260    /// The subscriber is called only for events whose type is in
261    /// `event_types`. Pass [`Event::ALL`] to receive every event.
262    ///
263    /// # Examples
264    ///
265    /// ```no_run
266    /// use ironflow_engine::engine::Engine;
267    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
268    /// use ironflow_store::memory::InMemoryStore;
269    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
270    /// use std::sync::Arc;
271    ///
272    /// let mut engine = Engine::new(
273    ///     Arc::new(InMemoryStore::new()),
274    ///     Arc::new(ClaudeCodeProvider::new()),
275    /// );
276    ///
277    /// engine.subscribe(
278    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
279    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
280    /// );
281    /// ```
282    pub fn subscribe(
283        &mut self,
284        subscriber: impl EventSubscriber + 'static,
285        event_types: &[&'static str],
286    ) {
287        self.event_publisher.subscribe(subscriber, event_types);
288    }
289
290    /// Returns a reference to the event publisher.
291    ///
292    /// Useful for publishing events from outside the engine (e.g. auth
293    /// routes in the API layer).
294    pub fn event_publisher(&self) -> &EventPublisher {
295        &self.event_publisher
296    }
297
298    // -----------------------------------------------------------------------
299    // Dynamic workflow execution (WorkflowHandler)
300    // -----------------------------------------------------------------------
301
302    /// Execute a registered handler inline.
303    ///
304    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
305    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
310    /// with that name. Returns [`EngineError`] if execution fails.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// use std::sync::Arc;
316    /// use ironflow_engine::engine::Engine;
317    /// use ironflow_store::memory::InMemoryStore;
318    /// use ironflow_store::models::TriggerKind;
319    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
320    /// use serde_json::json;
321    ///
322    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
323    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
328    pub async fn run_handler(
329        &self,
330        handler_name: &str,
331        trigger: TriggerKind,
332        payload: Value,
333    ) -> Result<Run, EngineError> {
334        let handler = self
335            .handlers
336            .get(handler_name)
337            .ok_or_else(|| {
338                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
339            })?
340            .clone();
341
342        let run = self
343            .store
344            .create_run(NewRun {
345                workflow_name: handler_name.to_string(),
346                trigger,
347                payload,
348                max_retries: 0,
349            })
350            .await?;
351
352        let run_id = run.id;
353        info!(run_id = %run_id, "run created");
354
355        self.store
356            .update_run_status(run_id, RunStatus::Running)
357            .await?;
358
359        #[cfg(feature = "prometheus")]
360        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
361
362        let run_start = Instant::now();
363        let mut ctx = self.build_context(run_id);
364
365        let result = handler.execute(&mut ctx).await;
366        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
367            .await
368    }
369
370    /// Enqueue a handler-based workflow for worker execution.
371    ///
372    /// The workflow name is stored in the run. The worker looks up the
373    /// handler by name when executing.
374    ///
375    /// # Errors
376    ///
377    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
378    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
379    pub async fn enqueue_handler(
380        &self,
381        handler_name: &str,
382        trigger: TriggerKind,
383        payload: Value,
384        max_retries: u32,
385    ) -> Result<Run, EngineError> {
386        if !self.handlers.contains_key(handler_name) {
387            return Err(EngineError::InvalidWorkflow(format!(
388                "no handler registered: {handler_name}"
389            )));
390        }
391
392        let run = self
393            .store
394            .create_run(NewRun {
395                workflow_name: handler_name.to_string(),
396                trigger,
397                payload,
398                max_retries,
399            })
400            .await?;
401
402        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
403        Ok(run)
404    }
405
406    /// Execute a handler-based run (used by the worker after pick_next_pending).
407    ///
408    /// Looks up the handler by the run's `workflow_name` and executes it
409    /// with a fresh [`WorkflowContext`].
410    ///
411    /// # Errors
412    ///
413    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
414    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
415    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
416        let run = self
417            .store
418            .get_run(run_id)
419            .await?
420            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
421
422        let handler = self
423            .handlers
424            .get(&run.workflow_name)
425            .ok_or_else(|| {
426                EngineError::InvalidWorkflow(format!(
427                    "no handler registered: {}",
428                    run.workflow_name
429                ))
430            })?
431            .clone();
432
433        #[cfg(feature = "prometheus")]
434        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
435
436        let run_start = Instant::now();
437        let mut ctx = self.build_context(run_id);
438
439        let result = handler.execute(&mut ctx).await;
440        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
441            .await
442    }
443
444    /// Execute a run by its ID (used by the worker after pick_next_pending).
445    ///
446    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
447    ///
448    /// # Errors
449    ///
450    /// Returns [`EngineError`] if the run is not found or execution fails.
451    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
452    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
453        self.execute_handler_run(run_id).await
454    }
455
456    /// Resume a run after human approval.
457    ///
458    /// Re-executes the handler with step replay: completed steps return
459    /// cached output, approved approval steps are skipped, and execution
460    /// continues from the first unexecuted step.
461    ///
462    /// Supports multiple approval gates -- each resume replays all prior
463    /// steps and stops at the next approval (or completes the run).
464    ///
465    /// # Errors
466    ///
467    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
468    /// Returns [`EngineError`] if execution fails or hits another approval.
469    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
470    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
471        let run = self
472            .store
473            .get_run(run_id)
474            .await?
475            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
476
477        let handler = self
478            .handlers
479            .get(&run.workflow_name)
480            .ok_or_else(|| {
481                EngineError::InvalidWorkflow(format!(
482                    "no handler registered: {}",
483                    run.workflow_name
484                ))
485            })?
486            .clone();
487
488        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
489
490        let run_start = Instant::now();
491        let mut ctx = self.build_context(run_id);
492        ctx.load_replay_steps().await?;
493
494        let result = handler.execute(&mut ctx).await;
495        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
496            .await
497    }
498
499    /// Finalize a run with the given result and context.
500    ///
501    /// On success: updates run to Completed with cost, duration, and completed_at.
502    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
503    /// Always: fetches and returns the final Run.
504    async fn finalize_run(
505        &self,
506        run_id: Uuid,
507        workflow_name: &str,
508        result: Result<(), EngineError>,
509        ctx: &WorkflowContext,
510        run_start: Instant,
511    ) -> Result<Run, EngineError> {
512        let total_duration = run_start.elapsed().as_millis() as u64;
513        let completed_at = Utc::now();
514
515        let final_status;
516        let final_run;
517
518        match result {
519            Ok(()) => {
520                final_status = RunStatus::Completed;
521                final_run = self
522                    .store
523                    .update_run_returning(
524                        run_id,
525                        RunUpdate {
526                            status: Some(RunStatus::Completed),
527                            cost_usd: Some(ctx.total_cost_usd()),
528                            duration_ms: Some(total_duration),
529                            completed_at: Some(completed_at),
530                            ..RunUpdate::default()
531                        },
532                    )
533                    .await?;
534
535                info!(
536                    run_id = %run_id,
537                    cost_usd = %ctx.total_cost_usd(),
538                    duration_ms = total_duration,
539                    "run completed"
540                );
541            }
542            Err(EngineError::ApprovalRequired {
543                run_id: approval_run_id,
544                step_id,
545                ref message,
546            }) => {
547                final_status = RunStatus::AwaitingApproval;
548                final_run = self
549                    .store
550                    .update_run_returning(
551                        run_id,
552                        RunUpdate {
553                            status: Some(RunStatus::AwaitingApproval),
554                            cost_usd: Some(ctx.total_cost_usd()),
555                            duration_ms: Some(total_duration),
556                            ..RunUpdate::default()
557                        },
558                    )
559                    .await?;
560
561                info!(
562                    run_id = %approval_run_id,
563                    step_id = %step_id,
564                    message = %message,
565                    "run awaiting approval"
566                );
567            }
568            Err(err) => {
569                final_status = RunStatus::Failed;
570                if let Err(store_err) = self
571                    .store
572                    .update_run(
573                        run_id,
574                        RunUpdate {
575                            status: Some(RunStatus::Failed),
576                            error: Some(err.to_string()),
577                            cost_usd: Some(ctx.total_cost_usd()),
578                            duration_ms: Some(total_duration),
579                            completed_at: Some(completed_at),
580                            ..RunUpdate::default()
581                        },
582                    )
583                    .await
584                {
585                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
586                }
587
588                error!(run_id = %run_id, error = %err, "run failed");
589
590                self.publish_run_status_changed(
591                    workflow_name,
592                    run_id,
593                    final_status,
594                    Some(err.to_string()),
595                    ctx,
596                    total_duration,
597                );
598
599                #[cfg(feature = "prometheus")]
600                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
601
602                return Err(err);
603            }
604        }
605
606        self.publish_run_status_changed(
607            workflow_name,
608            run_id,
609            final_status,
610            None,
611            ctx,
612            total_duration,
613        );
614
615        #[cfg(feature = "prometheus")]
616        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
617
618        Ok(final_run)
619    }
620
621    /// Emit Prometheus metrics for a completed run.
622    #[cfg(feature = "prometheus")]
623    fn emit_run_metrics(
624        &self,
625        workflow_name: &str,
626        status: RunStatus,
627        duration_ms: u64,
628        ctx: &WorkflowContext,
629    ) {
630        let status_str = status.to_string();
631        let wf = workflow_name.to_string();
632
633        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
634        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
635            .record(duration_ms as f64 / 1000.0);
636        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
637            ctx.total_cost_usd()
638                .to_string()
639                .parse::<f64>()
640                .unwrap_or(0.0),
641        );
642        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
643    }
644
645    /// Publish a run status changed event to all registered subscribers.
646    ///
647    /// `from` is always `Running` because `finalize_run` is only called
648    /// from a running state.
649    fn publish_run_status_changed(
650        &self,
651        workflow_name: &str,
652        run_id: Uuid,
653        to: RunStatus,
654        error: Option<String>,
655        ctx: &WorkflowContext,
656        duration_ms: u64,
657    ) {
658        let now = Utc::now();
659        let cost_usd = ctx.total_cost_usd();
660        let wf = workflow_name.to_string();
661
662        self.event_publisher.publish(Event::RunStatusChanged {
663            run_id,
664            workflow_name: wf.clone(),
665            from: RunStatus::Running,
666            to,
667            error: error.clone(),
668            cost_usd,
669            duration_ms,
670            at: now,
671        });
672
673        if to == RunStatus::Failed {
674            self.event_publisher.publish(Event::RunFailed {
675                run_id,
676                workflow_name: wf,
677                error,
678                cost_usd,
679                duration_ms,
680                at: now,
681            });
682        }
683    }
684}
685
686impl fmt::Debug for Engine {
687    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
688        f.debug_struct("Engine")
689            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
690            .finish_non_exhaustive()
691    }
692}
693
694#[cfg(test)]
695mod tests {
696    use super::*;
697    use crate::config::ShellConfig;
698    use crate::handler::{HandlerFuture, WorkflowHandler};
699    use ironflow_core::providers::claude::ClaudeCodeProvider;
700    use ironflow_core::providers::record_replay::RecordReplayProvider;
701    use ironflow_store::memory::InMemoryStore;
702    use ironflow_store::models::StepStatus;
703    use serde_json::json;
704
705    // Test handler that echoes a message via shell
706    struct EchoWorkflow;
707
708    impl WorkflowHandler for EchoWorkflow {
709        fn name(&self) -> &str {
710            "echo-workflow"
711        }
712
713        fn describe(&self) -> WorkflowInfo {
714            WorkflowInfo {
715                description: "A simple workflow that echoes hello".to_string(),
716                source_code: None,
717                sub_workflows: Vec::new(),
718                category: None,
719            }
720        }
721
722        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
723            Box::pin(async move {
724                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
725                Ok(())
726            })
727        }
728    }
729
730    // Test handler that fails
731    struct FailingWorkflow;
732
733    impl WorkflowHandler for FailingWorkflow {
734        fn name(&self) -> &str {
735            "failing-workflow"
736        }
737
738        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
739            Box::pin(async move {
740                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
741                Ok(())
742            })
743        }
744    }
745
746    fn create_test_engine() -> Engine {
747        let store = Arc::new(InMemoryStore::new());
748        let inner = ClaudeCodeProvider::new();
749        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
750            inner,
751            "/tmp/ironflow-fixtures",
752        ));
753        Engine::new(store, provider)
754    }
755
756    #[test]
757    fn engine_new_creates_instance() {
758        let engine = create_test_engine();
759        assert_eq!(engine.handler_names().len(), 0);
760    }
761
762    #[test]
763    fn engine_register_handler() {
764        let mut engine = create_test_engine();
765        let result = engine.register(EchoWorkflow);
766        assert!(result.is_ok());
767        assert_eq!(engine.handler_names().len(), 1);
768        assert!(engine.handler_names().contains(&"echo-workflow"));
769    }
770
771    #[test]
772    fn engine_register_duplicate_returns_error() {
773        let mut engine = create_test_engine();
774        engine.register(EchoWorkflow).unwrap();
775        let result = engine.register(EchoWorkflow);
776        assert!(result.is_err());
777    }
778
779    #[test]
780    fn engine_get_handler_found() {
781        let mut engine = create_test_engine();
782        engine.register(EchoWorkflow).unwrap();
783        let handler = engine.get_handler("echo-workflow");
784        assert!(handler.is_some());
785    }
786
787    #[test]
788    fn engine_get_handler_not_found() {
789        let engine = create_test_engine();
790        let handler = engine.get_handler("nonexistent");
791        assert!(handler.is_none());
792    }
793
794    #[test]
795    fn engine_handler_names_lists_all() {
796        let mut engine = create_test_engine();
797        engine.register(EchoWorkflow).unwrap();
798        engine.register(FailingWorkflow).unwrap();
799        let names = engine.handler_names();
800        assert_eq!(names.len(), 2);
801        assert!(names.contains(&"echo-workflow"));
802        assert!(names.contains(&"failing-workflow"));
803    }
804
805    #[test]
806    fn engine_handler_info_returns_description() {
807        let mut engine = create_test_engine();
808        engine.register(EchoWorkflow).unwrap();
809        let info = engine.handler_info("echo-workflow");
810        assert!(info.is_some());
811        let info = info.unwrap();
812        assert_eq!(info.description, "A simple workflow that echoes hello");
813    }
814
815    struct CategorizedWorkflow;
816
817    impl WorkflowHandler for CategorizedWorkflow {
818        fn name(&self) -> &str {
819            "categorized"
820        }
821        fn category(&self) -> Option<&str> {
822            Some("data/etl")
823        }
824        fn execute<'a>(
825            &'a self,
826            _ctx: &'a mut WorkflowContext,
827        ) -> crate::handler::HandlerFuture<'a> {
828            Box::pin(async move { Ok(()) })
829        }
830    }
831
832    #[test]
833    fn engine_default_describe_propagates_category() {
834        let mut engine = create_test_engine();
835        engine.register(CategorizedWorkflow).unwrap();
836        let info = engine.handler_info("categorized").unwrap();
837        assert_eq!(info.category.as_deref(), Some("data/etl"));
838    }
839
840    #[test]
841    fn engine_default_describe_without_category() {
842        let mut engine = create_test_engine();
843        engine.register(EchoWorkflow).unwrap();
844        let info = engine.handler_info("echo-workflow").unwrap();
845        assert!(info.category.is_none());
846    }
847
848    struct BadCategoryWorkflow(&'static str);
849
850    impl WorkflowHandler for BadCategoryWorkflow {
851        fn name(&self) -> &str {
852            "bad-category"
853        }
854        fn category(&self) -> Option<&str> {
855            Some(self.0)
856        }
857        fn execute<'a>(
858            &'a self,
859            _ctx: &'a mut WorkflowContext,
860        ) -> crate::handler::HandlerFuture<'a> {
861            Box::pin(async move { Ok(()) })
862        }
863    }
864
865    #[test]
866    fn engine_register_rejects_empty_category() {
867        let mut engine = create_test_engine();
868        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
869        match err {
870            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
871            other => panic!("expected InvalidWorkflow, got {other:?}"),
872        }
873    }
874
875    #[test]
876    fn engine_register_rejects_leading_slash_category() {
877        let mut engine = create_test_engine();
878        let err = engine
879            .register(BadCategoryWorkflow("/data/etl"))
880            .unwrap_err();
881        match err {
882            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
883            other => panic!("expected InvalidWorkflow, got {other:?}"),
884        }
885    }
886
887    #[test]
888    fn engine_register_rejects_trailing_slash_category() {
889        let mut engine = create_test_engine();
890        let err = engine
891            .register(BadCategoryWorkflow("data/etl/"))
892            .unwrap_err();
893        match err {
894            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
895            other => panic!("expected InvalidWorkflow, got {other:?}"),
896        }
897    }
898
899    #[test]
900    fn engine_register_rejects_double_slash_category() {
901        let mut engine = create_test_engine();
902        let err = engine
903            .register(BadCategoryWorkflow("data//etl"))
904            .unwrap_err();
905        match err {
906            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
907            other => panic!("expected InvalidWorkflow, got {other:?}"),
908        }
909    }
910
911    #[test]
912    fn engine_register_rejects_whitespace_only_segment_category() {
913        let mut engine = create_test_engine();
914        let err = engine
915            .register(BadCategoryWorkflow("data/ /etl"))
916            .unwrap_err();
917        match err {
918            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
919            other => panic!("expected InvalidWorkflow, got {other:?}"),
920        }
921    }
922
923    #[test]
924    fn engine_register_accepts_valid_nested_category() {
925        let mut engine = create_test_engine();
926        assert!(engine.register(CategorizedWorkflow).is_ok());
927    }
928
929    #[tokio::test]
930    async fn engine_unknown_workflow_returns_error() {
931        let engine = create_test_engine();
932        let result = engine
933            .run_handler("unknown", TriggerKind::Manual, json!({}))
934            .await;
935        assert!(result.is_err());
936        match result {
937            Err(EngineError::InvalidWorkflow(msg)) => {
938                assert!(msg.contains("no handler registered"));
939            }
940            _ => panic!("expected InvalidWorkflow error"),
941        }
942    }
943
944    #[tokio::test]
945    async fn engine_enqueue_handler_creates_pending_run() {
946        let mut engine = create_test_engine();
947        engine.register(EchoWorkflow).unwrap();
948
949        let run = engine
950            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
951            .await
952            .unwrap();
953        assert_eq!(run.status.state, RunStatus::Pending);
954        assert_eq!(run.workflow_name, "echo-workflow");
955    }
956
957    #[tokio::test]
958    async fn engine_register_boxed() {
959        let mut engine = create_test_engine();
960        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
961        let result = engine.register_boxed(handler);
962        assert!(result.is_ok());
963        assert_eq!(engine.handler_names().len(), 1);
964    }
965
966    #[tokio::test]
967    async fn engine_store_and_provider_accessors() {
968        let store = Arc::new(InMemoryStore::new());
969        let inner = ClaudeCodeProvider::new();
970        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
971            inner,
972            "/tmp/ironflow-fixtures",
973        ));
974        let engine = Engine::new(store.clone(), provider.clone());
975
976        // Verify accessors return references
977        let _ = engine.store();
978        let _ = engine.provider();
979    }
980
981    // -----------------------------------------------------------------------
982    // Operation trait tests
983    // -----------------------------------------------------------------------
984
985    use crate::operation::Operation;
986    use ironflow_store::models::StepKind;
987    use std::future::Future;
988    use std::pin::Pin;
989
990    struct FakeGitlabOp {
991        project_id: u64,
992        title: String,
993    }
994
995    impl Operation for FakeGitlabOp {
996        fn kind(&self) -> &str {
997            "gitlab"
998        }
999
1000        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1001            Box::pin(async move {
1002                Ok(json!({
1003                    "issue_id": 42,
1004                    "project_id": self.project_id,
1005                    "title": self.title,
1006                }))
1007            })
1008        }
1009
1010        fn input(&self) -> Option<Value> {
1011            Some(json!({
1012                "project_id": self.project_id,
1013                "title": self.title,
1014            }))
1015        }
1016    }
1017
1018    struct FailingOp;
1019
1020    impl Operation for FailingOp {
1021        fn kind(&self) -> &str {
1022            "broken-service"
1023        }
1024
1025        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1026            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1027        }
1028    }
1029
1030    struct OperationWorkflow;
1031
1032    impl WorkflowHandler for OperationWorkflow {
1033        fn name(&self) -> &str {
1034            "operation-workflow"
1035        }
1036
1037        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1038            Box::pin(async move {
1039                let op = FakeGitlabOp {
1040                    project_id: 123,
1041                    title: "Bug report".to_string(),
1042                };
1043                ctx.operation("create-issue", &op).await?;
1044                Ok(())
1045            })
1046        }
1047    }
1048
1049    struct FailingOperationWorkflow;
1050
1051    impl WorkflowHandler for FailingOperationWorkflow {
1052        fn name(&self) -> &str {
1053            "failing-operation-workflow"
1054        }
1055
1056        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1057            Box::pin(async move {
1058                ctx.operation("broken-call", &FailingOp).await?;
1059                Ok(())
1060            })
1061        }
1062    }
1063
1064    struct MixedWorkflow;
1065
1066    impl WorkflowHandler for MixedWorkflow {
1067        fn name(&self) -> &str {
1068            "mixed-workflow"
1069        }
1070
1071        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1072            Box::pin(async move {
1073                ctx.shell("build", ShellConfig::new("echo built")).await?;
1074                let op = FakeGitlabOp {
1075                    project_id: 456,
1076                    title: "Deploy done".to_string(),
1077                };
1078                let result = ctx.operation("notify-gitlab", &op).await?;
1079                assert_eq!(result.output["issue_id"], 42);
1080                Ok(())
1081            })
1082        }
1083    }
1084
1085    #[tokio::test]
1086    async fn operation_step_happy_path() {
1087        let mut engine = create_test_engine();
1088        engine.register(OperationWorkflow).unwrap();
1089
1090        let run = engine
1091            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1092            .await
1093            .unwrap();
1094
1095        assert_eq!(run.status.state, RunStatus::Completed);
1096
1097        let steps = engine.store().list_steps(run.id).await.unwrap();
1098
1099        assert_eq!(steps.len(), 1);
1100        assert_eq!(steps[0].name, "create-issue");
1101        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1102        assert_eq!(
1103            steps[0].status.state,
1104            ironflow_store::models::StepStatus::Completed
1105        );
1106
1107        let output = steps[0].output.as_ref().unwrap();
1108        assert_eq!(output["issue_id"], 42);
1109        assert_eq!(output["project_id"], 123);
1110
1111        let input = steps[0].input.as_ref().unwrap();
1112        assert_eq!(input["project_id"], 123);
1113        assert_eq!(input["title"], "Bug report");
1114    }
1115
1116    #[tokio::test]
1117    async fn operation_step_failure_marks_run_failed() {
1118        let mut engine = create_test_engine();
1119        engine.register(FailingOperationWorkflow).unwrap();
1120
1121        let result = engine
1122            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1123            .await;
1124
1125        assert!(result.is_err());
1126    }
1127
1128    #[tokio::test]
1129    async fn operation_mixed_with_shell_steps() {
1130        let mut engine = create_test_engine();
1131        engine.register(MixedWorkflow).unwrap();
1132
1133        let run = engine
1134            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1135            .await
1136            .unwrap();
1137
1138        assert_eq!(run.status.state, RunStatus::Completed);
1139
1140        let steps = engine.store().list_steps(run.id).await.unwrap();
1141
1142        assert_eq!(steps.len(), 2);
1143        assert_eq!(steps[0].kind, StepKind::Shell);
1144        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1145        assert_eq!(steps[0].position, 0);
1146        assert_eq!(steps[1].position, 1);
1147    }
1148
1149    // -----------------------------------------------------------------------
1150    // Approval + resume tests
1151    // -----------------------------------------------------------------------
1152
1153    use crate::config::ApprovalConfig;
1154
1155    struct SingleApprovalWorkflow;
1156
1157    impl WorkflowHandler for SingleApprovalWorkflow {
1158        fn name(&self) -> &str {
1159            "single-approval"
1160        }
1161
1162        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1163            Box::pin(async move {
1164                ctx.shell("build", ShellConfig::new("echo built")).await?;
1165                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1166                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1167                    .await?;
1168                Ok(())
1169            })
1170        }
1171    }
1172
1173    struct DoubleApprovalWorkflow;
1174
1175    impl WorkflowHandler for DoubleApprovalWorkflow {
1176        fn name(&self) -> &str {
1177            "double-approval"
1178        }
1179
1180        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1181            Box::pin(async move {
1182                ctx.shell("build", ShellConfig::new("echo built")).await?;
1183                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1184                    .await?;
1185                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1186                    .await?;
1187                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1188                    .await?;
1189                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1190                    .await?;
1191                Ok(())
1192            })
1193        }
1194    }
1195
1196    #[tokio::test]
1197    async fn approval_pauses_run() {
1198        let mut engine = create_test_engine();
1199        engine.register(SingleApprovalWorkflow).unwrap();
1200
1201        let run = engine
1202            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1203            .await
1204            .unwrap();
1205
1206        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1207
1208        let steps = engine.store().list_steps(run.id).await.unwrap();
1209        assert_eq!(steps.len(), 2); // build + approval gate
1210        assert_eq!(steps[0].kind, StepKind::Shell);
1211        assert_eq!(steps[0].status.state, StepStatus::Completed);
1212        assert_eq!(steps[1].kind, StepKind::Approval);
1213        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1214    }
1215
1216    #[tokio::test]
1217    async fn approval_resume_completes_run() {
1218        let mut engine = create_test_engine();
1219        engine.register(SingleApprovalWorkflow).unwrap();
1220
1221        // First execution: pauses at approval
1222        let run = engine
1223            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1224            .await
1225            .unwrap();
1226        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1227
1228        // Simulate approval: transition to Running
1229        engine
1230            .store()
1231            .update_run_status(run.id, RunStatus::Running)
1232            .await
1233            .unwrap();
1234
1235        // Resume: replays build, skips approval, executes deploy
1236        let resumed = engine.resume_run(run.id).await.unwrap();
1237        assert_eq!(resumed.status.state, RunStatus::Completed);
1238
1239        let steps = engine.store().list_steps(run.id).await.unwrap();
1240        assert_eq!(steps.len(), 3); // build + approval + deploy
1241        assert_eq!(steps[0].name, "build");
1242        assert_eq!(steps[0].status.state, StepStatus::Completed);
1243        assert_eq!(steps[1].name, "gate");
1244        assert_eq!(steps[1].kind, StepKind::Approval);
1245        assert_eq!(steps[1].status.state, StepStatus::Completed);
1246        assert_eq!(steps[2].name, "deploy");
1247        assert_eq!(steps[2].status.state, StepStatus::Completed);
1248    }
1249
1250    #[tokio::test]
1251    async fn double_approval_two_resumes() {
1252        let mut engine = create_test_engine();
1253        engine.register(DoubleApprovalWorkflow).unwrap();
1254
1255        // First execution: pauses at staging-gate
1256        let run = engine
1257            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1258            .await
1259            .unwrap();
1260        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1261
1262        let steps = engine.store().list_steps(run.id).await.unwrap();
1263        assert_eq!(steps.len(), 2); // build + staging-gate
1264
1265        // First approval
1266        engine
1267            .store()
1268            .update_run_status(run.id, RunStatus::Running)
1269            .await
1270            .unwrap();
1271
1272        let resumed = engine.resume_run(run.id).await.unwrap();
1273        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1274
1275        let steps = engine.store().list_steps(run.id).await.unwrap();
1276        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1277
1278        // Second approval
1279        engine
1280            .store()
1281            .update_run_status(run.id, RunStatus::Running)
1282            .await
1283            .unwrap();
1284
1285        let final_run = engine.resume_run(run.id).await.unwrap();
1286        assert_eq!(final_run.status.state, RunStatus::Completed);
1287
1288        let steps = engine.store().list_steps(run.id).await.unwrap();
1289        assert_eq!(steps.len(), 5);
1290        assert_eq!(steps[0].name, "build");
1291        assert_eq!(steps[1].name, "staging-gate");
1292        assert_eq!(steps[2].name, "deploy-staging");
1293        assert_eq!(steps[3].name, "prod-gate");
1294        assert_eq!(steps[4].name, "deploy-prod");
1295
1296        for step in &steps {
1297            assert_eq!(step.status.state, StepStatus::Completed);
1298        }
1299    }
1300}