Skip to main content

ironflow_engine/executor/
mod.rs

1//! Step executor — reconstructs operations from configs and runs them.
2//!
3//! Each step type (shell, HTTP, agent) has its own executor implementing
4//! the [`StepExecutor`] trait. The [`execute_step_config`] function dispatches
5//! to the appropriate executor based on the [`StepConfig`] variant.
6
7mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22
23pub use agent::AgentExecutor;
24pub use http::HttpExecutor;
25pub use shell::ShellExecutor;
26
27/// Result of executing a single step.
28#[derive(Debug, Clone)]
29pub struct StepOutput {
30    /// Serialized output (stdout for shell, body for http, value for agent).
31    pub output: Value,
32    /// Wall-clock duration in milliseconds.
33    pub duration_ms: u64,
34    /// Cost in USD (agent steps only).
35    pub cost_usd: Decimal,
36    /// Input token count (agent steps only).
37    pub input_tokens: Option<u64>,
38    /// Output token count (agent steps only).
39    pub output_tokens: Option<u64>,
40    /// Conversation trace from verbose agent invocations.
41    pub debug_messages: Option<Vec<DebugMessage>>,
42}
43
44impl StepOutput {
45    /// Serialize debug messages to a JSON [`Value`] for store persistence.
46    ///
47    /// Returns `None` when verbose mode was off (no messages captured).
48    pub fn debug_messages_json(&self) -> Option<Value> {
49        self.debug_messages
50            .as_ref()
51            .and_then(|msgs| serde_json::to_value(msgs).ok())
52    }
53}
54
55/// Result of a single step within a [`parallel`](crate::context::WorkflowContext::parallel) batch.
56#[derive(Debug, Clone)]
57pub struct ParallelStepResult {
58    /// The step name (same as provided to `parallel()`).
59    pub name: String,
60    /// The step execution output.
61    pub output: StepOutput,
62    /// The step ID in the store (for dependency tracking).
63    pub step_id: Uuid,
64}
65
66/// Trait for step executors.
67///
68/// Each step type implements this trait to execute its specific operation
69/// and return a [`StepOutput`].
70pub trait StepExecutor: Send + Sync {
71    /// Execute the step and return structured output.
72    ///
73    /// # Errors
74    ///
75    /// Returns [`EngineError`] if the operation fails.
76    fn execute(
77        &self,
78        provider: &Arc<dyn AgentProvider>,
79    ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
80}
81
82/// Execute a [`StepConfig`] and return structured output.
83///
84/// # Errors
85///
86/// Returns [`EngineError::Operation`] if the operation fails.
87///
88/// # Examples
89///
90/// ```no_run
91/// use ironflow_engine::config::{StepConfig, ShellConfig};
92/// use ironflow_engine::executor::execute_step_config;
93/// use ironflow_core::provider::AgentProvider;
94/// use ironflow_core::providers::claude::ClaudeCodeProvider;
95/// use std::sync::Arc;
96///
97/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
98/// let provider: Arc<dyn AgentProvider> = Arc::new(ClaudeCodeProvider::new());
99/// let config = StepConfig::Shell(ShellConfig::new("echo hello"));
100/// let output = execute_step_config(&config, &provider).await?;
101/// # Ok(())
102/// # }
103/// ```
104pub async fn execute_step_config(
105    config: &StepConfig,
106    provider: &Arc<dyn AgentProvider>,
107) -> Result<StepOutput, EngineError> {
108    let _kind = match config {
109        StepConfig::Shell(_) => "shell",
110        StepConfig::Http(_) => "http",
111        StepConfig::Agent(_) => "agent",
112        StepConfig::Workflow(_) => "workflow",
113        StepConfig::Approval(_) => "approval",
114    };
115
116    let result = match config {
117        StepConfig::Shell(cfg) => ShellExecutor::new(cfg).execute(provider).await,
118        StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
119        StepConfig::Agent(cfg) => AgentExecutor::new(cfg).execute(provider).await,
120        StepConfig::Workflow(_) => Err(EngineError::StepConfig(
121            "workflow steps are executed by WorkflowContext, not the executor".to_string(),
122        )),
123        StepConfig::Approval(_) => Err(EngineError::StepConfig(
124            "approval steps are executed by WorkflowContext, not the executor".to_string(),
125        )),
126    };
127
128    #[cfg(feature = "prometheus")]
129    {
130        use ironflow_core::metric_names::{
131            STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
132        };
133        use metrics::{counter, histogram};
134        let status = if result.is_ok() {
135            STATUS_SUCCESS
136        } else {
137            STATUS_ERROR
138        };
139        counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
140        if let Ok(ref output) = result {
141            histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
142                .record(output.duration_ms as f64 / 1000.0);
143        }
144    }
145
146    result
147}