ironflow_engine/executor/
mod.rs1mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::{AgentProvider, DebugMessage};
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22
23pub use agent::AgentExecutor;
24pub use http::HttpExecutor;
25pub use shell::ShellExecutor;
26
27#[derive(Debug, Clone)]
29pub struct StepOutput {
30 pub output: Value,
32 pub duration_ms: u64,
34 pub cost_usd: Decimal,
36 pub input_tokens: Option<u64>,
38 pub output_tokens: Option<u64>,
40 pub debug_messages: Option<Vec<DebugMessage>>,
42}
43
44#[derive(Debug, Clone)]
46pub struct ParallelStepResult {
47 pub name: String,
49 pub output: StepOutput,
51 pub step_id: Uuid,
53}
54
55pub trait StepExecutor: Send + Sync {
60 fn execute(
66 &self,
67 provider: &Arc<dyn AgentProvider>,
68 ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
69}
70
71pub async fn execute_step_config(
94 config: &StepConfig,
95 provider: &Arc<dyn AgentProvider>,
96) -> Result<StepOutput, EngineError> {
97 let _kind = match config {
98 StepConfig::Shell(_) => "shell",
99 StepConfig::Http(_) => "http",
100 StepConfig::Agent(_) => "agent",
101 StepConfig::Workflow(_) => "workflow",
102 StepConfig::Approval(_) => "approval",
103 };
104
105 let result = match config {
106 StepConfig::Shell(cfg) => ShellExecutor::new(cfg).execute(provider).await,
107 StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
108 StepConfig::Agent(cfg) => AgentExecutor::new(cfg).execute(provider).await,
109 StepConfig::Workflow(_) => Err(EngineError::StepConfig(
110 "workflow steps are executed by WorkflowContext, not the executor".to_string(),
111 )),
112 StepConfig::Approval(_) => Err(EngineError::StepConfig(
113 "approval steps are executed by WorkflowContext, not the executor".to_string(),
114 )),
115 };
116
117 #[cfg(feature = "prometheus")]
118 {
119 use ironflow_core::metric_names::{
120 STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
121 };
122 use metrics::{counter, histogram};
123 let status = if result.is_ok() {
124 STATUS_SUCCESS
125 } else {
126 STATUS_ERROR
127 };
128 counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
129 if let Ok(ref output) = result {
130 histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
131 .record(output.duration_ms as f64 / 1000.0);
132 }
133 }
134
135 result
136}