Skip to main content

ironflow_engine/executor/
mod.rs

1//! Step executor — reconstructs operations from configs and runs them.
2//!
3//! Each step type (shell, HTTP, agent) has its own executor implementing
4//! the [`StepExecutor`] trait. The [`execute_step_config`] function dispatches
5//! to the appropriate executor based on the [`StepConfig`] variant.
6
7mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22use crate::log_sender::StepLogSender;
23
24pub use agent::AgentExecutor;
25pub use http::HttpExecutor;
26pub use shell::ShellExecutor;
27
28/// Result of executing a single step.
29#[derive(Debug, Clone)]
30pub struct StepOutput {
31    /// Serialized output (stdout for shell, body for http, value for agent).
32    pub output: Value,
33    /// Wall-clock duration in milliseconds.
34    pub duration_ms: u64,
35    /// Cost in USD (agent steps only).
36    pub cost_usd: Decimal,
37    /// Input token count (agent steps only).
38    pub input_tokens: Option<u64>,
39    /// Output token count (agent steps only).
40    pub output_tokens: Option<u64>,
41    /// Conversation trace from verbose agent invocations.
42    pub debug_messages: Option<Vec<DebugMessage>>,
43}
44
45impl StepOutput {
46    /// Serialize debug messages to a JSON [`Value`] for store persistence.
47    ///
48    /// Returns `None` when verbose mode was off (no messages captured).
49    pub fn debug_messages_json(&self) -> Option<Value> {
50        self.debug_messages
51            .as_ref()
52            .and_then(|msgs| serde_json::to_value(msgs).ok())
53    }
54}
55
56/// Result of a single step within a [`parallel`](crate::context::WorkflowContext::parallel) batch.
57#[derive(Debug, Clone)]
58pub struct ParallelStepResult {
59    /// The step name (same as provided to `parallel()`).
60    pub name: String,
61    /// The step execution output.
62    pub output: StepOutput,
63    /// The step ID in the store (for dependency tracking).
64    pub step_id: Uuid,
65}
66
67/// Trait for step executors.
68///
69/// Each step type implements this trait to execute its specific operation
70/// and return a [`StepOutput`].
71pub trait StepExecutor: Send + Sync {
72    /// Execute the step and return structured output.
73    ///
74    /// # Errors
75    ///
76    /// Returns [`EngineError`] if the operation fails.
77    fn execute(
78        &self,
79        provider: &Arc<dyn AgentProvider>,
80    ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
81}
82
83/// Execute a [`StepConfig`] and return structured output.
84///
85/// When a [`StepLogSender`] is provided, executors that support streaming
86/// will emit log lines in real time (e.g. shell stdout/stderr).
87///
88/// # Errors
89///
90/// Returns [`EngineError::Operation`] if the operation fails.
91///
92/// # Examples
93///
94/// ```no_run
95/// use ironflow_engine::config::{StepConfig, ShellConfig};
96/// use ironflow_engine::executor::execute_step_config;
97/// use ironflow_core::provider::AgentProvider;
98/// use ironflow_core::providers::claude::ClaudeCodeProvider;
99/// use std::sync::Arc;
100///
101/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
102/// let provider: Arc<dyn AgentProvider> = Arc::new(ClaudeCodeProvider::new());
103/// let config = StepConfig::Shell(ShellConfig::new("echo hello"));
104/// let output = execute_step_config(&config, &provider, None).await?;
105/// # Ok(())
106/// # }
107/// ```
108pub async fn execute_step_config(
109    config: &StepConfig,
110    provider: &Arc<dyn AgentProvider>,
111    log_sender: Option<StepLogSender>,
112) -> Result<StepOutput, EngineError> {
113    let _kind = match config {
114        StepConfig::Shell(_) => "shell",
115        StepConfig::Http(_) => "http",
116        StepConfig::Agent(_) => "agent",
117        StepConfig::Workflow(_) => "workflow",
118        StepConfig::Approval(_) => "approval",
119    };
120
121    let result = match config {
122        StepConfig::Shell(cfg) => {
123            let mut executor = ShellExecutor::new(cfg);
124            if let Some(sender) = log_sender {
125                executor = executor.with_log_sender(sender);
126            }
127            executor.execute(provider).await
128        }
129        StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
130        StepConfig::Agent(cfg) => {
131            let mut executor = AgentExecutor::new(cfg);
132            if let Some(sender) = log_sender {
133                executor = executor.with_log_sender(sender);
134            }
135            executor.execute(provider).await
136        }
137        StepConfig::Workflow(_) => Err(EngineError::StepConfig(
138            "workflow steps are executed by WorkflowContext, not the executor".to_string(),
139        )),
140        StepConfig::Approval(_) => Err(EngineError::StepConfig(
141            "approval steps are executed by WorkflowContext, not the executor".to_string(),
142        )),
143    };
144
145    #[cfg(feature = "prometheus")]
146    {
147        use ironflow_core::metric_names::{
148            STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
149        };
150        use metrics::{counter, histogram};
151        let status = if result.is_ok() {
152            STATUS_SUCCESS
153        } else {
154            STATUS_ERROR
155        };
156        counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
157        if let Ok(ref output) = result {
158            histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
159                .record(output.duration_ms as f64 / 1000.0);
160        }
161    }
162
163    result
164}