mod agent;
mod http;
mod shell;
use std::future::Future;
use std::sync::Arc;
use rust_decimal::Decimal;
use serde_json::Value;
use uuid::Uuid;
use ironflow_core::provider::{AgentProvider, DebugMessage};
use crate::config::StepConfig;
use crate::error::EngineError;
pub use agent::AgentExecutor;
pub use http::HttpExecutor;
pub use shell::ShellExecutor;
/// Result of executing a single step, as returned by [`StepExecutor::execute`]
/// and [`execute_step_config`].
#[derive(Debug, Clone)]
pub struct StepOutput {
    /// JSON value produced by the step.
    pub output: Value,
    /// Duration of the step in milliseconds; `execute_step_config` divides
    /// this by 1000 before recording it in the seconds-based histogram.
    pub duration_ms: u64,
    /// Cost attributed to the step — presumably US dollars, going by the
    /// field name; confirm against the executors that populate it.
    pub cost_usd: Decimal,
    /// Input token count, when available.
    // NOTE(review): presumably only populated by agent steps — confirm.
    pub input_tokens: Option<u64>,
    /// Output token count, when available.
    // NOTE(review): presumably only populated by agent steps — confirm.
    pub output_tokens: Option<u64>,
    /// Provider debug messages captured during the step, if any.
    /// See [`StepOutput::debug_messages_json`] for a JSON view.
    pub debug_messages: Option<Vec<DebugMessage>>,
}
impl StepOutput {
    /// Converts the captured debug messages into a JSON [`Value`].
    ///
    /// Returns `None` when `debug_messages` is `None`, and also when
    /// serialization fails — the serialization error is discarded rather
    /// than surfaced to the caller.
    pub fn debug_messages_json(&self) -> Option<Value> {
        // Bail out early when nothing was captured.
        let messages = self.debug_messages.as_ref()?;
        serde_json::to_value(messages).ok()
    }
}
/// A [`StepOutput`] paired with the name and id of the step that produced it.
// NOTE(review): the type name suggests this is one entry of a parallel
// step group's results — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct ParallelStepResult {
    /// Name of the step.
    pub name: String,
    /// Output produced by the step.
    pub output: StepOutput,
    /// Identifier of the step.
    // NOTE(review): presumably the persisted step record's id — confirm.
    pub step_id: Uuid,
}
/// Common interface for types that can run one step and report its output.
///
/// Implementors must be `Send + Sync`, and the future returned by
/// [`execute`](StepExecutor::execute) must be `Send`.
pub trait StepExecutor: Send + Sync {
    /// Runs the step and resolves to its [`StepOutput`].
    ///
    /// `provider` is the shared agent provider handed to every executor.
    // NOTE(review): presumably only agent steps actually use `provider` — confirm.
    ///
    /// # Errors
    ///
    /// Resolves to an [`EngineError`] when the step fails.
    fn execute(
        &self,
        provider: &Arc<dyn AgentProvider>,
    ) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
}
/// Executes the step described by `config` and returns its output.
///
/// Shell, HTTP and agent configurations are handed to [`ShellExecutor`],
/// [`HttpExecutor`] and [`AgentExecutor`] respectively, each receiving
/// `provider`.
///
/// When the `prometheus` feature is enabled, every call increments the
/// step counter labelled with the step kind and a success/error status, and
/// successful calls additionally record the step duration (in seconds) in
/// the duration histogram.
///
/// # Errors
///
/// Returns [`EngineError::StepConfig`] for workflow and approval
/// configurations, which this function does not execute. Otherwise returns
/// whatever error the selected executor produced.
pub async fn execute_step_config(
    config: &StepConfig,
    provider: &Arc<dyn AgentProvider>,
) -> Result<StepOutput, EngineError> {
    let outcome = match config {
        StepConfig::Shell(shell_cfg) => ShellExecutor::new(shell_cfg).execute(provider).await,
        StepConfig::Http(http_cfg) => HttpExecutor::new(http_cfg).execute(provider).await,
        StepConfig::Agent(agent_cfg) => AgentExecutor::new(agent_cfg).execute(provider).await,
        StepConfig::Workflow(_) => Err(EngineError::StepConfig(String::from(
            "workflow steps are executed by WorkflowContext, not the executor",
        ))),
        StepConfig::Approval(_) => Err(EngineError::StepConfig(String::from(
            "approval steps are executed by WorkflowContext, not the executor",
        ))),
    };

    #[cfg(feature = "prometheus")]
    {
        use ironflow_core::metric_names::{
            STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
        };
        use metrics::{counter, histogram};

        // The kind label is only needed for metrics, so it is derived here.
        let kind_label = match config {
            StepConfig::Shell(_) => "shell",
            StepConfig::Http(_) => "http",
            StepConfig::Agent(_) => "agent",
            StepConfig::Workflow(_) => "workflow",
            StepConfig::Approval(_) => "approval",
        };
        let status_label = match &outcome {
            Ok(_) => STATUS_SUCCESS,
            Err(_) => STATUS_ERROR,
        };
        counter!(STEPS_TOTAL, "kind" => kind_label, "status" => status_label).increment(1);

        // A duration is only available when the step produced an output.
        if let Ok(step_output) = &outcome {
            let seconds = step_output.duration_ms as f64 / 1000.0;
            histogram!(STEP_DURATION_SECONDS, "kind" => kind_label).record(seconds);
        }
    }

    outcome
}