ironflow_engine/executor/mod.rs
1//! Step executor — reconstructs operations from configs and runs them.
2//!
3//! Each step type (shell, HTTP, agent) has its own executor implementing
4//! the [`StepExecutor`] trait. The [`execute_step_config`] function dispatches
5//! to the appropriate executor based on the [`StepConfig`] variant.
6
7mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22use crate::log_sender::StepLogSender;
23
24pub use agent::AgentExecutor;
25pub use http::HttpExecutor;
26pub use shell::ShellExecutor;
27
28/// Result of executing a single step.
29#[derive(Debug, Clone)]
30pub struct StepOutput {
31 /// Serialized output (stdout for shell, body for http, value for agent).
32 ///
33 /// For agent steps with a JSON schema, the value may not strictly conform
34 /// to the schema: Claude CLI can flatten wrapper objects with a single
35 /// array field, returning a bare array instead of `{"items": [...]}`.
36 /// Callers should handle both the expected wrapper and a bare value.
37 pub output: Value,
38 /// Wall-clock duration in milliseconds.
39 pub duration_ms: u64,
40 /// Cost in USD (agent steps only).
41 pub cost_usd: Decimal,
42 /// Input token count (agent steps only).
43 pub input_tokens: Option<u64>,
44 /// Output token count (agent steps only).
45 pub output_tokens: Option<u64>,
46 /// Model identifier used for agent steps (e.g. `"claude-sonnet-4-20250514"`).
47 pub model: Option<String>,
48 /// Conversation trace from verbose agent invocations.
49 pub debug_messages: Option<Vec<DebugMessage>>,
50}
51
52impl StepOutput {
53 /// Serialize debug messages to a JSON [`Value`] for store persistence.
54 ///
55 /// Returns `None` when verbose mode was off (no messages captured).
56 pub fn debug_messages_json(&self) -> Option<Value> {
57 self.debug_messages
58 .as_ref()
59 .and_then(|msgs| serde_json::to_value(msgs).ok())
60 }
61}
62
63/// Result of a single step within a [`parallel`](crate::context::WorkflowContext::parallel) batch.
64#[derive(Debug, Clone)]
65pub struct ParallelStepResult {
66 /// The step name (same as provided to `parallel()`).
67 pub name: String,
68 /// The step execution output.
69 pub output: StepOutput,
70 /// The step ID in the store (for dependency tracking).
71 pub step_id: Uuid,
72}
73
74/// Trait for step executors.
75///
76/// Each step type implements this trait to execute its specific operation
77/// and return a [`StepOutput`].
78pub trait StepExecutor: Send + Sync {
79 /// Execute the step and return structured output.
80 ///
81 /// # Errors
82 ///
83 /// Returns [`EngineError`] if the operation fails.
84 fn execute(
85 &self,
86 provider: &Arc<dyn AgentProvider>,
87 ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
88}
89
90/// Execute a [`StepConfig`] and return structured output.
91///
92/// When a [`StepLogSender`] is provided, executors that support streaming
93/// will emit log lines in real time (e.g. shell stdout/stderr).
94///
95/// # Errors
96///
97/// Returns [`EngineError::Operation`] if the operation fails.
98///
99/// # Examples
100///
101/// ```no_run
102/// use ironflow_engine::config::{StepConfig, ShellConfig};
103/// use ironflow_engine::executor::execute_step_config;
104/// use ironflow_core::provider::AgentProvider;
105/// use ironflow_core::providers::claude::ClaudeCodeProvider;
106/// use std::sync::Arc;
107///
108/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
109/// let provider: Arc<dyn AgentProvider> = Arc::new(ClaudeCodeProvider::new());
110/// let config = StepConfig::Shell(ShellConfig::new("echo hello"));
111/// let output = execute_step_config(&config, &provider, None).await?;
112/// # Ok(())
113/// # }
114/// ```
115pub async fn execute_step_config(
116 config: &StepConfig,
117 provider: &Arc<dyn AgentProvider>,
118 log_sender: Option<StepLogSender>,
119) -> Result<StepOutput, EngineError> {
120 let _kind = match config {
121 StepConfig::Shell(_) => "shell",
122 StepConfig::Http(_) => "http",
123 StepConfig::Agent(_) => "agent",
124 StepConfig::Workflow(_) => "workflow",
125 StepConfig::Approval(_) => "approval",
126 };
127
128 let result = match config {
129 StepConfig::Shell(cfg) => {
130 let mut executor = ShellExecutor::new(cfg);
131 if let Some(sender) = log_sender {
132 executor = executor.with_log_sender(sender);
133 }
134 executor.execute(provider).await
135 }
136 StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
137 StepConfig::Agent(cfg) => {
138 let mut executor = AgentExecutor::new(cfg);
139 if let Some(sender) = log_sender {
140 executor = executor.with_log_sender(sender);
141 }
142 executor.execute(provider).await
143 }
144 StepConfig::Workflow(_) => Err(EngineError::StepConfig(
145 "workflow steps are executed by WorkflowContext, not the executor".to_string(),
146 )),
147 StepConfig::Approval(_) => Err(EngineError::StepConfig(
148 "approval steps are executed by WorkflowContext, not the executor".to_string(),
149 )),
150 };
151
152 #[cfg(feature = "prometheus")]
153 {
154 use ironflow_core::metric_names::{
155 STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
156 };
157 use metrics::{counter, histogram};
158 let status = if result.is_ok() {
159 STATUS_SUCCESS
160 } else {
161 STATUS_ERROR
162 };
163 counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
164 if let Ok(ref output) = result {
165 histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
166 .record(output.duration_ms as f64 / 1000.0);
167 }
168 }
169
170 result
171}