ironflow_engine/executor/
mod.rs1mod agent;
8mod http;
9mod shell;
10
11use std::future::Future;
12use std::sync::Arc;
13
14use rust_decimal::Decimal;
15use serde_json::Value;
16use uuid::Uuid;
17
18use ironflow_core::provider::AgentProvider;
19
20use crate::config::StepConfig;
21use crate::error::EngineError;
22
23pub use agent::AgentExecutor;
24pub use http::HttpExecutor;
25pub use shell::ShellExecutor;
26
27#[derive(Debug, Clone)]
29pub struct StepOutput {
30 pub output: Value,
32 pub duration_ms: u64,
34 pub cost_usd: Decimal,
36 pub input_tokens: Option<u64>,
38 pub output_tokens: Option<u64>,
40}
41
42#[derive(Debug, Clone)]
44pub struct ParallelStepResult {
45 pub name: String,
47 pub output: StepOutput,
49 pub step_id: Uuid,
51}
52
53pub trait StepExecutor: Send + Sync {
58 fn execute(
64 &self,
65 provider: &Arc<dyn AgentProvider>,
66 ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
67}
68
69pub async fn execute_step_config(
92 config: &StepConfig,
93 provider: &Arc<dyn AgentProvider>,
94) -> Result<StepOutput, EngineError> {
95 let _kind = match config {
96 StepConfig::Shell(_) => "shell",
97 StepConfig::Http(_) => "http",
98 StepConfig::Agent(_) => "agent",
99 StepConfig::Workflow(_) => "workflow",
100 StepConfig::Approval(_) => "approval",
101 };
102
103 let result = match config {
104 StepConfig::Shell(cfg) => ShellExecutor::new(cfg).execute(provider).await,
105 StepConfig::Http(cfg) => HttpExecutor::new(cfg).execute(provider).await,
106 StepConfig::Agent(cfg) => AgentExecutor::new(cfg).execute(provider).await,
107 StepConfig::Workflow(_) => Err(EngineError::StepConfig(
108 "workflow steps are executed by WorkflowContext, not the executor".to_string(),
109 )),
110 StepConfig::Approval(_) => Err(EngineError::StepConfig(
111 "approval steps are executed by WorkflowContext, not the executor".to_string(),
112 )),
113 };
114
115 #[cfg(feature = "prometheus")]
116 {
117 use ironflow_core::metric_names::{
118 STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
119 };
120 use metrics::{counter, histogram};
121 let status = if result.is_ok() {
122 STATUS_SUCCESS
123 } else {
124 STATUS_ERROR
125 };
126 counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);
127 if let Ok(ref output) = result {
128 histogram!(STEP_DURATION_SECONDS, "kind" => _kind)
129 .record(output.duration_ms as f64 / 1000.0);
130 }
131 }
132
133 result
134}