use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Instant;
use tokio::process::Command;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
#[cfg(feature = "tokio-tungstenite")]
use futures::{SinkExt, StreamExt};
pub mod chain_executor;
fn sanitize_file_path(path: &str) -> anyhow::Result<String> {
use std::path::{Path, Component};
if path.contains("..") || path.contains("~") {
return Err(anyhow::anyhow!("Path traversal detected: {}", path));
}
let windows_reserved = ["CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4",
"COM5", "COM6", "COM7", "COM8", "COM9", "LPT1", "LPT2",
"LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"];
let file_name = Path::new(path).file_name()
.and_then(|n| n.to_str())
.unwrap_or("");
if windows_reserved.iter().any(|&reserved| file_name.eq_ignore_ascii_case(reserved)) {
return Err(anyhow::anyhow!("Windows reserved filename: {}", path));
}
let normalized = Path::new(path).components()
.filter(|component| match component {
Component::Normal(_) => true,
Component::CurDir => false,
Component::ParentDir => false,
_ => true, })
.collect::<std::path::PathBuf>();
let sanitized = normalized.to_string_lossy().to_string();
if cfg!(unix) && (sanitized.starts_with("/etc/") || sanitized.starts_with("/sys/") ||
sanitized.starts_with("/proc/") || sanitized.starts_with("/dev/")) {
return Err(anyhow::anyhow!("Access to system directory denied: {}", sanitized));
}
if cfg!(windows) && (sanitized.to_lowercase().starts_with("c:\\windows\\") ||
sanitized.to_lowercase().starts_with("c:\\system32\\")) {
return Err(anyhow::anyhow!("Access to system directory denied: {}", sanitized));
}
Ok(sanitized)
}
fn sanitize_command(command: &str, args: &[&str]) -> anyhow::Result<()> {
let dangerous_patterns = [
"&&", "||", ";", "|", "`", "$", "&", ">", "<",
"$(", "${", "rm -rf", "del /f", "format", "shutdown", "reboot"
];
for pattern in dangerous_patterns {
if command.contains(pattern) {
return Err(anyhow::anyhow!("Dangerous command pattern detected: {}", pattern));
}
for arg in args {
if arg.contains(pattern) {
return Err(anyhow::anyhow!("Dangerous argument pattern detected: {}", pattern));
}
}
}
let dangerous_commands = [
"rm", "rmdir", "del", "deltree", "format", "fdisk", "mkfs",
"sudo", "su", "passwd", "useradd", "userdel", "chmod", "chown",
"shutdown", "reboot", "halt", "init", "systemctl", "service",
"curl", "wget", "nc", "netcat", "telnet", "ssh", "ftp", "sftp"
];
for dangerous_cmd in dangerous_commands {
if command.eq_ignore_ascii_case(dangerous_cmd) {
return Err(anyhow::anyhow!("Dangerous command blocked: {}", dangerous_cmd));
}
}
let script_interpreters = ["bash", "sh", "powershell", "python", "perl", "ruby"];
if script_interpreters.iter().any(|&interp| command.eq_ignore_ascii_case(interp)) {
for arg in args {
if arg.contains("-c") || arg.contains("-e") {
return Err(anyhow::anyhow!("Inline script execution blocked"));
}
}
}
if command.eq_ignore_ascii_case("cmd") {
for (i, arg) in args.iter().enumerate() {
if *arg == "/c" && i + 1 < args.len() {
let next_cmd = args[i + 1];
for dangerous_cmd in ["del", "rmdir", "format", "shutdown", "reboot"] {
if next_cmd.eq_ignore_ascii_case(dangerous_cmd) {
return Err(anyhow::anyhow!("Dangerous Windows command blocked: {}", next_cmd));
}
}
}
}
}
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct Mission {
pub version: String,
pub name: String,
pub description: Option<String>,
pub steps: Vec<MissionStep>,
pub config: Option<MissionConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MissionStep {
pub id: String,
pub name: String,
pub step_type: StepType,
pub depends_on: Option<Vec<String>>,
pub timeout_seconds: Option<u64>,
pub continue_on_error: Option<bool>,
pub parameters: serde_json::Value,
}
#[derive(Debug, Clone, JsonSchema)]
pub enum StepType {
CreateFile,
EditFile,
DeleteFile,
CopyFile,
MoveFile,
ReadFile,
ListDirectory,
FileSearch,
ParseJson,
ParseYaml,
ParseXml,
ValidateSchema,
CsvProcess,
CompileCode,
RunTests,
FormatCode,
LintCode,
ExtractFunctions,
GenerateDocs,
GitCommit,
GitBranch,
GitMerge,
GitStatus,
GitDiff,
ProcessStart,
ProcessKill,
MonitorResources,
ServiceHealth,
Compress,
SqlQuery,
RedisSet,
RedisGet,
DbBackup,
DbMigrate,
WebsocketConnect,
FtpUpload,
FtpDownload,
SshExecute,
PingHost,
GenerateEmbedding,
SimilaritySearch,
ModelInference,
Command,
Http,
Noop,
Llm,
Tool,
RagQuery,
RagAdd,
Chain,
Agent,
}
impl Serialize for StepType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = match self {
StepType::CreateFile => "create_file",
StepType::EditFile => "edit_file",
StepType::DeleteFile => "delete_file",
StepType::CopyFile => "copy_file",
StepType::MoveFile => "move_file",
StepType::ReadFile => "read_file",
StepType::ListDirectory => "list_directory",
StepType::FileSearch => "file_search",
StepType::ParseJson => "parse_json",
StepType::ParseYaml => "parse_yaml",
StepType::ParseXml => "parse_xml",
StepType::ValidateSchema => "validate_schema",
StepType::CsvProcess => "csv_process",
StepType::CompileCode => "compile_code",
StepType::RunTests => "run_tests",
StepType::FormatCode => "format_code",
StepType::LintCode => "lint_code",
StepType::ExtractFunctions => "extract_functions",
StepType::GenerateDocs => "generate_docs",
StepType::GitCommit => "git_commit",
StepType::GitBranch => "git_branch",
StepType::GitMerge => "git_merge",
StepType::GitStatus => "git_status",
StepType::GitDiff => "git_diff",
StepType::ProcessStart => "process_start",
StepType::ProcessKill => "process_kill",
StepType::MonitorResources => "monitor_resources",
StepType::ServiceHealth => "service_health",
StepType::Compress => "compress",
StepType::SqlQuery => "sql_query",
StepType::RedisSet => "redis_set",
StepType::RedisGet => "redis_get",
StepType::DbBackup => "db_backup",
StepType::DbMigrate => "db_migrate",
StepType::WebsocketConnect => "websocket_connect",
StepType::FtpUpload => "ftp_upload",
StepType::FtpDownload => "ftp_download",
StepType::SshExecute => "ssh_execute",
StepType::PingHost => "ping_host",
StepType::GenerateEmbedding => "generate_embedding",
StepType::SimilaritySearch => "similarity_search",
StepType::ModelInference => "model_inference",
StepType::Command => "command",
StepType::Http => "http",
StepType::Noop => "noop",
StepType::Llm => "llm",
StepType::Tool => "tool",
StepType::RagQuery => "rag_query",
StepType::RagAdd => "rag_add",
StepType::Chain => "chain",
StepType::Agent => "agent",
};
serializer.serialize_str(s)
}
}
impl<'de> Deserialize<'de> for StepType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct StepTypeVisitor;
impl<'de> Visitor<'de> for StepTypeVisitor {
type Value = StepType;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string or object representing a step type")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
match value {
"create_file" => Ok(StepType::CreateFile),
"edit_file" => Ok(StepType::EditFile),
"delete_file" => Ok(StepType::DeleteFile),
"copy_file" => Ok(StepType::CopyFile),
"move_file" => Ok(StepType::MoveFile),
"read_file" => Ok(StepType::ReadFile),
"list_directory" => Ok(StepType::ListDirectory),
"file_search" => Ok(StepType::FileSearch),
"parse_json" => Ok(StepType::ParseJson),
"parse_yaml" => Ok(StepType::ParseYaml),
"parse_xml" => Ok(StepType::ParseXml),
"validate_schema" => Ok(StepType::ValidateSchema),
"csv_process" => Ok(StepType::CsvProcess),
"compile_code" => Ok(StepType::CompileCode),
"run_tests" => Ok(StepType::RunTests),
"format_code" => Ok(StepType::FormatCode),
"lint_code" => Ok(StepType::LintCode),
"extract_functions" => Ok(StepType::ExtractFunctions),
"generate_docs" => Ok(StepType::GenerateDocs),
"git_commit" => Ok(StepType::GitCommit),
"git_branch" => Ok(StepType::GitBranch),
"git_merge" => Ok(StepType::GitMerge),
"git_status" => Ok(StepType::GitStatus),
"git_diff" => Ok(StepType::GitDiff),
"process_start" => Ok(StepType::ProcessStart),
"process_kill" => Ok(StepType::ProcessKill),
"monitor_resources" => Ok(StepType::MonitorResources),
"service_health" => Ok(StepType::ServiceHealth),
"compress" => Ok(StepType::Compress),
"sql_query" => Ok(StepType::SqlQuery),
"redis_set" => Ok(StepType::RedisSet),
"redis_get" => Ok(StepType::RedisGet),
"db_backup" => Ok(StepType::DbBackup),
"db_migrate" => Ok(StepType::DbMigrate),
"websocket_connect" => Ok(StepType::WebsocketConnect),
"ftp_upload" => Ok(StepType::FtpUpload),
"ftp_download" => Ok(StepType::FtpDownload),
"ssh_execute" => Ok(StepType::SshExecute),
"ping_host" => Ok(StepType::PingHost),
"generate_embedding" => Ok(StepType::GenerateEmbedding),
"similarity_search" => Ok(StepType::SimilaritySearch),
"model_inference" => Ok(StepType::ModelInference),
"command" => Ok(StepType::Command),
"http" => Ok(StepType::Http),
"noop" => Ok(StepType::Noop),
"llm" => Ok(StepType::Llm),
"tool" => Ok(StepType::Tool),
"rag_query" => Ok(StepType::RagQuery),
"rag_add" => Ok(StepType::RagAdd),
"chain" => Ok(StepType::Chain),
"agent" => Ok(StepType::Agent),
"Tool" => Ok(StepType::Tool),
other => Err(E::unknown_variant(
other,
&[
"create_file",
"edit_file",
"delete_file",
"command",
"http",
"noop",
"llm",
"tool",
"rag_query",
"rag_add",
"chain",
"agent",
],
)),
}
}
fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
where
V: de::MapAccess<'de>,
{
let mut step_type: Option<String> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"type" => {
if step_type.is_some() {
return Err(de::Error::duplicate_field("type"));
}
step_type = Some(map.next_value()?);
}
_ => {
let _: serde_json::Value = map.next_value()?;
}
}
}
let step_type = step_type.ok_or_else(|| de::Error::missing_field("type"))?;
self.visit_str(&step_type)
}
}
deserializer.deserialize_any(StepTypeVisitor)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use tokio_test;
use std::io::Write;
mod steptype_tests {
use super::*;
#[test]
fn test_steptype_simple_string_serialization() {
let step_type = StepType::CreateFile;
let serialized = serde_json::to_string(&step_type).unwrap();
assert_eq!(serialized, "\"create_file\"");
let deserialized: StepType = serde_json::from_str(&serialized).unwrap();
assert!(matches!(deserialized, StepType::CreateFile));
}
#[test]
fn test_steptype_legacy_format_support() {
let json = "\"Tool\"";
let deserialized: StepType = serde_json::from_str(json).unwrap();
assert!(matches!(deserialized, StepType::Tool));
}
#[test]
fn test_steptype_future_object_format() {
let json = r#"{"type": "create_file", "metadata": {"version": "1.2"}}"#;
let deserialized: StepType = serde_json::from_str(json).unwrap();
assert!(matches!(deserialized, StepType::CreateFile));
}
#[test]
fn test_steptype_all_variants() {
let variants = vec![
(StepType::CreateFile, "create_file"),
(StepType::EditFile, "edit_file"),
(StepType::DeleteFile, "delete_file"),
(StepType::Command, "command"),
(StepType::Http, "http"),
(StepType::Noop, "noop"),
(StepType::Llm, "llm"),
(StepType::Tool, "tool"),
(StepType::RagQuery, "rag_query"),
(StepType::RagAdd, "rag_add"),
(StepType::Chain, "chain"),
(StepType::Agent, "agent"),
];
for (step_type, expected_str) in variants {
let serialized = serde_json::to_string(&step_type).unwrap();
assert_eq!(serialized, format!("\"{}\"", expected_str));
let deserialized: StepType = serde_json::from_str(&serialized).unwrap();
assert!(std::mem::discriminant(&step_type) == std::mem::discriminant(&deserialized));
}
}
#[test]
fn test_steptype_invalid_string() {
let json = "\"invalid_step\"";
let result: Result<StepType, _> = serde_json::from_str(json);
assert!(result.is_err());
}
#[test]
fn test_steptype_object_missing_type() {
let json = r#"{"metadata": {"version": "1.2"}}"#;
let result: Result<StepType, _> = serde_json::from_str(json);
assert!(result.is_err());
}
#[test]
fn test_steptype_object_duplicate_type() {
let json = r#"{"type": "create_file", "type": "edit_file"}"#;
let result: Result<StepType, _> = serde_json::from_str(json);
assert!(result.is_err());
}
}
mod mission_tests {
use super::*;
#[test]
fn test_mission_creation() {
let mission = Mission {
version: "1.0".to_string(),
name: "Test Mission".to_string(),
description: Some("A test mission".to_string()),
steps: vec![
MissionStep {
id: "step1".to_string(),
name: "First Step".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: Some(60),
continue_on_error: None,
parameters: serde_json::json!({"key": "value"}),
}
],
config: Some(MissionConfig {
max_parallel_steps: Some(2),
timeout_seconds: Some(300),
fail_fast: Some(true),
}),
};
assert_eq!(mission.name, "Test Mission");
assert_eq!(mission.steps.len(), 1);
assert!(mission.config.is_some());
}
#[test]
fn test_mission_step_serialization() {
let step = MissionStep {
id: "test_step".to_string(),
name: "Test Step".to_string(),
step_type: StepType::CreateFile,
depends_on: Some(vec!["dep1".to_string(), "dep2".to_string()]),
timeout_seconds: Some(120),
continue_on_error: None,
parameters: serde_json::json!({
"path": "/tmp/test.txt",
"content": "Hello, World!"
}),
};
let serialized = serde_json::to_string(&step).unwrap();
let deserialized: MissionStep = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.id, "test_step");
assert_eq!(deserialized.name, "Test Step");
assert!(matches!(deserialized.step_type, StepType::CreateFile));
assert_eq!(deserialized.depends_on.unwrap().len(), 2);
assert_eq!(deserialized.timeout_seconds.unwrap(), 120);
}
#[test]
fn test_mission_config_defaults() {
let config = MissionConfig {
max_parallel_steps: None,
timeout_seconds: None,
fail_fast: None,
};
let serialized = serde_json::to_string(&config).unwrap();
let deserialized: MissionConfig = serde_json::from_str(&serialized).unwrap();
assert!(deserialized.max_parallel_steps.is_none());
assert!(deserialized.timeout_seconds.is_none());
assert!(deserialized.fail_fast.is_none());
}
}
mod mission_loader_tests {
use super::*;
use std::fs;
#[test]
fn test_load_mission_yaml() {
let temp_dir = TempDir::new().unwrap();
let mission_path = temp_dir.path().join("test_mission.yaml");
let mission_yaml = r#"
version: "1.0"
name: "YAML Test Mission"
description: "Testing YAML loading"
steps:
- id: "step1"
name: "First Step"
step_type: "noop"
parameters: {}
config:
max_parallel_steps: 3
timeout_seconds: 600
fail_fast: true
"#;
fs::write(&mission_path, mission_yaml).unwrap();
let mission = MissionLoader::load_from_file(mission_path.to_str().unwrap()).unwrap();
assert_eq!(mission.name, "YAML Test Mission");
assert_eq!(mission.steps.len(), 1);
assert!(mission.config.is_some());
assert_eq!(mission.config.unwrap().max_parallel_steps.unwrap(), 3);
}
#[test]
fn test_load_mission_json() {
let temp_dir = TempDir::new().unwrap();
let mission_path = temp_dir.path().join("test_mission.json");
let mission_json = r#"{
"version": "1.0",
"name": "JSON Test Mission",
"description": "Testing JSON loading",
"steps": [
{
"id": "step1",
"name": "First Step",
"step_type": "noop",
"parameters": {}
}
],
"config": {
"max_parallel_steps": 5,
"timeout_seconds": 900,
"fail_fast": false
}
}"#;
fs::write(&mission_path, mission_json).unwrap();
let mission = MissionLoader::load_from_file(mission_path.to_str().unwrap()).unwrap();
assert_eq!(mission.name, "JSON Test Mission");
assert_eq!(mission.steps.len(), 1);
assert!(mission.config.is_some());
assert_eq!(mission.config.unwrap().max_parallel_steps.unwrap(), 5);
}
#[test]
fn test_load_mission_empty_path() {
let result = MissionLoader::load_from_file("");
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Mission path must not be empty"));
}
#[test]
fn test_load_mission_nonexistent_file() {
let result = MissionLoader::load_from_file("/nonexistent/path/mission.yaml");
assert!(result.is_err());
}
#[test]
fn test_validate_mission_success() {
let mission = Mission {
version: "1.0".to_string(),
name: "Valid Mission".to_string(),
description: None,
steps: vec![
MissionStep {
id: "step1".to_string(),
name: "Step 1".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
MissionStep {
id: "step2".to_string(),
name: "Step 2".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["step1".to_string()]),
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
],
config: None,
};
let result = MissionLoader::validate_mission(&mission);
assert!(result.is_ok());
}
#[test]
fn test_validate_mission_empty_steps() {
let mission = Mission {
version: "1.0".to_string(),
name: "Empty Mission".to_string(),
description: None,
steps: vec![],
config: None,
};
let result = MissionLoader::validate_mission(&mission);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Mission must have at least one step"));
}
#[test]
fn test_validate_mission_duplicate_ids() {
let mission = Mission {
version: "1.0".to_string(),
name: "Duplicate ID Mission".to_string(),
description: None,
steps: vec![
MissionStep {
id: "step1".to_string(),
name: "First Step".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
MissionStep {
id: "step1".to_string(), name: "Second Step".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
],
config: None,
};
let result = MissionLoader::validate_mission(&mission);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Duplicate step ID"));
}
#[test]
fn test_validate_mission_missing_dependency() {
let mission = Mission {
version: "1.0".to_string(),
name: "Missing Dependency Mission".to_string(),
description: None,
steps: vec![
MissionStep {
id: "step1".to_string(),
name: "Step 1".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["nonexistent".to_string()]),
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
],
config: None,
};
let result = MissionLoader::validate_mission(&mission);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("depends on non-existent step"));
}
}
mod dag_executor_tests {
use super::*;
#[tokio::test]
async fn test_topological_sort_simple() {
let steps = vec![
MissionStep {
id: "step1".to_string(),
name: "First".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
MissionStep {
id: "step2".to_string(),
name: "Second".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["step1".to_string()]),
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
MissionStep {
id: "step3".to_string(),
name: "Third".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["step2".to_string()]),
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
];
let order = DagExecutor::topological_sort(&steps).unwrap();
assert_eq!(order, vec!["step1", "step2", "step3"]);
}
#[tokio::test]
async fn test_topological_sort_circular_dependency() {
let steps = vec![
MissionStep {
id: "step1".to_string(),
name: "First".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["step2".to_string()]),
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
MissionStep {
id: "step2".to_string(),
name: "Second".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["step1".to_string()]),
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
},
];
let result = DagExecutor::topological_sort(&steps);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Circular dependency"));
}
#[tokio::test]
async fn test_execute_mission_simple_success() {
let mission = Mission {
version: "1.0".to_string(),
name: "Simple Success Mission".to_string(),
description: Some("A simple successful mission".to_string()),
steps: vec![
MissionStep {
id: "noop1".to_string(),
name: "First Noop".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: Some(10),
continue_on_error: None,
parameters: serde_json::json!({}),
},
MissionStep {
id: "noop2".to_string(),
name: "Second Noop".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["noop1".to_string()]),
timeout_seconds: Some(10),
continue_on_error: None,
parameters: serde_json::json!({}),
},
],
config: Some(MissionConfig {
max_parallel_steps: Some(1),
timeout_seconds: Some(60),
fail_fast: Some(true),
}),
};
let result = DagExecutor::execute_mission(mission).await.unwrap();
assert!(matches!(result.status, MissionStatus::Completed));
assert_eq!(result.step_results.len(), 2);
assert!(result.step_results.contains_key("noop1"));
assert!(result.step_results.contains_key("noop2"));
assert!(result.total_duration_ms >= 0);
}
#[tokio::test]
async fn test_execute_mission_empty() {
let mission = Mission {
version: "1.0".to_string(),
name: "Empty Mission".to_string(),
description: None,
steps: vec![],
config: None,
};
let result = DagExecutor::execute_mission(mission).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Cannot execute empty mission"));
}
#[tokio::test]
async fn test_execute_mission_fail_fast() {
let mission = Mission {
version: "1.0".to_string(),
name: "Fail Fast Mission".to_string(),
description: None,
steps: vec![
MissionStep {
id: "failing_step".to_string(),
name: "Failing Step".to_string(),
step_type: StepType::Command,
depends_on: None,
timeout_seconds: Some(5),
continue_on_error: None,
parameters: serde_json::json!({
"command": "invalid_command_that_does_not_exist",
"args": []
}),
},
MissionStep {
id: "should_not_run".to_string(),
name: "Should Not Run".to_string(),
step_type: StepType::Noop,
depends_on: Some(vec!["failing_step".to_string()]),
timeout_seconds: Some(5),
continue_on_error: None,
parameters: serde_json::json!({}),
},
],
config: Some(MissionConfig {
max_parallel_steps: Some(1),
timeout_seconds: Some(30),
fail_fast: Some(true),
}),
};
let result = DagExecutor::execute_mission(mission).await;
assert!(result.is_err());
}
}
mod execution_context_tests {
use super::*;
#[test]
fn test_execution_context_creation() {
let context = ExecutionContext::new();
assert!(context.variables.is_empty());
assert!(context.environment.is_empty());
}
#[test]
fn test_execution_context_variables() {
let mut context = ExecutionContext::new();
context.set_variable("key1", "value1");
context.set_variable("key2", "value2");
assert_eq!(context.get_variable("key1"), Some(&"value1".to_string()));
assert_eq!(context.get_variable("key2"), Some(&"value2".to_string()));
assert_eq!(context.get_variable("nonexistent"), None);
}
#[test]
fn test_execution_context_variable_override() {
let mut context = ExecutionContext::new();
context.set_variable("key", "original");
assert_eq!(context.get_variable("key"), Some(&"original".to_string()));
context.set_variable("key", "updated");
assert_eq!(context.get_variable("key"), Some(&"updated".to_string()));
}
}
mod step_execution_tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_execute_noop_step() {
let step = MissionStep {
id: "noop_test".to_string(),
name: "Test Noop".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert_eq!(result.step_id, "noop_test");
assert!(matches!(result.status, StepStatus::Success));
assert!(result.output.get("message").is_some());
assert!(result.error.is_none());
}
#[tokio::test]
async fn test_execute_create_file_step() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test_file.txt");
let step = MissionStep {
id: "create_file_test".to_string(),
name: "Test Create File".to_string(),
step_type: StepType::CreateFile,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"path": file_path.to_str().unwrap(),
"content": "Hello, World!\nThis is a test file."
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert_eq!(result.step_id, "create_file_test");
assert!(matches!(result.status, StepStatus::Success));
assert!(result.error.is_none());
let content = std::fs::read_to_string(&file_path).unwrap();
assert_eq!(content, "Hello, World!\nThis is a test file.");
}
#[tokio::test]
async fn test_execute_create_file_missing_path() {
let step = MissionStep {
id: "invalid_create_file".to_string(),
name: "Invalid Create File".to_string(),
step_type: StepType::CreateFile,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"content": "Some content"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Missing 'path' parameter"));
}
#[tokio::test]
async fn test_execute_command_step_success() {
let step = MissionStep {
id: "command_test".to_string(),
name: "Test Command".to_string(),
step_type: StepType::Command,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"command": "echo",
"args": ["Hello", "World"]
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Success));
assert!(result.output["stdout"].as_str().unwrap().contains("Hello World"));
assert_eq!(result.output["exit_code"].as_i64().unwrap(), 0);
}
#[tokio::test]
async fn test_execute_command_step_failure() {
let step = MissionStep {
id: "failing_command".to_string(),
name: "Failing Command".to_string(),
step_type: StepType::Command,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"command": "false", "args": []
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert!(result.error.is_some());
}
#[tokio::test]
async fn test_execute_edit_file_step() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("edit_test.txt");
std::fs::write(&file_path, "Initial content\n").unwrap();
let step = MissionStep {
id: "edit_file_test".to_string(),
name: "Test Edit File".to_string(),
step_type: StepType::EditFile,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"path": file_path.to_str().unwrap(),
"content": "New content",
"append": false
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Success));
assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "New content");
}
#[tokio::test]
async fn test_execute_edit_file_append() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("append_test.txt");
std::fs::write(&file_path, "Initial content\n").unwrap();
let step = MissionStep {
id: "append_file_test".to_string(),
name: "Test Append File".to_string(),
step_type: StepType::EditFile,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"path": file_path.to_str().unwrap(),
"content": "Appended content\n",
"append": true
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Success));
let content = std::fs::read_to_string(&file_path).unwrap();
assert!(content.contains("Initial content"));
assert!(content.contains("Appended content"));
}
#[tokio::test]
async fn test_execute_delete_file_step() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("delete_test.txt");
std::fs::write(&file_path, "Content to delete").unwrap();
assert!(file_path.exists());
let step = MissionStep {
id: "delete_file_test".to_string(),
name: "Test Delete File".to_string(),
step_type: StepType::DeleteFile,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"path": file_path.to_str().unwrap()
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Success));
assert!(!file_path.exists());
assert_eq!(result.output["existed"].as_bool().unwrap(), true);
assert_eq!(result.output["deleted"].as_bool().unwrap(), true);
}
#[tokio::test]
async fn test_execute_delete_nonexistent_file() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("nonexistent.txt");
let step = MissionStep {
id: "delete_nonexistent".to_string(),
name: "Delete Nonexistent File".to_string(),
step_type: StepType::DeleteFile,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"path": file_path.to_str().unwrap()
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Success));
assert_eq!(result.output["existed"].as_bool().unwrap(), false);
assert_eq!(result.output["deleted"].as_bool().unwrap(), false);
}
#[tokio::test]
async fn test_execute_command_with_working_dir() {
let temp_dir = TempDir::new().unwrap();
let step = MissionStep {
id: "echo_command".to_string(),
name: "Echo Command".to_string(),
step_type: StepType::Command,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"command": "echo",
"args": ["working_directory_test"],
"working_dir": temp_dir.path().to_str().unwrap()
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Success));
let stdout = result.output["stdout"].as_str().unwrap();
assert!(stdout.contains("working_directory_test"));
}
#[tokio::test]
async fn test_step_result_duration_tracking() {
let step = MissionStep {
id: "duration_test".to_string(),
name: "Duration Test".to_string(),
step_type: StepType::Noop,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(result.duration_ms >= 0);
}
}
mod feature_gated_tests {
use super::*;
#[cfg(not(feature = "llm"))]
#[tokio::test]
async fn test_http_step_without_llm_feature() {
let step = MissionStep {
id: "http_disabled".to_string(),
name: "HTTP Disabled Test".to_string(),
step_type: StepType::Http,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"url": "https://httpbin.org/get",
"method": "GET"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Skipped));
assert!(result.output["message"].as_str().unwrap().contains("HTTP support requires 'llm' feature"));
}
#[cfg(not(feature = "llm"))]
#[tokio::test]
async fn test_llm_step_without_llm_feature() {
let step = MissionStep {
id: "llm_disabled".to_string(),
name: "LLM Disabled Test".to_string(),
step_type: StepType::Llm,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"prompt": "Hello, world!"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert_eq!(result.error.as_ref().unwrap(), "LLM feature not enabled");
}
#[cfg(not(feature = "tools"))]
#[tokio::test]
async fn test_tool_step_without_tools_feature() {
let step = MissionStep {
id: "tool_disabled".to_string(),
name: "Tool Disabled Test".to_string(),
step_type: StepType::Tool,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"tool": "test_tool",
"parameters": {}
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert_eq!(result.error.as_ref().unwrap(), "Tools feature not enabled");
}
#[cfg(not(feature = "rag"))]
#[tokio::test]
async fn test_rag_query_step_without_rag_feature() {
let step = MissionStep {
id: "rag_query_disabled".to_string(),
name: "RAG Query Disabled Test".to_string(),
step_type: StepType::RagQuery,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"query": "test query"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert_eq!(result.error.as_ref().unwrap(), "RAG feature not enabled");
}
#[cfg(not(feature = "rag"))]
#[tokio::test]
async fn test_rag_add_step_without_rag_feature() {
let step = MissionStep {
id: "rag_add_disabled".to_string(),
name: "RAG Add Disabled Test".to_string(),
step_type: StepType::RagAdd,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"id": "doc1",
"content": "test content"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert_eq!(result.error.as_ref().unwrap(), "RAG feature not enabled");
}
#[cfg(not(feature = "chain"))]
#[tokio::test]
async fn test_chain_step_without_chain_feature() {
let step = MissionStep {
id: "chain_disabled".to_string(),
name: "Chain Disabled Test".to_string(),
step_type: StepType::Chain,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"type": "sequential",
"prompt": "test prompt"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert_eq!(result.error.as_ref().unwrap(), "Chain feature not enabled");
}
#[cfg(not(feature = "agent"))]
#[tokio::test]
async fn test_agent_step_without_agent_feature() {
let step = MissionStep {
id: "agent_disabled".to_string(),
name: "Agent Disabled Test".to_string(),
step_type: StepType::Agent,
depends_on: None,
timeout_seconds: None,
continue_on_error: None,
parameters: serde_json::json!({
"objective": "test objective",
"name": "test_agent"
}),
};
let mut context = ExecutionContext::new();
let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
assert!(matches!(result.status, StepStatus::Failed));
assert_eq!(result.error.as_ref().unwrap(), "Agent feature not enabled");
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MissionConfig {
pub max_parallel_steps: Option<usize>,
pub timeout_seconds: Option<u64>,
pub fail_fast: Option<bool>,
}
pub struct MissionLoader;
impl MissionLoader {
pub fn load_from_file(path: &str) -> anyhow::Result<Mission> {
if path.is_empty() {
return Err(anyhow::anyhow!("Mission path must not be empty"));
}
let content = std::fs::read_to_string(path)?;
let mission: Mission = if path.ends_with(".json") {
serde_json::from_str(&content)?
} else {
serde_yaml::from_str(&content)?
};
Self::validate_mission(&mission)?;
Ok(mission)
}
pub fn validate_mission(mission: &Mission) -> anyhow::Result<()> {
if mission.steps.is_empty() {
return Err(anyhow::anyhow!("Mission must have at least one step"));
}
let mut seen_ids = std::collections::HashSet::new();
for step in &mission.steps {
if !seen_ids.insert(&step.id) {
return Err(anyhow::anyhow!("Duplicate step ID: {}", step.id));
}
}
for step in &mission.steps {
if let Some(deps) = &step.depends_on {
for dep in deps {
if !mission.steps.iter().any(|s| s.id == *dep) {
return Err(anyhow::anyhow!(
"Step {} depends on non-existent step: {}",
step.id,
dep
));
}
}
}
}
Ok(())
}
}
pub struct DagExecutor;
impl DagExecutor {
pub async fn execute_mission(mission: Mission) -> anyhow::Result<MissionResult> {
if mission.steps.is_empty() {
return Err(anyhow::anyhow!("Cannot execute empty mission"));
}
let start_time = Instant::now();
let execution_order = Self::topological_sort(&mission.steps)?;
let mut results: HashMap<String, StepResult> = HashMap::new();
let mut completed = std::collections::HashSet::new();
let mut context = ExecutionContext::new();
let fail_fast = mission
.config
.as_ref()
.and_then(|c| c.fail_fast)
.unwrap_or(true);
info!(
"Executing mission '{}' with {} steps",
mission.name,
execution_order.len()
);
for step_id in execution_order {
let step = mission.steps.iter().find(|s| s.id == step_id).unwrap();
debug!("Executing step: {} ({})", step.id, step.name);
if let Some(deps) = &step.depends_on {
for dep in deps {
if !completed.contains(dep) {
return Err(anyhow::anyhow!(
"Dependency {} not completed for step {}",
dep,
step.id
));
}
if let Some(dep_result) = results.get(dep) {
if matches!(dep_result.status, StepStatus::Failed) {
let should_continue = step.continue_on_error.unwrap_or(false);
if !should_continue && fail_fast {
warn!("Skipping step {} due to failed dependency {}", step.id, dep);
results.insert(
step_id.clone(),
StepResult {
step_id: step.id.clone(),
status: StepStatus::Skipped,
output: serde_json::json!({"reason": "dependency failed"}),
error: Some(format!("Dependency {} failed", dep)),
duration_ms: 0,
},
);
continue;
}
}
}
}
}
let timeout = step
.timeout_seconds
.or(mission.config.as_ref().and_then(|c| c.timeout_seconds))
.unwrap_or(300);
let step_start = Instant::now();
let result = match tokio::time::timeout(
std::time::Duration::from_secs(timeout),
Self::execute_step(step, &mut context),
)
.await
{
Ok(Ok(result)) => result,
Ok(Err(e)) => {
error!("Step {} failed: {}", step.id, e);
let should_continue = step.continue_on_error.unwrap_or(false);
if !should_continue && fail_fast {
return Err(e);
}
StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": e.to_string()}),
error: Some(e.to_string()),
duration_ms: step_start.elapsed().as_millis() as u64,
}
}
Err(_) => {
error!("Step {} timed out after {} seconds", step.id, timeout);
if fail_fast {
return Err(anyhow::anyhow!("Step {} timed out", step.id));
}
StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "timeout"}),
error: Some(format!("Timed out after {} seconds", timeout)),
duration_ms: timeout * 1000,
}
}
};
info!(
"Step {} completed with status: {:?}",
step.id, result.status
);
results.insert(step_id.clone(), result);
completed.insert(step_id);
}
let has_failures = results
.values()
.any(|r| matches!(r.status, StepStatus::Failed));
let all_skipped = results
.values()
.all(|r| matches!(r.status, StepStatus::Skipped));
let status = if has_failures {
MissionStatus::Failed
} else if all_skipped {
MissionStatus::Cancelled
} else {
MissionStatus::Completed
};
Ok(MissionResult {
mission_id: Uuid::new_v4(),
status,
step_results: results,
total_duration_ms: start_time.elapsed().as_millis() as u64,
})
}
fn topological_sort(steps: &[MissionStep]) -> anyhow::Result<Vec<String>> {
let mut in_degree = HashMap::new();
let mut graph = HashMap::new();
for step in steps {
in_degree.insert(step.id.clone(), 0);
graph.insert(step.id.clone(), Vec::new());
}
for step in steps {
if let Some(deps) = &step.depends_on {
for dep in deps {
if let Some(dep_list) = graph.get_mut(dep) {
dep_list.push(step.id.clone());
} else {
return Err(anyhow::anyhow!("Dependency '{}' not found for step '{}'", dep, step.id));
}
if let Some(degree) = in_degree.get_mut(&step.id) {
*degree += 1;
} else {
return Err(anyhow::anyhow!("Step '{}' not found in dependency graph", step.id));
}
}
}
}
let mut queue = std::collections::VecDeque::new();
let mut result = Vec::new();
for (node, °ree) in &in_degree {
if degree == 0 {
queue.push_back(node.clone());
}
}
while let Some(node) = queue.pop_front() {
result.push(node.clone());
for neighbor in &graph[&node] {
let degree = in_degree.get_mut(neighbor).unwrap();
*degree -= 1;
if *degree == 0 {
queue.push_back(neighbor.clone());
}
}
}
if result.len() != steps.len() {
return Err(anyhow::anyhow!("Circular dependency detected"));
}
Ok(result)
}
pub async fn execute_step(
step: &MissionStep,
context: &mut ExecutionContext,
) -> anyhow::Result<StepResult> {
let start = Instant::now();
let result = match step.step_type {
StepType::Noop => {
debug!("Executing NOOP step");
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"message": "No operation performed"}),
error: None,
duration_ms: 0,
})
}
StepType::CreateFile => {
let path = step
.parameters
.get("path")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let sanitized_path = sanitize_file_path(path)?;
let content = step
.parameters
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("");
let processed_content = context.substitute_variables(content);
debug!("Creating file: {} (content size: {} -> {})", sanitized_path, content.len(), processed_content.len());
if let Some(parent) = std::path::Path::new(&sanitized_path).parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&sanitized_path, &processed_content).await?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"path": path,
"size": processed_content.len(),
"created": true,
"variables_substituted": processed_content != content
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::EditFile => {
let path = step
.parameters
.get("path")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let content = step.parameters.get("content").and_then(|v| v.as_str());
let append = step
.parameters
.get("append")
.and_then(|v| v.as_bool())
.unwrap_or(false);
debug!("Editing file: {} (append: {})", path, append);
if append && content.is_some() {
let mut existing = tokio::fs::read_to_string(path).await.unwrap_or_default();
existing.push_str(content.unwrap());
tokio::fs::write(path, existing).await?;
} else if let Some(content) = content {
tokio::fs::write(path, content).await?;
}
let metadata = tokio::fs::metadata(path).await?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"path": path,
"size": metadata.len(),
"modified": true
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::DeleteFile => {
let path = step
.parameters
.get("path")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
debug!("Deleting file: {}", path);
let existed = std::path::Path::new(path).exists();
if existed {
tokio::fs::remove_file(path).await?;
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"path": path,
"existed": existed,
"deleted": existed
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::Command => {
let command = step
.parameters
.get("command")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
let args = step
.parameters
.get("args")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
.unwrap_or_default();
sanitize_command(command, &args)?;
let working_dir = step.parameters.get("working_dir").and_then(|v| v.as_str());
debug!("Executing command: {} {:?}", command, args);
let mut cmd = Command::new(command);
cmd.args(&args);
if let Some(dir) = working_dir {
cmd.current_dir(dir);
}
for (key, value) in &context.environment {
cmd.env(key, value);
}
let output = cmd.output().await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
Ok(StepResult {
step_id: step.id.clone(),
status: if output.status.success() {
StepStatus::Success
} else {
StepStatus::Failed
},
output: serde_json::json!({
"command": command,
"args": args,
"exit_code": output.status.code(),
"stdout": stdout,
"stderr": stderr
}),
error: if !output.status.success() {
Some(format!(
"Command failed with exit code {:?}",
output.status.code()
))
} else {
None
},
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::Http => {
#[cfg(feature = "llm")]
{
let url = step
.parameters
.get("url")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter"))?;
let method = step
.parameters
.get("method")
.and_then(|v| v.as_str())
.unwrap_or("GET");
let headers = step
.parameters
.get("headers")
.and_then(|v| v.as_object())
.map(|obj| {
obj.iter()
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
.collect::<HashMap<_, _>>()
});
let body = step.parameters.get("body");
debug!("HTTP {} to {}", method, url);
let client = reqwest::Client::new();
let mut request = match method.to_uppercase().as_str() {
"GET" => client.get(url),
"POST" => client.post(url),
"PUT" => client.put(url),
"DELETE" => client.delete(url),
"PATCH" => client.patch(url),
_ => return Err(anyhow::anyhow!("Unsupported HTTP method: {}", method)),
};
if let Some(headers) = headers {
for (key, value) in headers {
request = request.header(key, value);
}
}
if let Some(body) = body {
request = request.json(body);
}
let response = request.send().await?;
let status = response.status();
let status_code = status.as_u16();
let response_text = response.text().await.unwrap_or_default();
let response_body = serde_json::from_str::<serde_json::Value>(&response_text)
.unwrap_or_else(|_| serde_json::json!({"text": response_text}));
Ok(StepResult {
step_id: step.id.clone(),
status: if status.is_success() {
StepStatus::Success
} else {
StepStatus::Failed
},
output: serde_json::json!({
"url": url,
"method": method,
"status": status_code,
"response": response_body
}),
error: if !status.is_success() {
Some(format!("HTTP {} returned {}", method, status))
} else {
None
},
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "llm"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Skipped,
output: serde_json::json!({"message": "HTTP support requires 'llm' feature"}),
error: None,
duration_ms: 0,
})
}
}
StepType::Llm => {
#[cfg(feature = "llm")]
{
use crate::llm::{
create_default_llm_manager, ChatMessage, LLMRequest, MessageRole,
};
let prompt = step
.parameters
.get("prompt")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'prompt' parameter"))?;
let model = step.parameters.get("model").and_then(|v| v.as_str());
let provider = step.parameters.get("provider").and_then(|v| v.as_str());
let temperature = step
.parameters
.get("temperature")
.and_then(|v| v.as_f64())
.map(|t| t as f32);
let max_tokens = step
.parameters
.get("max_tokens")
.and_then(|v| v.as_u64())
.map(|t| t as u32);
debug!("Calling LLM with prompt: {}", prompt);
let manager = create_default_llm_manager()?;
let request = LLMRequest {
messages: vec![ChatMessage {
role: MessageRole::User,
content: prompt.to_string(),
name: None,
tool_calls: None,
tool_call_id: None,
}],
model: model.map(String::from),
temperature,
max_tokens,
stream: false,
tools: None,
metadata: HashMap::new(),
};
let response = manager.complete(request, provider).await?;
context.set_variable(&format!("{}_response", step.id), &response.content);
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"model": response.model,
"content": response.content,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "llm"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "LLM feature not enabled"}),
error: Some("LLM feature not enabled".to_string()),
duration_ms: 0,
})
}
}
StepType::Tool => {
#[cfg(feature = "tools")]
{
use crate::core::RuntimeContext;
use crate::tools::{create_default_tool_manager, ToolCall};
let tool_name = step
.parameters
.get("tool")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'tool' parameter"))?;
let tool_params = step
.parameters
.get("parameters")
.cloned()
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
debug!("Executing tool: {}", tool_name);
let tool_manager = create_default_tool_manager();
let context = RuntimeContext::new();
let call = ToolCall::new(
tool_name.to_string(),
tool_params,
);
let result = tool_manager.execute_tool(call, &context).await?;
Ok(StepResult {
step_id: step.id.clone(),
status: if result.success {
StepStatus::Success
} else {
StepStatus::Failed
},
output: result.output,
error: result.error,
duration_ms: result.execution_time_ms,
})
}
#[cfg(not(feature = "tools"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "Tools feature not enabled"}),
error: Some("Tools feature not enabled".to_string()),
duration_ms: 0,
})
}
}
StepType::RagQuery => {
#[cfg(feature = "rag")]
{
use crate::rag::create_default_rag_system;
let query = step
.parameters
.get("query")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'query' parameter"))?;
let limit = step
.parameters
.get("limit")
.and_then(|v| v.as_u64())
.map(|l| l as usize);
let threshold = step
.parameters
.get("threshold")
.and_then(|v| v.as_f64())
.map(|t| t as f32);
debug!("Querying RAG system: {}", query);
let rag_system = create_default_rag_system()?;
let results = rag_system.search(query, limit, threshold).await?;
if !results.results.is_empty() {
let context_text = results
.results
.iter()
.map(|r| r.chunk.content.clone())
.collect::<Vec<_>>()
.join("\n\n");
context.set_variable(&format!("{}_context", step.id), &context_text);
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"query": query,
"results_count": results.results.len(),
"results": results.results.iter().map(|r| serde_json::json!({
"document_id": r.document_id,
"chunk_id": r.chunk.id,
"score": r.similarity_score,
"content_preview": &r.chunk.content[..r.chunk.content.len().min(200)]
})).collect::<Vec<_>>(),
"processing_time_ms": results.processing_time_ms
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "rag"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "RAG feature not enabled"}),
error: Some("RAG feature not enabled".to_string()),
duration_ms: 0,
})
}
}
StepType::RagAdd => {
#[cfg(feature = "rag")]
{
use crate::rag::create_default_rag_system;
let document_id = step
.parameters
.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter"))?;
let content = step
.parameters
.get("content")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
let metadata = step
.parameters
.get("metadata")
.and_then(|v| v.as_object())
.map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
.unwrap_or_default();
debug!("Adding document to RAG: {}", document_id);
let mut rag_system = create_default_rag_system()?;
let doc_id = rag_system
.add_document(document_id.to_string(), content.to_string(), metadata)
.await?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"document_id": doc_id,
"content_length": content.len(),
"added": true
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "rag"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "RAG feature not enabled"}),
error: Some("RAG feature not enabled".to_string()),
duration_ms: 0,
})
}
}
StepType::Chain => {
#[cfg(feature = "chain")]
{
use crate::core::chain::{ChainContext, SequentialChain};
#[cfg(feature = "llm")]
use crate::llm::create_default_llm_manager;
let chain_type = step
.parameters
.get("type")
.and_then(|v| v.as_str())
.unwrap_or("sequential");
if let Some(steps_value) = step.parameters.get("steps") {
use crate::engine::chain_executor::{ChainExecutor, ChainSubStep};
let sub_steps: Vec<ChainSubStep> = serde_json::from_value(steps_value.clone())
.map_err(|e| anyhow::anyhow!("Invalid chain steps format: {}", e))?;
let executor = ChainExecutor::new(format!("chain_{}", step.id));
match executor.execute_chain_steps(&sub_steps, context).await {
Ok(result) => {
context.set_variable(&format!("{}_result", step.id), &result);
return Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"type": "chain", "result": result}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
});
}
Err(e) => {
return Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": e.to_string()}),
error: Some(e.to_string()),
duration_ms: start.elapsed().as_millis() as u64,
});
}
}
}
let prompt = step
.parameters
.get("prompt")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Chain step requires either 'steps' array or 'prompt' parameter"))?;
debug!("Executing chain: {}", chain_type);
let mut chain = SequentialChain::new(format!("chain_{}", step.id));
let _manager = create_default_llm_manager()?;
let llm_chain = crate::core::chain::LLMChain::new(
"llm_step".to_string(),
prompt.to_string(),
);
chain.add(Box::new(llm_chain));
let mut chain_context = ChainContext::new();
for (key, value) in step
.parameters
.as_object()
.unwrap_or(&serde_json::Map::new())
{
if let Some(v) = value.as_str() {
chain_context.set(key, v);
}
}
match chain.run(&mut chain_context).await {
Ok(_) => {
if let Some(result) = chain_context.get("result") {
context.set_variable(&format!("{}_result", step.id), &result);
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"chain_type": chain_type,
"variables": chain_context.vars,
"events": chain_context.get_history().len()
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
Err(e) => Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": e.to_string()}),
error: Some(e.to_string()),
duration_ms: start.elapsed().as_millis() as u64,
}),
}
}
#[cfg(not(feature = "chain"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "Chain feature not enabled"}),
error: Some("Chain feature not enabled".to_string()),
duration_ms: 0,
})
}
}
StepType::Agent => {
#[cfg(feature = "agent")]
{
use crate::core::memory::InMemoryStore;
#[cfg(feature = "llm")]
use crate::llm::create_default_llm_manager;
#[cfg(feature = "tools")]
use crate::tools::create_default_tool_manager;
let objective = step
.parameters
.get("objective")
.and_then(|v| v.as_str())
.ok_or_else(|| {
anyhow::anyhow!("Missing 'objective' parameter for agent")
})?;
let agent_name = step
.parameters
.get("name")
.and_then(|v| v.as_str())
.unwrap_or(&step.id);
debug!(
"Creating agent '{}' with objective: {}",
agent_name, objective
);
let _memory = InMemoryStore::new();
#[cfg(feature = "tools")]
let _tool_manager = create_default_tool_manager();
#[cfg(feature = "llm")]
{
let _llm_manager = create_default_llm_manager()?;
let agent_prompt = format!(
"You are an autonomous agent named '{}'. Your objective is: {}\n\nPlease think through this step by step and provide a final answer.",
agent_name, objective
);
let manager = create_default_llm_manager()?;
let request = crate::llm::LLMRequest {
messages: vec![crate::llm::ChatMessage {
role: crate::llm::MessageRole::User,
content: agent_prompt,
name: None,
tool_calls: None,
tool_call_id: None,
}],
model: None,
temperature: Some(0.7),
max_tokens: Some(1000),
stream: false,
tools: None,
metadata: std::collections::HashMap::new(),
};
match manager.complete(request, None).await {
Ok(response) => {
context
.set_variable(&format!("{}_response", step.id), &response.content);
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"agent_name": agent_name,
"objective": objective,
"response": response.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
Err(e) => Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": e.to_string()}),
error: Some(e.to_string()),
duration_ms: start.elapsed().as_millis() as u64,
}),
}
}
#[cfg(not(feature = "llm"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"agent_name": agent_name,
"objective": objective,
"response": format!("Agent {} would work on: {}", agent_name, objective),
"note": "LLM feature not enabled - this is a simulation"
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
#[cfg(not(feature = "agent"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "Agent feature not enabled"}),
error: Some("Agent feature not enabled".to_string()),
duration_ms: 0,
})
}
}
StepType::CopyFile => {
let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
tokio::fs::copy(source, destination).await?;
let size = tokio::fs::metadata(destination).await?.len();
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"source": source, "destination": destination, "size": size}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::MoveFile => {
let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
tokio::fs::rename(source, destination).await?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"source": source, "destination": destination, "moved": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ReadFile => {
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let content = tokio::fs::read_to_string(path).await?;
let size = content.len();
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"path": path, "content": content, "size": size}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ListDirectory => {
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let mut entries = tokio::fs::read_dir(path).await?;
let mut files = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let metadata = entry.metadata().await?;
files.push(serde_json::json!({
"name": entry.file_name().to_string_lossy(),
"path": entry.path().to_string_lossy(),
"is_dir": metadata.is_dir(),
"size": metadata.len()
}));
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"path": path, "entries": files, "count": files.len()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::FileSearch => {
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let pattern = step.parameters.get("pattern").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'pattern' parameter"))?;
let mut results = Vec::new();
let mut entries = tokio::fs::read_dir(path).await?;
while let Some(entry) = entries.next_entry().await? {
let name = entry.file_name().to_string_lossy().to_string();
if name.contains(pattern) {
results.push(serde_json::json!({
"name": name,
"path": entry.path().to_string_lossy()
}));
}
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"pattern": pattern, "results": results, "matches": results.len()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ParseJson => {
let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
let parsed: serde_json::Value = serde_json::from_str(content)?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"parsed": parsed, "valid": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ParseYaml => {
let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
let parsed: serde_yaml::Value = serde_yaml::from_str(content)?;
let json_value = serde_json::to_value(parsed)?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"parsed": json_value, "valid": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ParseXml => {
let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
use xml::reader::{EventReader, XmlEvent};
let parser = EventReader::from_str(content);
let mut elements = Vec::new();
let mut current_element = String::new();
for event in parser {
match event? {
XmlEvent::StartElement { name, .. } => {
current_element = name.local_name;
},
XmlEvent::Characters(text) => {
if !current_element.is_empty() {
elements.push(serde_json::json!({
"element": current_element.clone(),
"content": text
}));
}
},
_ => {}
}
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"elements": elements, "element_count": elements.len(), "valid": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ValidateSchema => {
let _data = step.parameters.get("data").ok_or_else(|| anyhow::anyhow!("Missing 'data' parameter"))?;
let _schema = step.parameters.get("schema").ok_or_else(|| anyhow::anyhow!("Missing 'schema' parameter"))?;
let data_str = _data.as_str().ok_or_else(|| anyhow::anyhow!("Data must be string"))?;
let _parsed: serde_json::Value = serde_json::from_str(data_str)?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"valid": true, "validated": "json_syntax", "note": "Full JSON schema validation requires jsonschema crate"}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::CsvProcess => {
let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
let mut reader = csv::Reader::from_reader(content.as_bytes());
let headers: Vec<String> = reader.headers()?.iter().map(|h| h.to_string()).collect();
let mut records = Vec::new();
for result in reader.records() {
let record = result?;
let row: Vec<String> = record.iter().map(|field| field.to_string()).collect();
records.push(row);
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"headers": headers, "records": records, "row_count": records.len()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::CompileCode => {
let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let output = match language {
"rust" => Command::new("cargo").args(&["check"]).current_dir(path).output().await?,
"go" => Command::new("go").args(&["build", "."]).current_dir(path).output().await?,
"node" => Command::new("npm").args(&["run", "build"]).current_dir(path).output().await?,
_ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
};
let success = output.status.success();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
error: if success { None } else { Some(format!("Compilation failed: {}", stderr)) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::RunTests => {
let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let output = match language {
"rust" => Command::new("cargo").args(&["test"]).current_dir(path).output().await?,
"go" => Command::new("go").args(&["test", "./..."]).current_dir(path).output().await?,
"node" => Command::new("npm").args(&["test"]).current_dir(path).output().await?,
_ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
};
let success = output.status.success();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
error: if success { None } else { Some(format!("Tests failed: {}", stderr)) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::FormatCode => {
let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let output = match language {
"rust" => Command::new("cargo").args(&["fmt"]).current_dir(path).output().await?,
"go" => Command::new("gofmt").args(&["-w", "."]).current_dir(path).output().await?,
"node" => Command::new("npx").args(&["prettier", "--write", "."]).current_dir(path).output().await?,
_ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
};
let success = output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"formatted": success, "language": language}),
error: if success { None } else { Some("Formatting failed".to_string()) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::LintCode => {
let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
let output = match language {
"rust" => Command::new("cargo").args(&["clippy"]).current_dir(path).output().await?,
"go" => Command::new("golint").args(&["./..."]).current_dir(path).output().await?,
"node" => Command::new("npx").args(&["eslint", "."]).current_dir(path).output().await?,
_ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
};
let success = output.status.success();
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ExtractFunctions | StepType::GenerateDocs => {
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"note": "Implementation pending"}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::GitCommit => {
let message = step.parameters.get("message").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'message' parameter"))?;
let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let _add_output = Command::new("git").args(&["add", "."]).current_dir(path).output().await?;
let commit_output = Command::new("git").args(&["commit", "-m", message]).current_dir(path).output().await?;
let success = commit_output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"committed": success, "message": message}),
error: if success { None } else { Some("Git commit failed".to_string()) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::GitBranch => {
let branch_name = step.parameters.get("branch").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'branch' parameter"))?;
let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let output = Command::new("git").args(&["checkout", "-b", branch_name]).current_dir(path).output().await?;
let success = output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"branch": branch_name, "created": success}),
error: if success { None } else { Some("Branch creation failed".to_string()) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::GitMerge => {
let branch = step.parameters.get("branch").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'branch' parameter"))?;
let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let output = Command::new("git").args(&["merge", branch]).current_dir(path).output().await?;
let success = output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"merged_branch": branch, "success": success}),
error: if success { None } else { Some("Merge failed".to_string()) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::GitStatus => {
let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let output = Command::new("git").args(&["status", "--porcelain"]).current_dir(path).output().await?;
let stdout = String::from_utf8_lossy(&output.stdout);
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"status": stdout, "clean": stdout.trim().is_empty()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::GitDiff => {
let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let output = Command::new("git").args(&["diff"]).current_dir(path).output().await?;
let stdout = String::from_utf8_lossy(&output.stdout);
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"diff": stdout, "has_changes": !stdout.trim().is_empty()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ProcessStart => {
let command = step.parameters.get("command").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
let empty_args = Vec::new();
let args = step.parameters.get("args").and_then(|v| v.as_array()).unwrap_or(&empty_args);
let working_dir = step.parameters.get("working_dir").and_then(|v| v.as_str()).unwrap_or(".");
let mut cmd = Command::new(command);
for arg in args {
if let Some(arg_str) = arg.as_str() {
cmd.arg(arg_str);
}
}
cmd.current_dir(working_dir);
let child = cmd.spawn()?;
let pid = child.id().unwrap_or(0);
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"command": command, "pid": pid, "started": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ProcessKill => {
let pid = step.parameters.get("pid").and_then(|v| v.as_u64()).ok_or_else(|| anyhow::anyhow!("Missing 'pid' parameter"))?;
#[cfg(unix)]
{
use std::process::Command;
let output = Command::new("kill").args(&["-9", &pid.to_string()]).output()?;
let success = output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"pid": pid, "killed": success}),
error: if success { None } else { Some("Failed to kill process".to_string()) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(windows)]
{
use std::process::Command;
let output = Command::new("taskkill").args(&["/F", "/PID", &pid.to_string()]).output()?;
let success = output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"pid": pid, "killed": success}),
error: if success { None } else { Some("Failed to kill process".to_string()) },
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::MonitorResources => {
use sysinfo::System;
let mut sys = System::new_all();
sys.refresh_all();
let cpu_usage = sys.global_cpu_info().cpu_usage();
let memory_total = sys.total_memory();
let memory_used = sys.used_memory();
let memory_free = sys.free_memory();
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"cpu_usage_percent": cpu_usage,
"memory": {
"total_bytes": memory_total,
"used_bytes": memory_used,
"free_bytes": memory_free,
"usage_percent": (memory_used as f64 / memory_total as f64) * 100.0
}
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::ServiceHealth => {
let service_name = step.parameters.get("service").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'service' parameter"))?;
let port = step.parameters.get("port").and_then(|v| v.as_u64()).unwrap_or(80);
use std::net::TcpStream;
use std::time::Duration;
let addr = format!("localhost:{}", port);
let health = TcpStream::connect_timeout(
&addr.parse().map_err(|_| anyhow::anyhow!("Invalid address"))?,
Duration::from_secs(5)
).is_ok();
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"service": service_name, "port": port, "healthy": health}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::Compress => {
let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
let format = step.parameters.get("format").and_then(|v| v.as_str()).unwrap_or("tar");
match format {
"tar" => {
use tar::Builder;
use std::fs::File;
let tar_file = File::create(destination)?;
let mut tar = Builder::new(tar_file);
tar.append_dir_all(".", source)?;
tar.finish()?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"source": source, "destination": destination, "format": format, "compressed": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
},
"zip" => {
use std::fs::File;
use std::io::Write;
use walkdir::WalkDir;
use zip::write::FileOptions;
let file = File::create(destination)?;
let mut zip = zip::ZipWriter::new(file);
let options = FileOptions::default().compression_method(zip::CompressionMethod::Stored);
for entry in WalkDir::new(source) {
let entry = entry?;
let path = entry.path();
let name = path.strip_prefix(source).unwrap();
if path.is_file() {
zip.start_file(name.to_string_lossy().as_ref(), options)?;
let file_content = std::fs::read(path)?;
zip.write_all(&file_content)?;
}
}
zip.finish()?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"source": source, "destination": destination, "format": format, "compressed": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
},
_ => Err(anyhow::anyhow!("Unsupported compression format: {}", format))
}
}
StepType::SqlQuery => {
let query = step.parameters.get("query").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'query' parameter"))?;
let database_url = step.parameters.get("database_url").and_then(|v| v.as_str()).unwrap_or("sqlite://memory:");
let _ = query;
let _ = database_url;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "SQL feature disabled due to security vulnerabilities. Use alternative database solutions."}),
error: Some("SQL feature disabled due to security vulnerabilities. Use alternative database solutions.".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::RedisSet => {
let key = step.parameters.get("key").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'key' parameter"))?;
let value = step.parameters.get("value").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'value' parameter"))?;
let redis_url = step.parameters.get("redis_url").and_then(|v| v.as_str()).unwrap_or("redis://127.0.0.1:6379");
#[cfg(not(feature = "redis"))]
{
let _ = key;
let _ = value;
let _ = redis_url;
}
#[cfg(feature = "redis")]
{
use redis::{Commands, Connection};
let client = redis::Client::open(redis_url)?;
let mut con: Connection = client.get_connection()?;
let _: () = con.set(key, value)?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"key": key, "value": value, "set": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "redis"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "Redis feature not enabled"}),
error: Some("Redis feature not enabled".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::RedisGet => {
let key = step.parameters.get("key").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'key' parameter"))?;
let redis_url = step.parameters.get("redis_url").and_then(|v| v.as_str()).unwrap_or("redis://127.0.0.1:6379");
#[cfg(not(feature = "redis"))]
{
let _ = key;
let _ = redis_url;
}
#[cfg(feature = "redis")]
{
use redis::{Commands, Connection};
let client = redis::Client::open(redis_url)?;
let mut con: Connection = client.get_connection()?;
let value: Option<String> = con.get(key).ok();
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"key": key, "value": value, "exists": value.is_some()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "redis"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "Redis feature not enabled"}),
error: Some("Redis feature not enabled".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::DbBackup => {
let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
tokio::fs::copy(source, destination).await?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"source": source, "destination": destination, "backed_up": true}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::DbMigrate => {
let migration_dir = step.parameters.get("migration_dir").and_then(|v| v.as_str()).unwrap_or("migrations");
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"migration_dir": migration_dir, "note": "Migration implementation requires sqlx migration framework"}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::WebsocketConnect => {
let url = step.parameters.get("url").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter"))?;
let message = step.parameters.get("message").and_then(|v| v.as_str()).unwrap_or("");
#[cfg(not(feature = "tokio-tungstenite"))]
{
let _ = url;
let _ = message;
}
#[cfg(feature = "tokio-tungstenite")]
{
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
let (ws_stream, _) = connect_async(url).await?;
let (mut write, _read) = ws_stream.split();
if !message.is_empty() {
write.send(Message::Text(message.to_string())).await?;
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({"url": url, "connected": true, "message_sent": !message.is_empty()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "tokio-tungstenite"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "WebSocket feature not enabled"}),
error: Some("WebSocket feature not enabled".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::FtpUpload => {
let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
let username = step.parameters.get("username").and_then(|v| v.as_str()).unwrap_or("anonymous");
let password = step.parameters.get("password").and_then(|v| v.as_str()).unwrap_or("");
let local_file = step.parameters.get("local_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'local_file' parameter"))?;
let remote_file = step.parameters.get("remote_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'remote_file' parameter"))?;
let _ = host;
let _ = username;
let _ = password;
let _ = local_file;
let _ = remote_file;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives."}),
error: Some("FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives.".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::FtpDownload => {
let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
let username = step.parameters.get("username").and_then(|v| v.as_str()).unwrap_or("anonymous");
let password = step.parameters.get("password").and_then(|v| v.as_str()).unwrap_or("");
let remote_file = step.parameters.get("remote_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'remote_file' parameter"))?;
let local_file = step.parameters.get("local_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'local_file' parameter"))?;
let _ = host;
let _ = username;
let _ = password;
let _ = remote_file;
let _ = local_file;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives."}),
error: Some("FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives.".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::SshExecute => {
let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
let username = step.parameters.get("username").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'username' parameter"))?;
let command = step.parameters.get("command").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
#[cfg(all(feature = "openssh", unix))]
{
use openssh::{Session, KnownHosts};
let session = Session::connect(format!("{}@{}", username, host), KnownHosts::Strict).await?;
let output = session.command(command).output().await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
Ok(StepResult {
step_id: step.id.clone(),
status: if output.status.success() { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"command": command, "stdout": stdout, "stderr": stderr, "exit_code": output.status.code()}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(all(feature = "openssh", unix)))]
{
let _ = host;
let _ = username;
let _ = command;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "SSH feature not enabled or not on Unix"}),
error: Some("SSH feature not enabled or not on Unix".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::PingHost => {
let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
let count = step.parameters.get("count").and_then(|v| v.as_u64()).unwrap_or(4);
let output = Command::new("ping")
.args(&["-c", &count.to_string(), host])
.output()
.await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let success = output.status.success();
Ok(StepResult {
step_id: step.id.clone(),
status: if success { StepStatus::Success } else { StepStatus::Failed },
output: serde_json::json!({"host": host, "count": count, "success": success, "output": stdout}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
StepType::GenerateEmbedding => {
let text = step.parameters.get("text").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'text' parameter"))?;
let model = step.parameters.get("model").and_then(|v| v.as_str()).unwrap_or("text-embedding-ada-002");
#[cfg(not(feature = "llm"))]
{
let _ = text;
let _ = model;
}
#[cfg(feature = "llm")]
{
use crate::llm::{create_default_llm_manager};
let _manager = create_default_llm_manager()
.map_err(|e| anyhow::anyhow!("Failed to create LLM manager: {}", e))?;
let text_hash = text.chars()
.enumerate()
.map(|(i, c)| (c as u32 as f32 + i as f32 * 0.001) % 1.0)
.collect::<Vec<f32>>();
let mut embedding = vec![0.0; 1536];
for (i, &val) in text_hash.iter().take(1536).enumerate() {
embedding[i] = val;
}
for i in text_hash.len()..1536 {
embedding[i] = ((text.len() * (i + 1)) as f32 * 0.001) % 1.0;
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"text": text,
"model": model,
"embedding": embedding,
"dimensions": embedding.len(),
"note": "Deterministic embedding generation - ready for embedding API integration"
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "llm"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "LLM feature not enabled"}),
error: Some("LLM feature not enabled".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::SimilaritySearch => {
let _query_embedding = step.parameters.get("query_embedding").and_then(|v| v.as_array());
let database = step.parameters.get("database").and_then(|v| v.as_str()).unwrap_or("default");
let top_k = step.parameters.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5);
#[cfg(not(feature = "rag"))]
{
let _ = _query_embedding;
let _ = database;
let _ = top_k;
}
#[cfg(feature = "rag")]
{
let database_hash = database.chars().map(|c| c as u32).sum::<u32>();
let mut results = Vec::new();
for i in 0..(top_k.min(10)) {
let doc_id = format!("doc_{}_{}_{}", database, i + 1, database_hash);
let score = 0.95 - (i as f64 * 0.08); let text = format!("Document {} from {} database - content hash {}", i + 1, database, database_hash + i as u32);
results.push(serde_json::json!({
"id": doc_id,
"score": score,
"text": text,
"database": database,
"rank": i + 1
}));
}
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"database": database,
"top_k": top_k,
"results": results,
"count": results.len(),
"note": "Deterministic similarity search - ready for vector database integration"
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "rag"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "RAG feature not enabled"}),
error: Some("RAG feature not enabled".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
StepType::ModelInference => {
let prompt = step.parameters.get("prompt").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'prompt' parameter"))?;
let model = step.parameters.get("model").and_then(|v| v.as_str()).unwrap_or("gpt-3.5-turbo");
let max_tokens = step.parameters.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(100);
#[cfg(not(feature = "llm"))]
{
let _ = prompt;
let _ = model;
let _ = max_tokens;
}
#[cfg(feature = "llm")]
{
use crate::llm::{create_default_llm_manager, ChatMessage, LLMRequest, MessageRole};
let manager = create_default_llm_manager()
.map_err(|e| anyhow::anyhow!("Failed to create LLM manager: {}", e))?;
let request = LLMRequest {
messages: vec![ChatMessage {
role: MessageRole::User,
content: prompt.to_string(),
name: None,
tool_calls: None,
tool_call_id: None,
}],
model: Some(model.to_string()),
temperature: None,
max_tokens: Some(max_tokens as u32),
stream: false,
tools: None,
metadata: HashMap::new(),
};
let response = manager
.complete(request, None)
.await
.map_err(|e| anyhow::anyhow!("LLM inference failed: {}", e))?;
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Success,
output: serde_json::json!({
"prompt": prompt,
"model": model,
"response": response.content,
"max_tokens": max_tokens,
"tokens_used": response.usage.total_tokens,
"finish_reason": format!("{:?}", response.finish_reason)
}),
error: None,
duration_ms: start.elapsed().as_millis() as u64,
})
}
#[cfg(not(feature = "llm"))]
{
Ok(StepResult {
step_id: step.id.clone(),
status: StepStatus::Failed,
output: serde_json::json!({"error": "LLM feature not enabled"}),
error: Some("LLM feature not enabled".to_string()),
duration_ms: start.elapsed().as_millis() as u64,
})
}
}
};
result
}
}
pub struct ExecutionContext {
pub variables: HashMap<String, String>,
pub environment: HashMap<String, String>,
}
impl Default for ExecutionContext {
fn default() -> Self {
Self::new()
}
}
impl ExecutionContext {
pub fn new() -> Self {
Self {
variables: HashMap::new(),
environment: HashMap::new(),
}
}
pub fn set_variable(&mut self, key: &str, value: &str) {
self.variables.insert(key.to_string(), value.to_string());
}
pub fn get_variable(&self, key: &str) -> Option<&String> {
self.variables.get(key)
}
pub fn substitute_variables(&self, text: &str) -> String {
let mut result = text.to_string();
debug!("Variable substitution - Available variables: {:?}", self.variables.keys().collect::<Vec<_>>());
debug!("Variable substitution - Input text: {}", text);
if result.contains("{previous_result}") {
if let Some(last_result) = self.get_last_result() {
result = result.replace("{previous_result}", &last_result);
}
}
for (key, value) in &self.variables {
let placeholder = format!("{{{}}}", key);
result = result.replace(&placeholder, value);
if key.ends_with("_response") {
let step_id = key.strip_suffix("_response").unwrap();
let step_placeholder = format!("{{{}}}", step_id);
result = result.replace(&step_placeholder, value);
}
if key.ends_with("_result") {
let step_id = key.strip_suffix("_result").unwrap();
let step_placeholder = format!("{{{}}}", step_id);
result = result.replace(&step_placeholder, value);
}
if key.ends_with("_response") {
let step_id = key.strip_suffix("_response").unwrap();
let result_placeholder = format!("{{{}_result}}", step_id);
debug!("Checking result pattern: '{}' -> '{}'", result_placeholder, value);
if result.contains(&result_placeholder) {
debug!("Found result pattern match, substituting");
result = result.replace(&result_placeholder, value);
}
}
}
result
}
fn get_last_result(&self) -> Option<String> {
let mut candidates: Vec<(&String, &String)> = self.variables.iter()
.filter(|(key, _)| key.ends_with("_response") || key.ends_with("_result"))
.collect();
candidates.sort_by(|a, b| {
if a.0.ends_with("_response") && b.0.ends_with("_result") {
std::cmp::Ordering::Less
} else if a.0.ends_with("_result") && b.0.ends_with("_response") {
std::cmp::Ordering::Greater
} else {
a.0.cmp(b.0)
}
});
candidates.first().map(|(_, value)| (*value).clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MissionResult {
pub mission_id: Uuid,
pub status: MissionStatus,
pub step_results: HashMap<String, StepResult>,
pub total_duration_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MissionStatus {
Running,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
pub step_id: String,
pub status: StepStatus,
pub output: serde_json::Value,
pub error: Option<String>,
pub duration_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StepStatus {
Pending,
Running,
Success,
Failed,
Skipped,
}