ironflow_engine/executor/
mod.rs1mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22
23pub use agent::AgentExecutor;
24pub use http::HttpExecutor;
25pub use shell::ShellExecutor;
26
27#[derive(Debug, Clone)]
29pub struct StepOutput {
30 pub output: Value,
32 pub duration_ms: u64,
34 pub cost_usd: Decimal,
36 pub input_tokens: Option<u64>,
38 pub output_tokens: Option<u64>,
40 pub debug_messages: Option<Vec<DebugMessage>>,
42}
43
44impl StepOutput {
45 pub fn debug_messages_json(&self) -> Option<Value> {
49 self.debug_messages
50 .as_ref()
51 .and_then(|msgs| serde_json::to_value(msgs).ok())
52 }
53}
54
55#[derive(Debug, Clone)]
57pub struct ParallelStepResult {
58 pub name: String,
60 pub output: StepOutput,
62 pub step_id: Uuid,
64}
65
66pub trait StepExecutor: Send + Sync {
71 fn execute(
77 &self,
78 provider: &Arc<dyn AgentProvider>,
79 ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
80}
81
82pub async fn execute_step_config(
105 config: &StepConfig,
106 provider: &Arc<dyn AgentProvider>,
107) -> Result<StepOutput, EngineError> {
108 let _kind = match config {
109 StepConfig::Shell(_) => "shell",
110 StepConfig::Http(_) => "http",
111 StepConfig::Agent(_) => "agent",
112 StepConfig::Workflow(_) => "workflow",
113 StepConfig::Approval(_) => "approval",
114 };
115
116 let result = match config {
117 StepConfig::Shell(cfg) => ShellExecutor::new(cfg).execute(provider).await,
118 StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
119 StepConfig::Agent(cfg) => AgentExecutor::new(cfg).execute(provider).await,
120 StepConfig::Workflow(_) => Err(EngineError::StepConfig(
121 "workflow steps are executed by WorkflowContext, not the executor".to_string(),
122 )),
123 StepConfig::Approval(_) => Err(EngineError::StepConfig(
124 "approval steps are executed by WorkflowContext, not the executor".to_string(),
125 )),
126 };
127
128 #[cfg(feature = "prometheus")]
129 {
130 use ironflow_core::metric_names::{
131 STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
132 };
133 use metrics::{counter, histogram};
134 let status = if result.is_ok() {
135 STATUS_SUCCESS
136 } else {
137 STATUS_ERROR
138 };
139 counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
140 if let Ok(ref output) = result {
141 histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
142 .record(output.duration_ms as f64 / 1000.0);
143 }
144 }
145
146 result
147}