Skip to main content

ironflow_engine/
engine.rs

1//! The core [`Engine`] -- orchestrates workflow execution and persistence.
2//!
3//! The engine ties together a [`RunStore`] for persistence, an [`AgentProvider`]
4//! for AI operations, and a registry of [`WorkflowHandler`]s.
5//!
6//! Handlers are Rust-native: steps can reference previous outputs, use native
7//! `if`/`else`/`match` for conditional branching, and execute in parallel.
8
9use std::collections::HashMap;
10use std::fmt;
11use std::sync::Arc;
12use std::time::Instant;
13
14use chrono::{DateTime, Utc};
15use serde_json::Value;
16use tracing::{error, info};
17use uuid::Uuid;
18
19#[cfg(feature = "prometheus")]
20use ironflow_core::metric_names::{RUN_COST_USD, RUN_DURATION_SECONDS, RUNS_ACTIVE, RUNS_TOTAL};
21use ironflow_core::provider::AgentProvider;
22use ironflow_store::error::StoreError;
23use ironflow_store::models::{NewRun, Run, RunStatus, RunUpdate, TriggerKind};
24use ironflow_store::store::Store;
25#[cfg(feature = "prometheus")]
26use metrics::{counter, gauge, histogram};
27
28use crate::context::WorkflowContext;
29use crate::error::EngineError;
30use crate::handler::{WorkflowHandler, WorkflowInfo};
31use crate::notify::{Event, EventPublisher, EventSubscriber};
32
33/// The workflow orchestration engine.
34///
35/// Holds references to the store, agent provider, and a registry of
36/// [`WorkflowHandler`]s.
37///
38/// # Examples
39///
40/// ```no_run
41/// use std::sync::Arc;
42/// use ironflow_engine::engine::Engine;
43/// use ironflow_engine::config::ShellConfig;
44/// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture, WorkflowInfo};
45/// use ironflow_engine::context::WorkflowContext;
46/// use ironflow_store::memory::InMemoryStore;
47/// use ironflow_store::models::TriggerKind;
48/// use ironflow_core::providers::claude::ClaudeCodeProvider;
49/// use serde_json::json;
50///
51/// struct CiWorkflow;
52/// impl WorkflowHandler for CiWorkflow {
53///     fn name(&self) -> &str { "ci" }
54///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
55///         Box::pin(async move {
56///             ctx.shell("test", ShellConfig::new("cargo test")).await?;
57///             Ok(())
58///         })
59///     }
60/// }
61///
62/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
63/// let store = Arc::new(InMemoryStore::new());
64/// let provider = Arc::new(ClaudeCodeProvider::new());
65/// let mut engine = Engine::new(store, provider);
66/// engine.register(CiWorkflow)?;
67///
68/// let run = engine.run_handler("ci", TriggerKind::Manual, json!({})).await?;
69/// println!("Run {} completed with status {:?}", run.id, run.status);
70/// # Ok(())
71/// # }
72/// ```
73pub struct Engine {
74    store: Arc<dyn Store>,
75    provider: Arc<dyn AgentProvider>,
76    handlers: HashMap<String, Arc<dyn WorkflowHandler>>,
77    event_publisher: EventPublisher,
78}
79
80/// Validate a workflow category path.
81///
82/// A category is a `/`-separated list of non-empty segments. This function
83/// rejects empty paths, leading or trailing `/`, consecutive `/`, and
84/// segments containing only whitespace.
85///
86/// # Errors
87///
88/// Returns [`EngineError::InvalidWorkflow`] when the category is malformed.
89fn validate_category(handler_name: &str, category: &str) -> Result<(), EngineError> {
90    let reject = |reason: &str| {
91        Err(EngineError::InvalidWorkflow(format!(
92            "handler '{handler_name}' has invalid category '{category}': {reason}"
93        )))
94    };
95
96    if category.is_empty() {
97        return reject("empty category");
98    }
99    if category.starts_with('/') {
100        return reject("leading '/'");
101    }
102    if category.ends_with('/') {
103        return reject("trailing '/'");
104    }
105    for segment in category.split('/') {
106        if segment.is_empty() {
107            return reject("empty segment (double '/')");
108        }
109        if segment.trim().is_empty() {
110            return reject("whitespace-only segment");
111        }
112    }
113    Ok(())
114}
115
116impl Engine {
117    /// Create a new engine with the given store and agent provider.
118    ///
119    /// # Examples
120    ///
121    /// ```no_run
122    /// use std::sync::Arc;
123    /// use ironflow_engine::engine::Engine;
124    /// use ironflow_store::memory::InMemoryStore;
125    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
126    ///
127    /// let engine = Engine::new(
128    ///     Arc::new(InMemoryStore::new()),
129    ///     Arc::new(ClaudeCodeProvider::new()),
130    /// );
131    /// ```
132    pub fn new(store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
133        Self {
134            store,
135            provider,
136            handlers: HashMap::new(),
137            event_publisher: EventPublisher::new(),
138        }
139    }
140
141    /// Returns a reference to the backing store.
142    pub fn store(&self) -> &Arc<dyn Store> {
143        &self.store
144    }
145
146    /// Returns a reference to the agent provider.
147    pub fn provider(&self) -> &Arc<dyn AgentProvider> {
148        &self.provider
149    }
150
151    /// Build a [`WorkflowContext`] with access to the handler registry.
152    fn build_context(&self, run_id: Uuid) -> WorkflowContext {
153        let handlers = self.handlers.clone();
154        let resolver: crate::context::HandlerResolver =
155            Arc::new(move |name: &str| handlers.get(name).cloned());
156        WorkflowContext::with_handler_resolver(
157            run_id,
158            self.store.clone(),
159            self.provider.clone(),
160            resolver,
161        )
162    }
163
164    // -----------------------------------------------------------------------
165    // Handler registration
166    // -----------------------------------------------------------------------
167
168    /// Register a [`WorkflowHandler`] for dynamic workflow execution.
169    ///
170    /// The handler is looked up by [`WorkflowHandler::name`] when executing
171    /// or enqueuing.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
176    /// name is already registered.
177    ///
178    /// # Examples
179    ///
180    /// ```no_run
181    /// use std::sync::Arc;
182    /// use ironflow_engine::engine::Engine;
183    /// use ironflow_engine::handler::{WorkflowHandler, HandlerFuture};
184    /// use ironflow_engine::context::WorkflowContext;
185    /// use ironflow_engine::config::ShellConfig;
186    /// use ironflow_store::memory::InMemoryStore;
187    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
188    ///
189    /// struct MyWorkflow;
190    /// impl WorkflowHandler for MyWorkflow {
191    ///     fn name(&self) -> &str { "my-workflow" }
192    ///     fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
193    ///         Box::pin(async move {
194    ///             ctx.shell("step1", ShellConfig::new("echo done")).await?;
195    ///             Ok(())
196    ///         })
197    ///     }
198    /// }
199    ///
200    /// let mut engine = Engine::new(
201    ///     Arc::new(InMemoryStore::new()),
202    ///     Arc::new(ClaudeCodeProvider::new()),
203    /// );
204    /// engine.register(MyWorkflow)?;
205    /// # Ok::<(), ironflow_engine::error::EngineError>(())
206    /// ```
207    pub fn register(&mut self, handler: impl WorkflowHandler + 'static) -> Result<(), EngineError> {
208        let name = handler.name().to_string();
209        if self.handlers.contains_key(&name) {
210            return Err(EngineError::InvalidWorkflow(format!(
211                "handler '{}' already registered",
212                name
213            )));
214        }
215        if let Some(category) = handler.category() {
216            validate_category(&name, category)?;
217        }
218        self.handlers.insert(name, Arc::new(handler));
219        Ok(())
220    }
221
222    /// Register a pre-boxed workflow handler.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`EngineError::InvalidWorkflow`] if a handler with the same
227    /// name is already registered or if its category is invalid.
228    pub fn register_boxed(&mut self, handler: Box<dyn WorkflowHandler>) -> Result<(), EngineError> {
229        let name = handler.name().to_string();
230        if self.handlers.contains_key(&name) {
231            return Err(EngineError::InvalidWorkflow(format!(
232                "handler '{}' already registered",
233                name
234            )));
235        }
236        if let Some(category) = handler.category() {
237            validate_category(&name, category)?;
238        }
239        self.handlers.insert(name, Arc::from(handler));
240        Ok(())
241    }
242
243    /// Get a registered handler by name.
244    pub fn get_handler(&self, name: &str) -> Option<&Arc<dyn WorkflowHandler>> {
245        self.handlers.get(name)
246    }
247
248    /// List registered handler names.
249    pub fn handler_names(&self) -> Vec<&str> {
250        self.handlers.keys().map(|s| s.as_str()).collect()
251    }
252
253    /// Get detailed info about a registered workflow handler.
254    pub fn handler_info(&self, name: &str) -> Option<WorkflowInfo> {
255        self.handlers.get(name).map(|h| h.describe())
256    }
257
258    /// Register an event subscriber for domain events.
259    ///
260    /// The subscriber is called only for events whose type is in
261    /// `event_types`. Pass [`Event::ALL`] to receive every event.
262    ///
263    /// # Examples
264    ///
265    /// ```no_run
266    /// use ironflow_engine::engine::Engine;
267    /// use ironflow_engine::notify::{Event, WebhookSubscriber};
268    /// use ironflow_store::memory::InMemoryStore;
269    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
270    /// use std::sync::Arc;
271    ///
272    /// let mut engine = Engine::new(
273    ///     Arc::new(InMemoryStore::new()),
274    ///     Arc::new(ClaudeCodeProvider::new()),
275    /// );
276    ///
277    /// engine.subscribe(
278    ///     WebhookSubscriber::new("https://hooks.example.com/events"),
279    ///     &[Event::RUN_STATUS_CHANGED, Event::STEP_FAILED],
280    /// );
281    /// ```
282    pub fn subscribe(
283        &mut self,
284        subscriber: impl EventSubscriber + 'static,
285        event_types: &[&'static str],
286    ) {
287        self.event_publisher.subscribe(subscriber, event_types);
288    }
289
290    /// Returns a reference to the event publisher.
291    ///
292    /// Useful for publishing events from outside the engine (e.g. auth
293    /// routes in the API layer).
294    pub fn event_publisher(&self) -> &EventPublisher {
295        &self.event_publisher
296    }
297
298    // -----------------------------------------------------------------------
299    // Dynamic workflow execution (WorkflowHandler)
300    // -----------------------------------------------------------------------
301
302    /// Execute a registered handler inline.
303    ///
304    /// Creates a run, builds a [`WorkflowContext`], calls the handler's
305    /// [`execute`](WorkflowHandler::execute), and finalizes the run.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
310    /// with that name. Returns [`EngineError`] if execution fails.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// use std::sync::Arc;
316    /// use ironflow_engine::engine::Engine;
317    /// use ironflow_store::memory::InMemoryStore;
318    /// use ironflow_store::models::TriggerKind;
319    /// use ironflow_core::providers::claude::ClaudeCodeProvider;
320    /// use serde_json::json;
321    ///
322    /// # async fn example(engine: &Engine) -> Result<(), ironflow_engine::error::EngineError> {
323    /// let run = engine.run_handler("deploy", TriggerKind::Manual, json!({})).await?;
324    /// # Ok(())
325    /// # }
326    /// ```
327    #[tracing::instrument(name = "engine.run_handler", skip_all, fields(workflow = %handler_name))]
328    pub async fn run_handler(
329        &self,
330        handler_name: &str,
331        trigger: TriggerKind,
332        payload: Value,
333    ) -> Result<Run, EngineError> {
334        let handler = self
335            .handlers
336            .get(handler_name)
337            .ok_or_else(|| {
338                EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
339            })?
340            .clone();
341
342        let handler_version = handler.version().map(str::to_string);
343        let run = self
344            .store
345            .create_run(NewRun {
346                workflow_name: handler_name.to_string(),
347                trigger,
348                payload,
349                max_retries: 0,
350                handler_version,
351                labels: handler.default_labels(),
352                scheduled_at: None,
353            })
354            .await?;
355
356        let run_id = run.id;
357        info!(run_id = %run_id, handler_version = run.handler_version.as_deref().unwrap_or(""), "run created");
358
359        self.store
360            .update_run_status(run_id, RunStatus::Running)
361            .await?;
362
363        #[cfg(feature = "prometheus")]
364        gauge!(RUNS_ACTIVE, "workflow" => handler_name.to_string()).increment(1.0);
365
366        let run_start = Instant::now();
367        let mut ctx = self.build_context(run_id);
368
369        let result = handler.execute(&mut ctx).await;
370        self.finalize_run(run_id, handler_name, result, &ctx, run_start)
371            .await
372    }
373
374    /// Enqueue a handler-based workflow for worker execution.
375    ///
376    /// The workflow name is stored in the run. The worker looks up the
377    /// handler by name when executing.
378    ///
379    /// # Errors
380    ///
381    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
382    #[tracing::instrument(name = "engine.enqueue_handler", skip_all, fields(workflow = %handler_name))]
383    pub async fn enqueue_handler(
384        &self,
385        handler_name: &str,
386        trigger: TriggerKind,
387        payload: Value,
388        max_retries: u32,
389    ) -> Result<Run, EngineError> {
390        self.enqueue_handler_with_options(
391            handler_name,
392            trigger,
393            payload,
394            max_retries,
395            HashMap::new(),
396            None,
397        )
398        .await
399    }
400
401    /// Enqueue a handler-based workflow with labels and optional deferred scheduling.
402    ///
403    /// # Errors
404    ///
405    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered.
406    #[tracing::instrument(name = "engine.enqueue_handler_with_options", skip_all, fields(workflow = %handler_name))]
407    pub async fn enqueue_handler_with_options(
408        &self,
409        handler_name: &str,
410        trigger: TriggerKind,
411        payload: Value,
412        max_retries: u32,
413        labels: HashMap<String, String>,
414        scheduled_at: Option<DateTime<Utc>>,
415    ) -> Result<Run, EngineError> {
416        let handler = self.handlers.get(handler_name).ok_or_else(|| {
417            EngineError::InvalidWorkflow(format!("no handler registered: {handler_name}"))
418        })?;
419
420        let handler_version = handler.version().map(str::to_string);
421        let mut merged_labels = handler.default_labels();
422        merged_labels.extend(labels);
423
424        let run = self
425            .store
426            .create_run(NewRun {
427                workflow_name: handler_name.to_string(),
428                trigger,
429                payload,
430                max_retries,
431                handler_version,
432                labels: merged_labels,
433                scheduled_at,
434            })
435            .await?;
436
437        info!(run_id = %run.id, workflow = %handler_name, "handler run enqueued");
438        Ok(run)
439    }
440
441    /// Execute a handler-based run (used by the worker after pick_next_pending).
442    ///
443    /// Looks up the handler by the run's `workflow_name` and executes it
444    /// with a fresh [`WorkflowContext`].
445    ///
446    /// # Errors
447    ///
448    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
449    #[tracing::instrument(name = "engine.execute_handler_run", skip_all, fields(run_id = %run_id))]
450    pub async fn execute_handler_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
451        let run = self
452            .store
453            .get_run(run_id)
454            .await?
455            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
456
457        let handler = self
458            .handlers
459            .get(&run.workflow_name)
460            .ok_or_else(|| {
461                EngineError::InvalidWorkflow(format!(
462                    "no handler registered: {}",
463                    run.workflow_name
464                ))
465            })?
466            .clone();
467
468        #[cfg(feature = "prometheus")]
469        gauge!(RUNS_ACTIVE, "workflow" => run.workflow_name.clone()).increment(1.0);
470
471        let run_start = Instant::now();
472        let mut ctx = self.build_context(run_id);
473
474        let result = handler.execute(&mut ctx).await;
475        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
476            .await
477    }
478
479    /// Execute a run by its ID (used by the worker after pick_next_pending).
480    ///
481    /// Delegates to [`execute_handler_run`](Self::execute_handler_run).
482    ///
483    /// # Errors
484    ///
485    /// Returns [`EngineError`] if the run is not found or execution fails.
486    #[tracing::instrument(name = "engine.execute_run", skip_all, fields(run_id = %run_id))]
487    pub async fn execute_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
488        self.execute_handler_run(run_id).await
489    }
490
491    /// Resume a run after human approval.
492    ///
493    /// Re-executes the handler with step replay: completed steps return
494    /// cached output, approved approval steps are skipped, and execution
495    /// continues from the first unexecuted step.
496    ///
497    /// Supports multiple approval gates -- each resume replays all prior
498    /// steps and stops at the next approval (or completes the run).
499    ///
500    /// # Errors
501    ///
502    /// Returns [`EngineError::InvalidWorkflow`] if no handler matches.
503    /// Returns [`EngineError`] if execution fails or hits another approval.
504    #[tracing::instrument(name = "engine.resume_run", skip_all, fields(run_id = %run_id))]
505    pub async fn resume_run(&self, run_id: Uuid) -> Result<Run, EngineError> {
506        let run = self
507            .store
508            .get_run(run_id)
509            .await?
510            .ok_or(EngineError::Store(StoreError::RunNotFound(run_id)))?;
511
512        let handler = self
513            .handlers
514            .get(&run.workflow_name)
515            .ok_or_else(|| {
516                EngineError::InvalidWorkflow(format!(
517                    "no handler registered: {}",
518                    run.workflow_name
519                ))
520            })?
521            .clone();
522
523        info!(run_id = %run_id, workflow = %run.workflow_name, "resuming run after approval");
524
525        let run_start = Instant::now();
526        let mut ctx = self.build_context(run_id);
527        ctx.load_replay_steps().await?;
528
529        let result = handler.execute(&mut ctx).await;
530        self.finalize_run(run_id, &run.workflow_name, result, &ctx, run_start)
531            .await
532    }
533
534    /// Finalize a run with the given result and context.
535    ///
536    /// On success: updates run to Completed with cost, duration, and completed_at.
537    /// On failure: updates run to Failed with error, cost, duration, and completed_at.
538    /// Always: fetches and returns the final Run.
539    async fn finalize_run(
540        &self,
541        run_id: Uuid,
542        workflow_name: &str,
543        result: Result<(), EngineError>,
544        ctx: &WorkflowContext,
545        run_start: Instant,
546    ) -> Result<Run, EngineError> {
547        let total_duration = run_start.elapsed().as_millis() as u64;
548        let completed_at = Utc::now();
549
550        let final_status;
551        let final_run;
552
553        match result {
554            Ok(()) => {
555                final_status = RunStatus::Completed;
556                final_run = self
557                    .store
558                    .update_run_returning(
559                        run_id,
560                        RunUpdate {
561                            status: Some(RunStatus::Completed),
562                            cost_usd: Some(ctx.total_cost_usd()),
563                            duration_ms: Some(total_duration),
564                            completed_at: Some(completed_at),
565                            ..RunUpdate::default()
566                        },
567                    )
568                    .await?;
569
570                info!(
571                    run_id = %run_id,
572                    cost_usd = %ctx.total_cost_usd(),
573                    duration_ms = total_duration,
574                    "run completed"
575                );
576            }
577            Err(EngineError::ApprovalRequired {
578                run_id: approval_run_id,
579                step_id,
580                ref message,
581            }) => {
582                final_status = RunStatus::AwaitingApproval;
583                final_run = self
584                    .store
585                    .update_run_returning(
586                        run_id,
587                        RunUpdate {
588                            status: Some(RunStatus::AwaitingApproval),
589                            cost_usd: Some(ctx.total_cost_usd()),
590                            duration_ms: Some(total_duration),
591                            ..RunUpdate::default()
592                        },
593                    )
594                    .await?;
595
596                info!(
597                    run_id = %approval_run_id,
598                    step_id = %step_id,
599                    message = %message,
600                    "run awaiting approval"
601                );
602            }
603            Err(err) => {
604                final_status = RunStatus::Failed;
605                if let Err(store_err) = self
606                    .store
607                    .update_run(
608                        run_id,
609                        RunUpdate {
610                            status: Some(RunStatus::Failed),
611                            error: Some(err.to_string()),
612                            cost_usd: Some(ctx.total_cost_usd()),
613                            duration_ms: Some(total_duration),
614                            completed_at: Some(completed_at),
615                            ..RunUpdate::default()
616                        },
617                    )
618                    .await
619                {
620                    error!(run_id = %run_id, store_error = %store_err, "failed to persist run failure");
621                }
622
623                error!(run_id = %run_id, error = %err, "run failed");
624
625                self.publish_run_status_changed(
626                    workflow_name,
627                    run_id,
628                    final_status,
629                    Some(err.to_string()),
630                    ctx,
631                    total_duration,
632                );
633
634                #[cfg(feature = "prometheus")]
635                self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
636
637                return Err(err);
638            }
639        }
640
641        self.publish_run_status_changed(
642            workflow_name,
643            run_id,
644            final_status,
645            None,
646            ctx,
647            total_duration,
648        );
649
650        #[cfg(feature = "prometheus")]
651        self.emit_run_metrics(workflow_name, final_status, total_duration, ctx);
652
653        Ok(final_run)
654    }
655
656    /// Emit Prometheus metrics for a completed run.
657    #[cfg(feature = "prometheus")]
658    fn emit_run_metrics(
659        &self,
660        workflow_name: &str,
661        status: RunStatus,
662        duration_ms: u64,
663        ctx: &WorkflowContext,
664    ) {
665        let status_str = status.to_string();
666        let wf = workflow_name.to_string();
667
668        counter!(RUNS_TOTAL, "workflow" => wf.clone(), "status" => status_str.clone()).increment(1);
669        histogram!(RUN_DURATION_SECONDS, "workflow" => wf.clone(), "status" => status_str)
670            .record(duration_ms as f64 / 1000.0);
671        histogram!(RUN_COST_USD, "workflow" => wf.clone()).record(
672            ctx.total_cost_usd()
673                .to_string()
674                .parse::<f64>()
675                .unwrap_or(0.0),
676        );
677        gauge!(RUNS_ACTIVE, "workflow" => wf).decrement(1.0);
678    }
679
680    /// Publish a run status changed event to all registered subscribers.
681    ///
682    /// `from` is always `Running` because `finalize_run` is only called
683    /// from a running state.
684    fn publish_run_status_changed(
685        &self,
686        workflow_name: &str,
687        run_id: Uuid,
688        to: RunStatus,
689        error: Option<String>,
690        ctx: &WorkflowContext,
691        duration_ms: u64,
692    ) {
693        let now = Utc::now();
694        let cost_usd = ctx.total_cost_usd();
695        let wf = workflow_name.to_string();
696
697        self.event_publisher.publish(Event::RunStatusChanged {
698            run_id,
699            workflow_name: wf.clone(),
700            from: RunStatus::Running,
701            to,
702            error: error.clone(),
703            cost_usd,
704            duration_ms,
705            at: now,
706        });
707
708        if to == RunStatus::Failed {
709            self.event_publisher.publish(Event::RunFailed {
710                run_id,
711                workflow_name: wf,
712                error,
713                cost_usd,
714                duration_ms,
715                at: now,
716            });
717        }
718    }
719}
720
721impl fmt::Debug for Engine {
722    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
723        f.debug_struct("Engine")
724            .field("handlers", &self.handlers.keys().collect::<Vec<_>>())
725            .finish_non_exhaustive()
726    }
727}
728
729#[cfg(test)]
730mod tests {
731    use super::*;
732    use crate::config::ShellConfig;
733    use crate::handler::{HandlerFuture, WorkflowHandler};
734    use ironflow_core::providers::claude::ClaudeCodeProvider;
735    use ironflow_core::providers::record_replay::RecordReplayProvider;
736    use ironflow_store::memory::InMemoryStore;
737    use ironflow_store::models::StepStatus;
738    use serde_json::json;
739
740    // Test handler that echoes a message via shell
741    struct EchoWorkflow;
742
743    impl WorkflowHandler for EchoWorkflow {
744        fn name(&self) -> &str {
745            "echo-workflow"
746        }
747
748        fn describe(&self) -> WorkflowInfo {
749            WorkflowInfo {
750                description: "A simple workflow that echoes hello".to_string(),
751                source_code: None,
752                sub_workflows: Vec::new(),
753                category: None,
754                version: self.version().map(str::to_string),
755                input_schema: None,
756                default_labels: HashMap::new(),
757            }
758        }
759
760        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
761            Box::pin(async move {
762                ctx.shell("greet", ShellConfig::new("echo hello")).await?;
763                Ok(())
764            })
765        }
766    }
767
768    // Test handler that fails
769    struct FailingWorkflow;
770
771    impl WorkflowHandler for FailingWorkflow {
772        fn name(&self) -> &str {
773            "failing-workflow"
774        }
775
776        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
777            Box::pin(async move {
778                ctx.shell("fail", ShellConfig::new("exit 1")).await?;
779                Ok(())
780            })
781        }
782    }
783
784    fn create_test_engine() -> Engine {
785        let store = Arc::new(InMemoryStore::new());
786        let inner = ClaudeCodeProvider::new();
787        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
788            inner,
789            "/tmp/ironflow-fixtures",
790        ));
791        Engine::new(store, provider)
792    }
793
794    #[test]
795    fn engine_new_creates_instance() {
796        let engine = create_test_engine();
797        assert_eq!(engine.handler_names().len(), 0);
798    }
799
800    #[test]
801    fn engine_register_handler() {
802        let mut engine = create_test_engine();
803        let result = engine.register(EchoWorkflow);
804        assert!(result.is_ok());
805        assert_eq!(engine.handler_names().len(), 1);
806        assert!(engine.handler_names().contains(&"echo-workflow"));
807    }
808
809    #[test]
810    fn engine_register_duplicate_returns_error() {
811        let mut engine = create_test_engine();
812        engine.register(EchoWorkflow).unwrap();
813        let result = engine.register(EchoWorkflow);
814        assert!(result.is_err());
815    }
816
817    #[test]
818    fn engine_get_handler_found() {
819        let mut engine = create_test_engine();
820        engine.register(EchoWorkflow).unwrap();
821        let handler = engine.get_handler("echo-workflow");
822        assert!(handler.is_some());
823    }
824
825    #[test]
826    fn engine_get_handler_not_found() {
827        let engine = create_test_engine();
828        let handler = engine.get_handler("nonexistent");
829        assert!(handler.is_none());
830    }
831
832    #[test]
833    fn engine_handler_names_lists_all() {
834        let mut engine = create_test_engine();
835        engine.register(EchoWorkflow).unwrap();
836        engine.register(FailingWorkflow).unwrap();
837        let names = engine.handler_names();
838        assert_eq!(names.len(), 2);
839        assert!(names.contains(&"echo-workflow"));
840        assert!(names.contains(&"failing-workflow"));
841    }
842
843    #[test]
844    fn engine_handler_info_returns_description() {
845        let mut engine = create_test_engine();
846        engine.register(EchoWorkflow).unwrap();
847        let info = engine.handler_info("echo-workflow");
848        assert!(info.is_some());
849        let info = info.unwrap();
850        assert_eq!(info.description, "A simple workflow that echoes hello");
851    }
852
853    struct CategorizedWorkflow;
854
855    impl WorkflowHandler for CategorizedWorkflow {
856        fn name(&self) -> &str {
857            "categorized"
858        }
859        fn category(&self) -> Option<&str> {
860            Some("data/etl")
861        }
862        fn execute<'a>(
863            &'a self,
864            _ctx: &'a mut WorkflowContext,
865        ) -> crate::handler::HandlerFuture<'a> {
866            Box::pin(async move { Ok(()) })
867        }
868    }
869
870    #[test]
871    fn engine_default_describe_propagates_category() {
872        let mut engine = create_test_engine();
873        engine.register(CategorizedWorkflow).unwrap();
874        let info = engine.handler_info("categorized").unwrap();
875        assert_eq!(info.category.as_deref(), Some("data/etl"));
876    }
877
878    #[test]
879    fn engine_default_describe_without_category() {
880        let mut engine = create_test_engine();
881        engine.register(EchoWorkflow).unwrap();
882        let info = engine.handler_info("echo-workflow").unwrap();
883        assert!(info.category.is_none());
884    }
885
886    struct BadCategoryWorkflow(&'static str);
887
888    impl WorkflowHandler for BadCategoryWorkflow {
889        fn name(&self) -> &str {
890            "bad-category"
891        }
892        fn category(&self) -> Option<&str> {
893            Some(self.0)
894        }
895        fn execute<'a>(
896            &'a self,
897            _ctx: &'a mut WorkflowContext,
898        ) -> crate::handler::HandlerFuture<'a> {
899            Box::pin(async move { Ok(()) })
900        }
901    }
902
903    #[test]
904    fn engine_register_rejects_empty_category() {
905        let mut engine = create_test_engine();
906        let err = engine.register(BadCategoryWorkflow("")).unwrap_err();
907        match err {
908            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty category")),
909            other => panic!("expected InvalidWorkflow, got {other:?}"),
910        }
911    }
912
913    #[test]
914    fn engine_register_rejects_leading_slash_category() {
915        let mut engine = create_test_engine();
916        let err = engine
917            .register(BadCategoryWorkflow("/data/etl"))
918            .unwrap_err();
919        match err {
920            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("leading '/'")),
921            other => panic!("expected InvalidWorkflow, got {other:?}"),
922        }
923    }
924
925    #[test]
926    fn engine_register_rejects_trailing_slash_category() {
927        let mut engine = create_test_engine();
928        let err = engine
929            .register(BadCategoryWorkflow("data/etl/"))
930            .unwrap_err();
931        match err {
932            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("trailing '/'")),
933            other => panic!("expected InvalidWorkflow, got {other:?}"),
934        }
935    }
936
937    #[test]
938    fn engine_register_rejects_double_slash_category() {
939        let mut engine = create_test_engine();
940        let err = engine
941            .register(BadCategoryWorkflow("data//etl"))
942            .unwrap_err();
943        match err {
944            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("empty segment")),
945            other => panic!("expected InvalidWorkflow, got {other:?}"),
946        }
947    }
948
949    #[test]
950    fn engine_register_rejects_whitespace_only_segment_category() {
951        let mut engine = create_test_engine();
952        let err = engine
953            .register(BadCategoryWorkflow("data/ /etl"))
954            .unwrap_err();
955        match err {
956            EngineError::InvalidWorkflow(msg) => assert!(msg.contains("whitespace-only segment")),
957            other => panic!("expected InvalidWorkflow, got {other:?}"),
958        }
959    }
960
961    #[test]
962    fn engine_register_accepts_valid_nested_category() {
963        let mut engine = create_test_engine();
964        assert!(engine.register(CategorizedWorkflow).is_ok());
965    }
966
967    #[tokio::test]
968    async fn engine_unknown_workflow_returns_error() {
969        let engine = create_test_engine();
970        let result = engine
971            .run_handler("unknown", TriggerKind::Manual, json!({}))
972            .await;
973        assert!(result.is_err());
974        match result {
975            Err(EngineError::InvalidWorkflow(msg)) => {
976                assert!(msg.contains("no handler registered"));
977            }
978            _ => panic!("expected InvalidWorkflow error"),
979        }
980    }
981
982    #[tokio::test]
983    async fn engine_enqueue_handler_creates_pending_run() {
984        let mut engine = create_test_engine();
985        engine.register(EchoWorkflow).unwrap();
986
987        let run = engine
988            .enqueue_handler("echo-workflow", TriggerKind::Manual, json!({}), 0)
989            .await
990            .unwrap();
991        assert_eq!(run.status.state, RunStatus::Pending);
992        assert_eq!(run.workflow_name, "echo-workflow");
993    }
994
995    #[tokio::test]
996    async fn engine_register_boxed() {
997        let mut engine = create_test_engine();
998        let handler: Box<dyn WorkflowHandler> = Box::new(EchoWorkflow);
999        let result = engine.register_boxed(handler);
1000        assert!(result.is_ok());
1001        assert_eq!(engine.handler_names().len(), 1);
1002    }
1003
1004    #[tokio::test]
1005    async fn engine_store_and_provider_accessors() {
1006        let store = Arc::new(InMemoryStore::new());
1007        let inner = ClaudeCodeProvider::new();
1008        let provider: Arc<dyn AgentProvider> = Arc::new(RecordReplayProvider::replay(
1009            inner,
1010            "/tmp/ironflow-fixtures",
1011        ));
1012        let engine = Engine::new(store.clone(), provider.clone());
1013
1014        // Verify accessors return references
1015        let _ = engine.store();
1016        let _ = engine.provider();
1017    }
1018
1019    // -----------------------------------------------------------------------
1020    // Operation trait tests
1021    // -----------------------------------------------------------------------
1022
1023    use crate::operation::Operation;
1024    use ironflow_store::models::StepKind;
1025    use std::future::Future;
1026    use std::pin::Pin;
1027
1028    struct FakeGitlabOp {
1029        project_id: u64,
1030        title: String,
1031    }
1032
1033    impl Operation for FakeGitlabOp {
1034        fn kind(&self) -> &str {
1035            "gitlab"
1036        }
1037
1038        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1039            Box::pin(async move {
1040                Ok(json!({
1041                    "issue_id": 42,
1042                    "project_id": self.project_id,
1043                    "title": self.title,
1044                }))
1045            })
1046        }
1047
1048        fn input(&self) -> Option<Value> {
1049            Some(json!({
1050                "project_id": self.project_id,
1051                "title": self.title,
1052            }))
1053        }
1054    }
1055
1056    struct FailingOp;
1057
1058    impl Operation for FailingOp {
1059        fn kind(&self) -> &str {
1060            "broken-service"
1061        }
1062
1063        fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
1064            Box::pin(async move { Err(EngineError::StepConfig("service unavailable".to_string())) })
1065        }
1066    }
1067
1068    struct OperationWorkflow;
1069
1070    impl WorkflowHandler for OperationWorkflow {
1071        fn name(&self) -> &str {
1072            "operation-workflow"
1073        }
1074
1075        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1076            Box::pin(async move {
1077                let op = FakeGitlabOp {
1078                    project_id: 123,
1079                    title: "Bug report".to_string(),
1080                };
1081                ctx.operation("create-issue", &op).await?;
1082                Ok(())
1083            })
1084        }
1085    }
1086
1087    struct FailingOperationWorkflow;
1088
1089    impl WorkflowHandler for FailingOperationWorkflow {
1090        fn name(&self) -> &str {
1091            "failing-operation-workflow"
1092        }
1093
1094        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1095            Box::pin(async move {
1096                ctx.operation("broken-call", &FailingOp).await?;
1097                Ok(())
1098            })
1099        }
1100    }
1101
1102    struct MixedWorkflow;
1103
1104    impl WorkflowHandler for MixedWorkflow {
1105        fn name(&self) -> &str {
1106            "mixed-workflow"
1107        }
1108
1109        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1110            Box::pin(async move {
1111                ctx.shell("build", ShellConfig::new("echo built")).await?;
1112                let op = FakeGitlabOp {
1113                    project_id: 456,
1114                    title: "Deploy done".to_string(),
1115                };
1116                let result = ctx.operation("notify-gitlab", &op).await?;
1117                assert_eq!(result.output["issue_id"], 42);
1118                Ok(())
1119            })
1120        }
1121    }
1122
1123    #[tokio::test]
1124    async fn operation_step_happy_path() {
1125        let mut engine = create_test_engine();
1126        engine.register(OperationWorkflow).unwrap();
1127
1128        let run = engine
1129            .run_handler("operation-workflow", TriggerKind::Manual, json!({}))
1130            .await
1131            .unwrap();
1132
1133        assert_eq!(run.status.state, RunStatus::Completed);
1134
1135        let steps = engine.store().list_steps(run.id).await.unwrap();
1136
1137        assert_eq!(steps.len(), 1);
1138        assert_eq!(steps[0].name, "create-issue");
1139        assert_eq!(steps[0].kind, StepKind::Custom("gitlab".to_string()));
1140        assert_eq!(
1141            steps[0].status.state,
1142            ironflow_store::models::StepStatus::Completed
1143        );
1144
1145        let output = steps[0].output.as_ref().unwrap();
1146        assert_eq!(output["issue_id"], 42);
1147        assert_eq!(output["project_id"], 123);
1148
1149        let input = steps[0].input.as_ref().unwrap();
1150        assert_eq!(input["project_id"], 123);
1151        assert_eq!(input["title"], "Bug report");
1152    }
1153
1154    #[tokio::test]
1155    async fn operation_step_failure_marks_run_failed() {
1156        let mut engine = create_test_engine();
1157        engine.register(FailingOperationWorkflow).unwrap();
1158
1159        let result = engine
1160            .run_handler("failing-operation-workflow", TriggerKind::Manual, json!({}))
1161            .await;
1162
1163        assert!(result.is_err());
1164    }
1165
1166    #[tokio::test]
1167    async fn operation_mixed_with_shell_steps() {
1168        let mut engine = create_test_engine();
1169        engine.register(MixedWorkflow).unwrap();
1170
1171        let run = engine
1172            .run_handler("mixed-workflow", TriggerKind::Manual, json!({}))
1173            .await
1174            .unwrap();
1175
1176        assert_eq!(run.status.state, RunStatus::Completed);
1177
1178        let steps = engine.store().list_steps(run.id).await.unwrap();
1179
1180        assert_eq!(steps.len(), 2);
1181        assert_eq!(steps[0].kind, StepKind::Shell);
1182        assert_eq!(steps[1].kind, StepKind::Custom("gitlab".to_string()));
1183        assert_eq!(steps[0].position, 0);
1184        assert_eq!(steps[1].position, 1);
1185    }
1186
1187    // -----------------------------------------------------------------------
1188    // Approval + resume tests
1189    // -----------------------------------------------------------------------
1190
1191    use crate::config::ApprovalConfig;
1192
1193    struct SingleApprovalWorkflow;
1194
1195    impl WorkflowHandler for SingleApprovalWorkflow {
1196        fn name(&self) -> &str {
1197            "single-approval"
1198        }
1199
1200        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1201            Box::pin(async move {
1202                ctx.shell("build", ShellConfig::new("echo built")).await?;
1203                ctx.approval("gate", ApprovalConfig::new("OK?")).await?;
1204                ctx.shell("deploy", ShellConfig::new("echo deployed"))
1205                    .await?;
1206                Ok(())
1207            })
1208        }
1209    }
1210
1211    struct DoubleApprovalWorkflow;
1212
1213    impl WorkflowHandler for DoubleApprovalWorkflow {
1214        fn name(&self) -> &str {
1215            "double-approval"
1216        }
1217
1218        fn execute<'a>(&'a self, ctx: &'a mut WorkflowContext) -> HandlerFuture<'a> {
1219            Box::pin(async move {
1220                ctx.shell("build", ShellConfig::new("echo built")).await?;
1221                ctx.approval("staging-gate", ApprovalConfig::new("Deploy staging?"))
1222                    .await?;
1223                ctx.shell("deploy-staging", ShellConfig::new("echo staging"))
1224                    .await?;
1225                ctx.approval("prod-gate", ApprovalConfig::new("Deploy prod?"))
1226                    .await?;
1227                ctx.shell("deploy-prod", ShellConfig::new("echo prod"))
1228                    .await?;
1229                Ok(())
1230            })
1231        }
1232    }
1233
1234    #[tokio::test]
1235    async fn approval_pauses_run() {
1236        let mut engine = create_test_engine();
1237        engine.register(SingleApprovalWorkflow).unwrap();
1238
1239        let run = engine
1240            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1241            .await
1242            .unwrap();
1243
1244        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1245
1246        let steps = engine.store().list_steps(run.id).await.unwrap();
1247        assert_eq!(steps.len(), 2); // build + approval gate
1248        assert_eq!(steps[0].kind, StepKind::Shell);
1249        assert_eq!(steps[0].status.state, StepStatus::Completed);
1250        assert_eq!(steps[1].kind, StepKind::Approval);
1251        assert_eq!(steps[1].status.state, StepStatus::AwaitingApproval);
1252    }
1253
1254    #[tokio::test]
1255    async fn approval_resume_completes_run() {
1256        let mut engine = create_test_engine();
1257        engine.register(SingleApprovalWorkflow).unwrap();
1258
1259        // First execution: pauses at approval
1260        let run = engine
1261            .run_handler("single-approval", TriggerKind::Manual, json!({}))
1262            .await
1263            .unwrap();
1264        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1265
1266        // Simulate approval: transition to Running
1267        engine
1268            .store()
1269            .update_run_status(run.id, RunStatus::Running)
1270            .await
1271            .unwrap();
1272
1273        // Resume: replays build, skips approval, executes deploy
1274        let resumed = engine.resume_run(run.id).await.unwrap();
1275        assert_eq!(resumed.status.state, RunStatus::Completed);
1276
1277        let steps = engine.store().list_steps(run.id).await.unwrap();
1278        assert_eq!(steps.len(), 3); // build + approval + deploy
1279        assert_eq!(steps[0].name, "build");
1280        assert_eq!(steps[0].status.state, StepStatus::Completed);
1281        assert_eq!(steps[1].name, "gate");
1282        assert_eq!(steps[1].kind, StepKind::Approval);
1283        assert_eq!(steps[1].status.state, StepStatus::Completed);
1284        assert_eq!(steps[2].name, "deploy");
1285        assert_eq!(steps[2].status.state, StepStatus::Completed);
1286    }
1287
1288    #[tokio::test]
1289    async fn double_approval_two_resumes() {
1290        let mut engine = create_test_engine();
1291        engine.register(DoubleApprovalWorkflow).unwrap();
1292
1293        // First execution: pauses at staging-gate
1294        let run = engine
1295            .run_handler("double-approval", TriggerKind::Manual, json!({}))
1296            .await
1297            .unwrap();
1298        assert_eq!(run.status.state, RunStatus::AwaitingApproval);
1299
1300        let steps = engine.store().list_steps(run.id).await.unwrap();
1301        assert_eq!(steps.len(), 2); // build + staging-gate
1302
1303        // First approval
1304        engine
1305            .store()
1306            .update_run_status(run.id, RunStatus::Running)
1307            .await
1308            .unwrap();
1309
1310        let resumed = engine.resume_run(run.id).await.unwrap();
1311        assert_eq!(resumed.status.state, RunStatus::AwaitingApproval);
1312
1313        let steps = engine.store().list_steps(run.id).await.unwrap();
1314        assert_eq!(steps.len(), 4); // build + staging-gate + deploy-staging + prod-gate
1315
1316        // Second approval
1317        engine
1318            .store()
1319            .update_run_status(run.id, RunStatus::Running)
1320            .await
1321            .unwrap();
1322
1323        let final_run = engine.resume_run(run.id).await.unwrap();
1324        assert_eq!(final_run.status.state, RunStatus::Completed);
1325
1326        let steps = engine.store().list_steps(run.id).await.unwrap();
1327        assert_eq!(steps.len(), 5);
1328        assert_eq!(steps[0].name, "build");
1329        assert_eq!(steps[1].name, "staging-gate");
1330        assert_eq!(steps[2].name, "deploy-staging");
1331        assert_eq!(steps[3].name, "prod-gate");
1332        assert_eq!(steps[4].name, "deploy-prod");
1333
1334        for step in &steps {
1335            assert_eq!(step.status.state, StepStatus::Completed);
1336        }
1337    }
1338}