mod agent;
mod http;
mod shell;
use std::future::Future;
use std::sync::Arc;
use rust_decimal::Decimal;
use serde_json::Value;
use uuid::Uuid;
use ironflow_core::provider::{AgentProvider, DebugMessage};
use crate::config::StepConfig;
use crate::error::EngineError;
use crate::log_sender::StepLogSender;
pub use agent::AgentExecutor;
pub use http::HttpExecutor;
pub use shell::ShellExecutor;
/// Result of executing a single step: the value it produced plus the
/// accounting data recorded alongside it.
#[derive(Debug, Clone)]
pub struct StepOutput {
/// JSON value produced by the step.
pub output: Value,
/// Duration of the step in milliseconds (converted to seconds when the
/// step-duration metric is recorded).
pub duration_ms: u64,
/// Cost attributed to the step; presumably US dollars, going by the field
/// name -- confirm against the executors that fill it in.
pub cost_usd: Decimal,
/// Number of input tokens, when one was reported (`None` otherwise).
pub input_tokens: Option<u64>,
/// Number of output tokens, when one was reported (`None` otherwise).
pub output_tokens: Option<u64>,
/// Name of the model associated with the step, if any.
pub model: Option<String>,
/// Debug messages captured for the step, if any; see
/// `debug_messages_json` for a JSON view of them.
pub debug_messages: Option<Vec<DebugMessage>>,
}
impl StepOutput {
    /// Serializes the captured debug messages into a JSON value.
    ///
    /// Returns `None` when no debug messages are present, and also when
    /// serialization fails (the serialization error is discarded). An empty
    /// message list yields `Some` holding an empty JSON array.
    pub fn debug_messages_json(&self) -> Option<Value> {
        // Bail out early when nothing was captured.
        let messages = self.debug_messages.as_ref()?;
        serde_json::to_value(messages).ok()
    }
}
/// A named step result paired with the identifier of the step that produced it.
///
/// NOTE(review): going by the type name this describes one member of a group
/// of steps run in parallel; the code that builds it is not in this file.
#[derive(Debug, Clone)]
pub struct ParallelStepResult {
/// Name of the step.
pub name: String,
/// Output produced by the step.
pub output: StepOutput,
/// Identifier of the step.
pub step_id: Uuid,
}
/// Common interface for something that can execute a single step.
///
/// Implementors must be `Send + Sync`. Presumably implemented by
/// `ShellExecutor`, `HttpExecutor` and `AgentExecutor`, whose `execute`
/// methods are called by `execute_step_config` -- the impls live in the
/// submodules and are not visible here.
pub trait StepExecutor: Send + Sync {
/// Executes the step, resolving to its [`StepOutput`] on success or an
/// [`EngineError`] on failure.
///
/// `provider` is the shared agent provider made available to the executor.
/// The returned future is required to be `Send`.
fn execute(
&self,
provider: &Arc<dyn AgentProvider>,
) -> impl Future<Output = Result<StepOutput, EngineError>> + Send;
}
/// Executes the step described by `config` with the matching executor.
///
/// * `Shell` and `Agent` steps build their executor from the config and, when
///   `log_sender` is `Some`, attach it through `with_log_sender` before running.
/// * `Http` steps run without a log sender; `log_sender` is not used for them.
/// * `Workflow` and `Approval` steps are not handled here and yield
///   [`EngineError::StepConfig`].
///
/// When the `prometheus` feature is enabled, every call increments the
/// `STEPS_TOTAL` counter labelled with the step kind and a success/error
/// status, and successful steps additionally record their duration (in
/// seconds) in the `STEP_DURATION_SECONDS` histogram.
///
/// # Errors
///
/// Returns the executor's error unchanged, or [`EngineError::StepConfig`] for
/// workflow and approval steps.
pub async fn execute_step_config(
    config: &StepConfig,
    provider: &Arc<dyn AgentProvider>,
    log_sender: Option<StepLogSender>,
) -> Result<StepOutput, EngineError> {
    // Each arm yields the metrics label for the step kind together with the
    // outcome of running (or rejecting) the step. The label is only read when
    // the `prometheus` feature is enabled, hence the underscore prefix.
    let (_kind, result) = match config {
        StepConfig::Shell(cfg) => {
            let executor = match log_sender {
                Some(sender) => ShellExecutor::new(cfg).with_log_sender(sender),
                None => ShellExecutor::new(cfg),
            };
            ("shell", executor.execute(provider).await)
        }
        StepConfig::Http(cfg) => {
            let executor = HttpExecutor::new(cfg);
            ("http", executor.execute(provider).await)
        }
        StepConfig::Agent(cfg) => {
            let executor = match log_sender {
                Some(sender) => AgentExecutor::new(cfg).with_log_sender(sender),
                None => AgentExecutor::new(cfg),
            };
            ("agent", executor.execute(provider).await)
        }
        StepConfig::Workflow(_) => (
            "workflow",
            Err(EngineError::StepConfig(
                "workflow steps are executed by WorkflowContext, not the executor".to_string(),
            )),
        ),
        StepConfig::Approval(_) => (
            "approval",
            Err(EngineError::StepConfig(
                "approval steps are executed by WorkflowContext, not the executor".to_string(),
            )),
        ),
    };

    #[cfg(feature = "prometheus")]
    {
        use ironflow_core::metric_names::{
            STATUS_ERROR, STATUS_SUCCESS, STEP_DURATION_SECONDS, STEPS_TOTAL,
        };
        use metrics::{counter, histogram};

        let status = match &result {
            Ok(_) => STATUS_SUCCESS,
            Err(_) => STATUS_ERROR,
        };
        counter!(STEPS_TOTAL, "kind" => _kind, "status" => status).increment(1);

        // Durations are only recorded for steps that completed successfully.
        if let Ok(output) = &result {
            let seconds = output.duration_ms as f64 / 1000.0;
            histogram!(STEP_DURATION_SECONDS, "kind" => _kind).record(seconds);
        }
    }

    result
}
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::provider::DebugMessage;
    use serde_json::json;

    /// Builds a `StepOutput` carrying `output` and `duration_ms`, with zero
    /// cost and no token counts, model or debug messages.
    fn bare_output(output: serde_json::Value, duration_ms: u64) -> StepOutput {
        StepOutput {
            output,
            duration_ms,
            cost_usd: rust_decimal::Decimal::ZERO,
            input_tokens: None,
            output_tokens: None,
            model: None,
            debug_messages: None,
        }
    }

    /// Builds the JSON form of one debug message with the given text and token counts.
    fn debug_message_json(text: &str, input_tokens: u64, output_tokens: u64) -> serde_json::Value {
        json!({
            "text": text,
            "thinking": null,
            "thinking_redacted": false,
            "tool_calls": [],
            "tool_results": [],
            "stop_reason": "end_turn",
            "input_tokens": input_tokens,
            "output_tokens": output_tokens
        })
    }

    #[test]
    fn step_output_with_no_debug_messages_returns_none() {
        let output = bare_output(json!({"result": "ok"}), 100);

        assert_eq!(output.debug_messages_json(), None);
    }

    #[test]
    fn step_output_with_empty_debug_messages_returns_some_empty_array() {
        let output = StepOutput {
            debug_messages: Some(Vec::new()),
            ..bare_output(json!({"result": "ok"}), 100)
        };

        let serialized = output
            .debug_messages_json()
            .expect("an empty message list should still serialize");
        let entries = serialized
            .as_array()
            .expect("serialized debug messages should be a JSON array");
        assert!(entries.is_empty());
    }

    #[test]
    fn step_output_debug_messages_json_serializes_messages() {
        let json_msgs = json!([
            debug_message_json("Hello", 10, 20),
            debug_message_json("Hi there", 15, 25)
        ]);
        let messages: Vec<DebugMessage> =
            serde_json::from_value(json_msgs).expect("deserialize debug messages");
        let output = StepOutput {
            debug_messages: Some(messages),
            ..bare_output(json!({"result": "ok"}), 100)
        };

        let serialized = output
            .debug_messages_json()
            .expect("debug messages should serialize");
        let entries = serialized
            .as_array()
            .expect("serialized debug messages should be a JSON array");
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0]["text"], "Hello");
        assert_eq!(entries[1]["text"], "Hi there");
    }

    #[test]
    fn step_output_contains_all_metrics() {
        let cost = rust_decimal::Decimal::new(123, 2);
        let output = StepOutput {
            cost_usd: cost,
            input_tokens: Some(100),
            output_tokens: Some(200),
            model: Some("claude-sonnet".to_string()),
            ..bare_output(json!({"data": "test"}), 5000)
        };

        assert_eq!(output.duration_ms, 5000);
        assert_eq!(output.cost_usd, cost);
        assert_eq!(output.input_tokens, Some(100));
        assert_eq!(output.output_tokens, Some(200));
        assert_eq!(output.model.as_deref(), Some("claude-sonnet"));
    }

    #[test]
    fn step_output_default_tokens_and_model_are_none() {
        let output = bare_output(json!({}), 0);

        assert_eq!(output.input_tokens, None);
        assert_eq!(output.output_tokens, None);
        assert_eq!(output.model, None);
    }

    #[test]
    fn parallel_step_result_contains_step_metadata() {
        let step_id = uuid::Uuid::now_v7();
        let result = ParallelStepResult {
            name: "build".to_string(),
            output: bare_output(json!({"done": true}), 1000),
            step_id,
        };

        assert_eq!(result.name, "build");
        assert_eq!(result.step_id, step_id);
        assert_eq!(result.output.duration_ms, 1000);
    }

    #[test]
    fn step_output_serializes_complex_json_output() {
        let complex_output = json!({
            "status": "success",
            "data": {
                "items": [1, 2, 3],
                "nested": {
                    "key": "value"
                }
            }
        });
        let output = bare_output(complex_output.clone(), 100);

        assert_eq!(output.output, complex_output);
        assert_eq!(output.output["status"], "success");
        assert_eq!(output.output["data"]["items"][0], 1);
        assert_eq!(output.output["data"]["nested"]["key"], "value");
    }
}