Skip to main content

ironflow_engine/executor/
mod.rs

1//! Step executor — reconstructs operations from configs and runs them.
2//!
3//! Each step type (shell, HTTP, agent) has its own executor implementing
4//! the [`StepExecutor`] trait. The [`execute_step_config`] function dispatches
5//! to the appropriate executor based on the [`StepConfig`] variant.
6
7mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22use crate::log_sender::StepLogSender;
23
24pub use agent::AgentExecutor;
25pub use http::HttpExecutor;
26pub use shell::ShellExecutor;
27
28/// Result of executing a single step.
29#[derive(Debug, Clone)]
30pub struct StepOutput {
31    /// Serialized output (stdout for shell, body for http, value for agent).
32    ///
33    /// For agent steps with a JSON schema, the value may not strictly conform
34    /// to the schema: Claude CLI can flatten wrapper objects with a single
35    /// array field, returning a bare array instead of `{"items": [...]}`.
36    /// Callers should handle both the expected wrapper and a bare value.
37    pub output: Value,
38    /// Wall-clock duration in milliseconds.
39    pub duration_ms: u64,
40    /// Cost in USD (agent steps only).
41    pub cost_usd: Decimal,
42    /// Input token count (agent steps only).
43    pub input_tokens: Option<u64>,
44    /// Output token count (agent steps only).
45    pub output_tokens: Option<u64>,
46    /// Model identifier used for agent steps (e.g. `"claude-sonnet-4-20250514"`).
47    pub model: Option<String>,
48    /// Conversation trace from verbose agent invocations.
49    pub debug_messages: Option<Vec<DebugMessage>>,
50}
51
52impl StepOutput {
53    /// Serialize debug messages to a JSON [`Value`] for store persistence.
54    ///
55    /// Returns `None` when verbose mode was off (no messages captured).
56    pub fn debug_messages_json(&self) -> Option<Value> {
57        self.debug_messages
58            .as_ref()
59            .and_then(|msgs| serde_json::to_value(msgs).ok())
60    }
61}
62
63/// Result of a single step within a [`parallel`](crate::context::WorkflowContext::parallel) batch.
64#[derive(Debug, Clone)]
65pub struct ParallelStepResult {
66    /// The step name (same as provided to `parallel()`).
67    pub name: String,
68    /// The step execution output.
69    pub output: StepOutput,
70    /// The step ID in the store (for dependency tracking).
71    pub step_id: Uuid,
72}
73
74/// Trait for step executors.
75///
76/// Each step type implements this trait to execute its specific operation
77/// and return a [`StepOutput`].
78pub trait StepExecutor: Send + Sync {
79    /// Execute the step and return structured output.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`EngineError`] if the operation fails.
84    fn execute(
85        &self,
86        provider: &Arc<dyn AgentProvider>,
87    ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
88}
89
90/// Execute a [`StepConfig`] and return structured output.
91///
92/// When a [`StepLogSender`] is provided, executors that support streaming
93/// will emit log lines in real time (e.g. shell stdout/stderr).
94///
95/// # Errors
96///
97/// Returns [`EngineError::Operation`] if the operation fails.
98///
99/// # Examples
100///
101/// ```no_run
102/// use ironflow_engine::config::{StepConfig, ShellConfig};
103/// use ironflow_engine::executor::execute_step_config;
104/// use ironflow_core::provider::AgentProvider;
105/// use ironflow_core::providers::claude::ClaudeCodeProvider;
106/// use std::sync::Arc;
107///
108/// # async fn example() -> Result<(), ironflow_engine::error::EngineError> {
109/// let provider: Arc<dyn AgentProvider> = Arc::new(ClaudeCodeProvider::new());
110/// let config = StepConfig::Shell(ShellConfig::new("echo hello"));
111/// let output = execute_step_config(&config, &provider, None).await?;
112/// # Ok(())
113/// # }
114/// ```
115pub async fn execute_step_config(
116    config: &StepConfig,
117    provider: &Arc<dyn AgentProvider>,
118    log_sender: Option<StepLogSender>,
119) -> Result<StepOutput, EngineError> {
120    let _kind = match config {
121        StepConfig::Shell(_) => "shell",
122        StepConfig::Http(_) => "http",
123        StepConfig::Agent(_) => "agent",
124        StepConfig::Workflow(_) => "workflow",
125        StepConfig::Approval(_) => "approval",
126    };
127
128    let result = match config {
129        StepConfig::Shell(cfg) => {
130            let mut executor = ShellExecutor::new(cfg);
131            if let Some(sender) = log_sender {
132                executor = executor.with_log_sender(sender);
133            }
134            executor.execute(provider).await
135        }
136        StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
137        StepConfig::Agent(cfg) => {
138            let mut executor = AgentExecutor::new(cfg);
139            if let Some(sender) = log_sender {
140                executor = executor.with_log_sender(sender);
141            }
142            executor.execute(provider).await
143        }
144        StepConfig::Workflow(_) => Err(EngineError::StepConfig(
145            "workflow steps are executed by WorkflowContext, not the executor".to_string(),
146        )),
147        StepConfig::Approval(_) => Err(EngineError::StepConfig(
148            "approval steps are executed by WorkflowContext, not the executor".to_string(),
149        )),
150    };
151
152    #[cfg(feature = "prometheus")]
153    {
154        use ironflow_core::metric_names::{
155            STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
156        };
157        use metrics::{counter, histogram};
158        let status = if result.is_ok() {
159            STATUS_SUCCESS
160        } else {
161            STATUS_ERROR
162        };
163        counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
164        if let Ok(ref output) = result {
165            histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
166                .record(output.duration_ms as f64 / 1000.0);
167        }
168    }
169
170    result
171}
172
173#[cfg(test)]
174mod tests {
175    use super::*;
176    use ironflow_core::provider::DebugMessage;
177    use serde_json::json;
178
179    #[test]
180    fn step_output_with_no_debug_messages_returns_none() {
181        let output = StepOutput {
182            output: json!({"result": "ok"}),
183            duration_ms: 100,
184            cost_usd: rust_decimal::Decimal::ZERO,
185            input_tokens: None,
186            output_tokens: None,
187            model: None,
188            debug_messages: None,
189        };
190
191        assert_eq!(output.debug_messages_json(), None);
192    }
193
194    #[test]
195    fn step_output_with_empty_debug_messages_returns_some_empty_array() {
196        let output = StepOutput {
197            output: json!({"result": "ok"}),
198            duration_ms: 100,
199            cost_usd: rust_decimal::Decimal::ZERO,
200            input_tokens: None,
201            output_tokens: None,
202            model: None,
203            debug_messages: Some(Vec::new()),
204        };
205
206        let json_val = output.debug_messages_json();
207        assert!(json_val.is_some());
208        let arr = json_val.unwrap();
209        assert!(arr.is_array());
210        assert_eq!(arr.as_array().unwrap().len(), 0);
211    }
212
213    #[test]
214    fn step_output_debug_messages_json_serializes_messages() {
215        let json_msgs = json!([
216            {
217                "text": "Hello",
218                "thinking": null,
219                "thinking_redacted": false,
220                "tool_calls": [],
221                "tool_results": [],
222                "stop_reason": "end_turn",
223                "input_tokens": 10,
224                "output_tokens": 20
225            },
226            {
227                "text": "Hi there",
228                "thinking": null,
229                "thinking_redacted": false,
230                "tool_calls": [],
231                "tool_results": [],
232                "stop_reason": "end_turn",
233                "input_tokens": 15,
234                "output_tokens": 25
235            }
236        ]);
237
238        let messages: Vec<DebugMessage> =
239            serde_json::from_value(json_msgs.clone()).expect("deserialize debug messages");
240
241        let output = StepOutput {
242            output: json!({"result": "ok"}),
243            duration_ms: 100,
244            cost_usd: rust_decimal::Decimal::ZERO,
245            input_tokens: None,
246            output_tokens: None,
247            model: None,
248            debug_messages: Some(messages),
249        };
250
251        let json_val = output.debug_messages_json();
252        assert!(json_val.is_some());
253
254        let arr = json_val.unwrap();
255        assert!(arr.is_array());
256        let messages_array = arr.as_array().unwrap();
257        assert_eq!(messages_array.len(), 2);
258        assert_eq!(messages_array[0]["text"], "Hello");
259        assert_eq!(messages_array[1]["text"], "Hi there");
260    }
261
262    #[test]
263    fn step_output_contains_all_metrics() {
264        let output = StepOutput {
265            output: json!({"data": "test"}),
266            duration_ms: 5000,
267            cost_usd: rust_decimal::Decimal::new(123, 2),
268            input_tokens: Some(100),
269            output_tokens: Some(200),
270            model: Some("claude-sonnet".to_string()),
271            debug_messages: None,
272        };
273
274        assert_eq!(output.duration_ms, 5000);
275        assert_eq!(output.cost_usd, rust_decimal::Decimal::new(123, 2));
276        assert_eq!(output.input_tokens, Some(100));
277        assert_eq!(output.output_tokens, Some(200));
278        assert_eq!(output.model, Some("claude-sonnet".to_string()));
279    }
280
281    #[test]
282    fn step_output_default_tokens_and_model_are_none() {
283        let output = StepOutput {
284            output: json!({}),
285            duration_ms: 0,
286            cost_usd: rust_decimal::Decimal::ZERO,
287            input_tokens: None,
288            output_tokens: None,
289            model: None,
290            debug_messages: None,
291        };
292
293        assert!(output.input_tokens.is_none());
294        assert!(output.output_tokens.is_none());
295        assert!(output.model.is_none());
296    }
297
298    #[test]
299    fn parallel_step_result_contains_step_metadata() {
300        let step_id = uuid::Uuid::now_v7();
301        let output = StepOutput {
302            output: json!({"done": true}),
303            duration_ms: 1000,
304            cost_usd: rust_decimal::Decimal::ZERO,
305            input_tokens: None,
306            output_tokens: None,
307            model: None,
308            debug_messages: None,
309        };
310
311        let result = ParallelStepResult {
312            name: "build".to_string(),
313            output,
314            step_id,
315        };
316
317        assert_eq!(result.name, "build");
318        assert_eq!(result.step_id, step_id);
319        assert_eq!(result.output.duration_ms, 1000);
320    }
321
322    #[test]
323    fn step_output_serializes_complex_json_output() {
324        let complex_output = json!({
325            "status": "success",
326            "data": {
327                "items": [1, 2, 3],
328                "nested": {
329                    "key": "value"
330                }
331            }
332        });
333
334        let output = StepOutput {
335            output: complex_output.clone(),
336            duration_ms: 100,
337            cost_usd: rust_decimal::Decimal::ZERO,
338            input_tokens: None,
339            output_tokens: None,
340            model: None,
341            debug_messages: None,
342        };
343
344        assert_eq!(output.output, complex_output);
345        assert_eq!(output.output["status"], "success");
346        assert_eq!(output.output["data"]["items"][0], 1);
347        assert_eq!(output.output["data"]["nested"]["key"], "value");
348    }
349}