ironflow_engine/executor/
mod.rs1mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22use crate::log_sender::StepLogSender;
23
24pub use agent::AgentExecutor;
25pub use http::HttpExecutor;
26pub use shell::ShellExecutor;
27
28#[derive(Debug, Clone)]
30pub struct StepOutput {
31 pub output: Value,
33 pub duration_ms: u64,
35 pub cost_usd: Decimal,
37 pub input_tokens: Option<u64>,
39 pub output_tokens: Option<u64>,
41 pub debug_messages: Option<Vec<DebugMessage>>,
43}
44
45impl StepOutput {
46 pub fn debug_messages_json(&self) -> Option<Value> {
50 self.debug_messages
51 .as_ref()
52 .and_then(|msgs| serde_json::to_value(msgs).ok())
53 }
54}
55
56#[derive(Debug, Clone)]
58pub struct ParallelStepResult {
59 pub name: String,
61 pub output: StepOutput,
63 pub step_id: Uuid,
65}
66
67pub trait StepExecutor: Send + Sync {
72 fn execute(
78 &self,
79 provider: &Arc<dyn AgentProvider>,
80 ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
81}
82
83pub async fn execute_step_config(
109 config: &StepConfig,
110 provider: &Arc<dyn AgentProvider>,
111 log_sender: Option<StepLogSender>,
112) -> Result<StepOutput, EngineError> {
113 let _kind = match config {
114 StepConfig::Shell(_) => "shell",
115 StepConfig::Http(_) => "http",
116 StepConfig::Agent(_) => "agent",
117 StepConfig::Workflow(_) => "workflow",
118 StepConfig::Approval(_) => "approval",
119 };
120
121 let result = match config {
122 StepConfig::Shell(cfg) => {
123 let mut executor = ShellExecutor::new(cfg);
124 if let Some(sender) = log_sender {
125 executor = executor.with_log_sender(sender);
126 }
127 executor.execute(provider).await
128 }
129 StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
130 StepConfig::Agent(cfg) => {
131 let mut executor = AgentExecutor::new(cfg);
132 if let Some(sender) = log_sender {
133 executor = executor.with_log_sender(sender);
134 }
135 executor.execute(provider).await
136 }
137 StepConfig::Workflow(_) => Err(EngineError::StepConfig(
138 "workflow steps are executed by WorkflowContext, not the executor".to_string(),
139 )),
140 StepConfig::Approval(_) => Err(EngineError::StepConfig(
141 "approval steps are executed by WorkflowContext, not the executor".to_string(),
142 )),
143 };
144
145 #[cfg(feature = "prometheus")]
146 {
147 use ironflow_core::metric_names::{
148 STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
149 };
150 use metrics::{counter, histogram};
151 let status = if result.is_ok() {
152 STATUS_SUCCESS
153 } else {
154 STATUS_ERROR
155 };
156 counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
157 if let Ok(ref output) = result {
158 histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
159 .record(output.duration_ms as f64 / 1000.0);
160 }
161 }
162
163 result
164}