ironflow_engine/executor/
mod.rs1mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22use crate::log_sender::StepLogSender;
23
24pub use agent::AgentExecutor;
25pub use http::HttpExecutor;
26pub use shell::ShellExecutor;
27
28#[derive(Debug, Clone)]
30pub struct StepOutput {
31 pub output: Value,
38 pub duration_ms: u64,
40 pub cost_usd: Decimal,
42 pub input_tokens: Option<u64>,
44 pub output_tokens: Option<u64>,
46 pub model: Option<String>,
48 pub debug_messages: Option<Vec<DebugMessage>>,
50}
51
52impl StepOutput {
53 pub fn debug_messages_json(&self) -> Option<Value> {
57 self.debug_messages
58 .as_ref()
59 .and_then(|msgs| serde_json::to_value(msgs).ok())
60 }
61}
62
63#[derive(Debug, Clone)]
65pub struct ParallelStepResult {
66 pub name: String,
68 pub output: StepOutput,
70 pub step_id: Uuid,
72}
73
74pub trait StepExecutor: Send + Sync {
79 fn execute(
85 &self,
86 provider: &Arc<dyn AgentProvider>,
87 ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
88}
89
90pub async fn execute_step_config(
116 config: &StepConfig,
117 provider: &Arc<dyn AgentProvider>,
118 log_sender: Option<StepLogSender>,
119) -> Result<StepOutput, EngineError> {
120 let _kind = match config {
121 StepConfig::Shell(_) => "shell",
122 StepConfig::Http(_) => "http",
123 StepConfig::Agent(_) => "agent",
124 StepConfig::Workflow(_) => "workflow",
125 StepConfig::Approval(_) => "approval",
126 };
127
128 let result = match config {
129 StepConfig::Shell(cfg) => {
130 let mut executor = ShellExecutor::new(cfg);
131 if let Some(sender) = log_sender {
132 executor = executor.with_log_sender(sender);
133 }
134 executor.execute(provider).await
135 }
136 StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
137 StepConfig::Agent(cfg) => {
138 let mut executor = AgentExecutor::new(cfg);
139 if let Some(sender) = log_sender {
140 executor = executor.with_log_sender(sender);
141 }
142 executor.execute(provider).await
143 }
144 StepConfig::Workflow(_) => Err(EngineError::StepConfig(
145 "workflow steps are executed by WorkflowContext, not the executor".to_string(),
146 )),
147 StepConfig::Approval(_) => Err(EngineError::StepConfig(
148 "approval steps are executed by WorkflowContext, not the executor".to_string(),
149 )),
150 };
151
152 #[cfg(feature = "prometheus")]
153 {
154 use ironflow_core::metric_names::{
155 STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
156 };
157 use metrics::{counter, histogram};
158 let status = if result.is_ok() {
159 STATUS_SUCCESS
160 } else {
161 STATUS_ERROR
162 };
163 counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
164 if let Ok(ref output) = result {
165 histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
166 .record(output.duration_ms as f64 / 1000.0);
167 }
168 }
169
170 result
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::provider::DebugMessage;
    use serde_json::json;

    /// Builds a `StepOutput` carrying `output`, with a 100 ms duration, zero
    /// cost, and no token, model, or debug metadata. Tests override individual
    /// fields with struct-update syntax.
    fn output_with(output: serde_json::Value) -> StepOutput {
        StepOutput {
            output,
            duration_ms: 100,
            cost_usd: rust_decimal::Decimal::ZERO,
            input_tokens: None,
            output_tokens: None,
            model: None,
            debug_messages: None,
        }
    }

    #[test]
    fn step_output_with_no_debug_messages_returns_none() {
        let output = output_with(json!({"result": "ok"}));

        assert_eq!(output.debug_messages_json(), None);
    }

    #[test]
    fn step_output_with_empty_debug_messages_returns_some_empty_array() {
        let output = StepOutput {
            debug_messages: Some(Vec::new()),
            ..output_with(json!({"result": "ok"}))
        };

        let json_val = output.debug_messages_json();
        assert!(json_val.is_some());
        let arr = json_val.unwrap();
        assert!(arr.is_array());
        assert_eq!(arr.as_array().unwrap().len(), 0);
    }

    #[test]
    fn step_output_debug_messages_json_serializes_messages() {
        let json_msgs = json!([
            {
                "text": "Hello",
                "thinking": null,
                "thinking_redacted": false,
                "tool_calls": [],
                "tool_results": [],
                "stop_reason": "end_turn",
                "input_tokens": 10,
                "output_tokens": 20
            },
            {
                "text": "Hi there",
                "thinking": null,
                "thinking_redacted": false,
                "tool_calls": [],
                "tool_results": [],
                "stop_reason": "end_turn",
                "input_tokens": 15,
                "output_tokens": 25
            }
        ]);

        let messages: Vec<DebugMessage> =
            serde_json::from_value(json_msgs).expect("deserialize debug messages");

        let output = StepOutput {
            debug_messages: Some(messages),
            ..output_with(json!({"result": "ok"}))
        };

        let json_val = output.debug_messages_json();
        assert!(json_val.is_some());

        let arr = json_val.unwrap();
        assert!(arr.is_array());
        let messages_array = arr.as_array().unwrap();
        assert_eq!(messages_array.len(), 2);
        assert_eq!(messages_array[0]["text"], "Hello");
        assert_eq!(messages_array[1]["text"], "Hi there");
    }

    #[test]
    fn step_output_contains_all_metrics() {
        let output = StepOutput {
            duration_ms: 5000,
            cost_usd: rust_decimal::Decimal::new(123, 2),
            input_tokens: Some(100),
            output_tokens: Some(200),
            model: Some("claude-sonnet".to_string()),
            ..output_with(json!({"data": "test"}))
        };

        assert_eq!(output.duration_ms, 5000);
        assert_eq!(output.cost_usd, rust_decimal::Decimal::new(123, 2));
        assert_eq!(output.input_tokens, Some(100));
        assert_eq!(output.output_tokens, Some(200));
        assert_eq!(output.model, Some("claude-sonnet".to_string()));
    }

    #[test]
    fn step_output_default_tokens_and_model_are_none() {
        let output = StepOutput {
            duration_ms: 0,
            ..output_with(json!({}))
        };

        assert!(output.input_tokens.is_none());
        assert!(output.output_tokens.is_none());
        assert!(output.model.is_none());
    }

    #[test]
    fn parallel_step_result_contains_step_metadata() {
        let step_id = uuid::Uuid::now_v7();
        let output = StepOutput {
            duration_ms: 1000,
            ..output_with(json!({"done": true}))
        };

        let result = ParallelStepResult {
            name: "build".to_string(),
            output,
            step_id,
        };

        assert_eq!(result.name, "build");
        assert_eq!(result.step_id, step_id);
        assert_eq!(result.output.duration_ms, 1000);
    }

    #[test]
    fn step_output_serializes_complex_json_output() {
        let complex_output = json!({
            "status": "success",
            "data": {
                "items": [1, 2, 3],
                "nested": {
                    "key": "value"
                }
            }
        });

        let output = output_with(complex_output.clone());

        assert_eq!(output.output, complex_output);
        assert_eq!(output.output["status"], "success");
        assert_eq!(output.output["data"]["items"][0], 1);
        assert_eq!(output.output["data"]["nested"]["key"], "value");
    }
}