rustchain/engine/
mod.rs

1// use crate::assert_invariant; // Future enhancement: assert_invariant macro implementation
2use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::time::Instant;
6use tokio::process::Command;
7use tracing::{debug, error, info, warn};
8use uuid::Uuid;
9
10// Feature-gated imports - sqlx temporarily disabled due to RSA vulnerability
11// #[cfg(feature = "sqlx")]
12// use sqlx::Column;
13#[cfg(feature = "tokio-tungstenite")]
14use futures::{SinkExt, StreamExt};
15
16pub mod chain_executor;
17
18/// Security function to sanitize file paths and prevent path traversal attacks
19fn sanitize_file_path(path: &str) -> anyhow::Result<String> {
20    use std::path::{Path, Component};
21    
22    // Reject paths with dangerous patterns
23    if path.contains("..") || path.contains("~") {
24        return Err(anyhow::anyhow!("Path traversal detected: {}", path));
25    }
26    
27    // Reject Windows reserved names
28    let windows_reserved = ["CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", 
29                           "COM5", "COM6", "COM7", "COM8", "COM9", "LPT1", "LPT2", 
30                           "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"];
31    
32    let file_name = Path::new(path).file_name()
33        .and_then(|n| n.to_str())
34        .unwrap_or("");
35    
36    if windows_reserved.iter().any(|&reserved| file_name.eq_ignore_ascii_case(reserved)) {
37        return Err(anyhow::anyhow!("Windows reserved filename: {}", path));
38    }
39    
40    // Normalize the path and ensure it doesn't escape
41    let normalized = Path::new(path).components()
42        .filter(|component| match component {
43            Component::Normal(_) => true,
44            Component::CurDir => false,
45            Component::ParentDir => false,
46            _ => true, // Keep RootDir, Prefix
47        })
48        .collect::<std::path::PathBuf>();
49    
50    // Convert to string and validate
51    let sanitized = normalized.to_string_lossy().to_string();
52    
53    // Additional validation - ensure path doesn't start with system paths
54    if cfg!(unix) && (sanitized.starts_with("/etc/") || sanitized.starts_with("/sys/") || 
55                      sanitized.starts_with("/proc/") || sanitized.starts_with("/dev/")) {
56        return Err(anyhow::anyhow!("Access to system directory denied: {}", sanitized));
57    }
58    
59    if cfg!(windows) && (sanitized.to_lowercase().starts_with("c:\\windows\\") || 
60                         sanitized.to_lowercase().starts_with("c:\\system32\\")) {
61        return Err(anyhow::anyhow!("Access to system directory denied: {}", sanitized));
62    }
63    
64    Ok(sanitized)
65}
66
67/// Security function to validate commands and prevent command injection
68fn sanitize_command(command: &str, args: &[&str]) -> anyhow::Result<()> {
69    // Reject commands with dangerous patterns
70    let dangerous_patterns = [
71        "&&", "||", ";", "|", "`", "$", "&", ">", "<", 
72        "$(", "${", "rm -rf", "del /f", "format", "shutdown", "reboot"
73    ];
74    
75    for pattern in dangerous_patterns {
76        if command.contains(pattern) {
77            return Err(anyhow::anyhow!("Dangerous command pattern detected: {}", pattern));
78        }
79        
80        for arg in args {
81            if arg.contains(pattern) {
82                return Err(anyhow::anyhow!("Dangerous argument pattern detected: {}", pattern));
83            }
84        }
85    }
86    
87    // Reject direct system access commands
88    let dangerous_commands = [
89        "rm", "rmdir", "del", "deltree", "format", "fdisk", "mkfs",
90        "sudo", "su", "passwd", "useradd", "userdel", "chmod", "chown",
91        "shutdown", "reboot", "halt", "init", "systemctl", "service",
92        "curl", "wget", "nc", "netcat", "telnet", "ssh", "ftp", "sftp"
93    ];
94    
95    for dangerous_cmd in dangerous_commands {
96        if command.eq_ignore_ascii_case(dangerous_cmd) {
97            return Err(anyhow::anyhow!("Dangerous command blocked: {}", dangerous_cmd));
98        }
99    }
100    
101    // Check for script interpreters with inline code (but allow Windows cmd /c for simple commands)
102    let script_interpreters = ["bash", "sh", "powershell", "python", "perl", "ruby"];
103    
104    if script_interpreters.iter().any(|&interp| command.eq_ignore_ascii_case(interp)) {
105        // Allow simple file execution but block inline code
106        for arg in args {
107            if arg.contains("-c") || arg.contains("-e") {
108                return Err(anyhow::anyhow!("Inline script execution blocked"));
109            }
110        }
111    }
112    
113    // Special handling for Windows cmd - allow /c with simple commands only
114    if command.eq_ignore_ascii_case("cmd") {
115        for (i, arg) in args.iter().enumerate() {
116            if *arg == "/c" && i + 1 < args.len() {
117                // Check if the command after /c is dangerous
118                let next_cmd = args[i + 1];
119                for dangerous_cmd in ["del", "rmdir", "format", "shutdown", "reboot"] {
120                    if next_cmd.eq_ignore_ascii_case(dangerous_cmd) {
121                        return Err(anyhow::anyhow!("Dangerous Windows command blocked: {}", next_cmd));
122                    }
123                }
124            }
125        }
126    }
127    
128    Ok(())
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
132pub struct Mission {
133    pub version: String,
134    pub name: String,
135    pub description: Option<String>,
136    pub steps: Vec<MissionStep>,
137    pub config: Option<MissionConfig>,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
141pub struct MissionStep {
142    pub id: String,
143    pub name: String,
144    pub step_type: StepType,
145    pub depends_on: Option<Vec<String>>,
146    pub timeout_seconds: Option<u64>,
147    pub continue_on_error: Option<bool>,
148    pub parameters: serde_json::Value,
149}
150
151#[derive(Debug, Clone, JsonSchema)]
152pub enum StepType {
153    // File Operations
154    CreateFile,
155    EditFile,
156    DeleteFile,
157    CopyFile,
158    MoveFile,
159    ReadFile,
160    ListDirectory,
161    FileSearch,
162    // Data Processing
163    ParseJson,
164    ParseYaml,
165    ParseXml,
166    ValidateSchema,
167    CsvProcess,
168    // Code Development
169    CompileCode,
170    RunTests,
171    FormatCode,
172    LintCode,
173    ExtractFunctions,
174    GenerateDocs,
175    // Git Operations
176    GitCommit,
177    GitBranch,
178    GitMerge,
179    GitStatus,
180    GitDiff,
181    // System Operations
182    ProcessStart,
183    ProcessKill,
184    MonitorResources,
185    ServiceHealth,
186    Compress,
187    // Database Operations
188    SqlQuery,
189    RedisSet,
190    RedisGet,
191    DbBackup,
192    DbMigrate,
193    // Network Operations
194    WebsocketConnect,
195    FtpUpload,
196    FtpDownload,
197    SshExecute,
198    PingHost,
199    // AI/ML Operations
200    GenerateEmbedding,
201    SimilaritySearch,
202    ModelInference,
203    // Existing
204    Command,
205    Http,
206    Noop,
207    Llm,
208    Tool,
209    RagQuery,
210    RagAdd,
211    Chain,
212    Agent,
213}
214
215// Custom serialization to support both simple strings and future extensibility
216impl Serialize for StepType {
217    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
218    where
219        S: serde::Serializer,
220    {
221        let s = match self {
222            // File Operations
223            StepType::CreateFile => "create_file",
224            StepType::EditFile => "edit_file",
225            StepType::DeleteFile => "delete_file",
226            StepType::CopyFile => "copy_file",
227            StepType::MoveFile => "move_file",
228            StepType::ReadFile => "read_file",
229            StepType::ListDirectory => "list_directory",
230            StepType::FileSearch => "file_search",
231            // Data Processing
232            StepType::ParseJson => "parse_json",
233            StepType::ParseYaml => "parse_yaml",
234            StepType::ParseXml => "parse_xml",
235            StepType::ValidateSchema => "validate_schema",
236            StepType::CsvProcess => "csv_process",
237            // Code Development
238            StepType::CompileCode => "compile_code",
239            StepType::RunTests => "run_tests",
240            StepType::FormatCode => "format_code",
241            StepType::LintCode => "lint_code",
242            StepType::ExtractFunctions => "extract_functions",
243            StepType::GenerateDocs => "generate_docs",
244            // Git Operations
245            StepType::GitCommit => "git_commit",
246            StepType::GitBranch => "git_branch",
247            StepType::GitMerge => "git_merge",
248            StepType::GitStatus => "git_status",
249            StepType::GitDiff => "git_diff",
250            // System Operations
251            StepType::ProcessStart => "process_start",
252            StepType::ProcessKill => "process_kill",
253            StepType::MonitorResources => "monitor_resources",
254            StepType::ServiceHealth => "service_health",
255            StepType::Compress => "compress",
256            // Database Operations
257            StepType::SqlQuery => "sql_query",
258            StepType::RedisSet => "redis_set",
259            StepType::RedisGet => "redis_get",
260            StepType::DbBackup => "db_backup",
261            StepType::DbMigrate => "db_migrate",
262            // Network Operations
263            StepType::WebsocketConnect => "websocket_connect",
264            StepType::FtpUpload => "ftp_upload",
265            StepType::FtpDownload => "ftp_download",
266            StepType::SshExecute => "ssh_execute",
267            StepType::PingHost => "ping_host",
268            // AI/ML Operations
269            StepType::GenerateEmbedding => "generate_embedding",
270            StepType::SimilaritySearch => "similarity_search",
271            StepType::ModelInference => "model_inference",
272            // Existing
273            StepType::Command => "command",
274            StepType::Http => "http",
275            StepType::Noop => "noop",
276            StepType::Llm => "llm",
277            StepType::Tool => "tool",
278            StepType::RagQuery => "rag_query",
279            StepType::RagAdd => "rag_add",
280            StepType::Chain => "chain",
281            StepType::Agent => "agent",
282        };
283        serializer.serialize_str(s)
284    }
285}
286
287impl<'de> Deserialize<'de> for StepType {
288    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
289    where
290        D: serde::Deserializer<'de>,
291    {
292        use serde::de::{self, Visitor};
293        use std::fmt;
294
295        struct StepTypeVisitor;
296
297        impl<'de> Visitor<'de> for StepTypeVisitor {
298            type Value = StepType;
299
300            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
301                formatter.write_str("a string or object representing a step type")
302            }
303
304            // Support simple string format (current)
305            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
306            where
307                E: de::Error,
308            {
309                match value {
310                    // File Operations
311                    "create_file" => Ok(StepType::CreateFile),
312                    "edit_file" => Ok(StepType::EditFile),
313                    "delete_file" => Ok(StepType::DeleteFile),
314                    "copy_file" => Ok(StepType::CopyFile),
315                    "move_file" => Ok(StepType::MoveFile),
316                    "read_file" => Ok(StepType::ReadFile),
317                    "list_directory" => Ok(StepType::ListDirectory),
318                    "file_search" => Ok(StepType::FileSearch),
319                    // Data Processing
320                    "parse_json" => Ok(StepType::ParseJson),
321                    "parse_yaml" => Ok(StepType::ParseYaml),
322                    "parse_xml" => Ok(StepType::ParseXml),
323                    "validate_schema" => Ok(StepType::ValidateSchema),
324                    "csv_process" => Ok(StepType::CsvProcess),
325                    // Code Development
326                    "compile_code" => Ok(StepType::CompileCode),
327                    "run_tests" => Ok(StepType::RunTests),
328                    "format_code" => Ok(StepType::FormatCode),
329                    "lint_code" => Ok(StepType::LintCode),
330                    "extract_functions" => Ok(StepType::ExtractFunctions),
331                    "generate_docs" => Ok(StepType::GenerateDocs),
332                    // Git Operations
333                    "git_commit" => Ok(StepType::GitCommit),
334                    "git_branch" => Ok(StepType::GitBranch),
335                    "git_merge" => Ok(StepType::GitMerge),
336                    "git_status" => Ok(StepType::GitStatus),
337                    "git_diff" => Ok(StepType::GitDiff),
338                    // System Operations
339                    "process_start" => Ok(StepType::ProcessStart),
340                    "process_kill" => Ok(StepType::ProcessKill),
341                    "monitor_resources" => Ok(StepType::MonitorResources),
342                    "service_health" => Ok(StepType::ServiceHealth),
343                    "compress" => Ok(StepType::Compress),
344                    // Database Operations
345                    "sql_query" => Ok(StepType::SqlQuery),
346                    "redis_set" => Ok(StepType::RedisSet),
347                    "redis_get" => Ok(StepType::RedisGet),
348                    "db_backup" => Ok(StepType::DbBackup),
349                    "db_migrate" => Ok(StepType::DbMigrate),
350                    // Network Operations
351                    "websocket_connect" => Ok(StepType::WebsocketConnect),
352                    "ftp_upload" => Ok(StepType::FtpUpload),
353                    "ftp_download" => Ok(StepType::FtpDownload),
354                    "ssh_execute" => Ok(StepType::SshExecute),
355                    "ping_host" => Ok(StepType::PingHost),
356                    // AI/ML Operations
357                    "generate_embedding" => Ok(StepType::GenerateEmbedding),
358                    "similarity_search" => Ok(StepType::SimilaritySearch),
359                    "model_inference" => Ok(StepType::ModelInference),
360                    // Existing
361                    "command" => Ok(StepType::Command),
362                    "http" => Ok(StepType::Http),
363                    "noop" => Ok(StepType::Noop),
364                    "llm" => Ok(StepType::Llm),
365                    "tool" => Ok(StepType::Tool),
366                    "rag_query" => Ok(StepType::RagQuery),
367                    "rag_add" => Ok(StepType::RagAdd),
368                    "chain" => Ok(StepType::Chain),
369                    "agent" => Ok(StepType::Agent),
370                    // Support legacy JSON format
371                    "Tool" => Ok(StepType::Tool),
372                    other => Err(E::unknown_variant(
373                        other,
374                        &[
375                            "create_file",
376                            "edit_file",
377                            "delete_file",
378                            "command",
379                            "http",
380                            "noop",
381                            "llm",
382                            "tool",
383                            "rag_query",
384                            "rag_add",
385                            "chain",
386                            "agent",
387                        ],
388                    )),
389                }
390            }
391
392            // Support future rich object format
393            fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
394            where
395                V: de::MapAccess<'de>,
396            {
397                let mut step_type: Option<String> = None;
398
399                while let Some(key) = map.next_key::<String>()? {
400                    match key.as_str() {
401                        "type" => {
402                            if step_type.is_some() {
403                                return Err(de::Error::duplicate_field("type"));
404                            }
405                            step_type = Some(map.next_value()?);
406                        }
407                        // Skip other fields for now (future extensibility)
408                        _ => {
409                            let _: serde_json::Value = map.next_value()?;
410                        }
411                    }
412                }
413
414                let step_type = step_type.ok_or_else(|| de::Error::missing_field("type"))?;
415                self.visit_str(&step_type)
416            }
417        }
418
419        deserializer.deserialize_any(StepTypeVisitor)
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use super::*;
426    use tempfile::TempDir;
427    use tokio_test;
428    use std::io::Write;
429
430    // ========== STEPTYPE TESTS ==========
431    mod steptype_tests {
432        use super::*;
433
434        #[test]
435        fn test_steptype_simple_string_serialization() {
436            let step_type = StepType::CreateFile;
437            let serialized = serde_json::to_string(&step_type).unwrap();
438            assert_eq!(serialized, "\"create_file\"");
439
440            let deserialized: StepType = serde_json::from_str(&serialized).unwrap();
441            assert!(matches!(deserialized, StepType::CreateFile));
442        }
443
444        #[test]
445        fn test_steptype_legacy_format_support() {
446            let json = "\"Tool\"";
447            let deserialized: StepType = serde_json::from_str(json).unwrap();
448            assert!(matches!(deserialized, StepType::Tool));
449        }
450
451        #[test]
452        fn test_steptype_future_object_format() {
453            // Test future extensible object format
454            let json = r#"{"type": "create_file", "metadata": {"version": "1.2"}}"#;
455            let deserialized: StepType = serde_json::from_str(json).unwrap();
456            assert!(matches!(deserialized, StepType::CreateFile));
457        }
458
459        #[test]
460        fn test_steptype_all_variants() {
461            let variants = vec![
462                (StepType::CreateFile, "create_file"),
463                (StepType::EditFile, "edit_file"),
464                (StepType::DeleteFile, "delete_file"),
465                (StepType::Command, "command"),
466                (StepType::Http, "http"),
467                (StepType::Noop, "noop"),
468                (StepType::Llm, "llm"),
469                (StepType::Tool, "tool"),
470                (StepType::RagQuery, "rag_query"),
471                (StepType::RagAdd, "rag_add"),
472                (StepType::Chain, "chain"),
473                (StepType::Agent, "agent"),
474            ];
475
476            for (step_type, expected_str) in variants {
477                // Test serialization
478                let serialized = serde_json::to_string(&step_type).unwrap();
479                assert_eq!(serialized, format!("\"{}\"", expected_str));
480
481                // Test deserialization
482                let deserialized: StepType = serde_json::from_str(&serialized).unwrap();
483                assert!(std::mem::discriminant(&step_type) == std::mem::discriminant(&deserialized));
484            }
485        }
486
487        #[test]
488        fn test_steptype_invalid_string() {
489            let json = "\"invalid_step\"";
490            let result: Result<StepType, _> = serde_json::from_str(json);
491            assert!(result.is_err());
492        }
493
494        #[test]
495        fn test_steptype_object_missing_type() {
496            let json = r#"{"metadata": {"version": "1.2"}}"#;
497            let result: Result<StepType, _> = serde_json::from_str(json);
498            assert!(result.is_err());
499        }
500
501        #[test]
502        fn test_steptype_object_duplicate_type() {
503            let json = r#"{"type": "create_file", "type": "edit_file"}"#;
504            let result: Result<StepType, _> = serde_json::from_str(json);
505            assert!(result.is_err());
506        }
507    }
508
509    // ========== MISSION AND MISSIONSTEP TESTS ==========
510    mod mission_tests {
511        use super::*;
512
513        #[test]
514        fn test_mission_creation() {
515            let mission = Mission {
516                version: "1.0".to_string(),
517                name: "Test Mission".to_string(),
518                description: Some("A test mission".to_string()),
519                steps: vec![
520                    MissionStep {
521                        id: "step1".to_string(),
522                        name: "First Step".to_string(),
523                        step_type: StepType::Noop,
524                        depends_on: None,
525                        timeout_seconds: Some(60),
526                        continue_on_error: None,
527                parameters: serde_json::json!({"key": "value"}),
528                    }
529                ],
530                config: Some(MissionConfig {
531                    max_parallel_steps: Some(2),
532                    timeout_seconds: Some(300),
533                    fail_fast: Some(true),
534                }),
535            };
536
537            assert_eq!(mission.name, "Test Mission");
538            assert_eq!(mission.steps.len(), 1);
539            assert!(mission.config.is_some());
540        }
541
542        #[test]
543        fn test_mission_step_serialization() {
544            let step = MissionStep {
545                id: "test_step".to_string(),
546                name: "Test Step".to_string(),
547                step_type: StepType::CreateFile,
548                depends_on: Some(vec!["dep1".to_string(), "dep2".to_string()]),
549                timeout_seconds: Some(120),
550                continue_on_error: None,
551                parameters: serde_json::json!({
552                    "path": "/tmp/test.txt",
553                    "content": "Hello, World!"
554                }),
555            };
556
557            let serialized = serde_json::to_string(&step).unwrap();
558            let deserialized: MissionStep = serde_json::from_str(&serialized).unwrap();
559            
560            assert_eq!(deserialized.id, "test_step");
561            assert_eq!(deserialized.name, "Test Step");
562            assert!(matches!(deserialized.step_type, StepType::CreateFile));
563            assert_eq!(deserialized.depends_on.unwrap().len(), 2);
564            assert_eq!(deserialized.timeout_seconds.unwrap(), 120);
565        }
566
567        #[test]
568        fn test_mission_config_defaults() {
569            let config = MissionConfig {
570                max_parallel_steps: None,
571                timeout_seconds: None,
572                fail_fast: None,
573            };
574
575            let serialized = serde_json::to_string(&config).unwrap();
576            let deserialized: MissionConfig = serde_json::from_str(&serialized).unwrap();
577            
578            assert!(deserialized.max_parallel_steps.is_none());
579            assert!(deserialized.timeout_seconds.is_none());
580            assert!(deserialized.fail_fast.is_none());
581        }
582    }
583
584    // ========== MISSION LOADER TESTS ==========
585    mod mission_loader_tests {
586        use super::*;
587        use std::fs;
588
589        #[test]
590        fn test_load_mission_yaml() {
591            let temp_dir = TempDir::new().unwrap();
592            let mission_path = temp_dir.path().join("test_mission.yaml");
593            
594            let mission_yaml = r#"
595version: "1.0"
596name: "YAML Test Mission"
597description: "Testing YAML loading"
598steps:
599  - id: "step1"
600    name: "First Step"
601    step_type: "noop"
602    parameters: {}
603config:
604  max_parallel_steps: 3
605  timeout_seconds: 600
606  fail_fast: true
607"#;
608            fs::write(&mission_path, mission_yaml).unwrap();
609
610            let mission = MissionLoader::load_from_file(mission_path.to_str().unwrap()).unwrap();
611            assert_eq!(mission.name, "YAML Test Mission");
612            assert_eq!(mission.steps.len(), 1);
613            assert!(mission.config.is_some());
614            assert_eq!(mission.config.unwrap().max_parallel_steps.unwrap(), 3);
615        }
616
617        #[test]
618        fn test_load_mission_json() {
619            let temp_dir = TempDir::new().unwrap();
620            let mission_path = temp_dir.path().join("test_mission.json");
621            
622            let mission_json = r#"{
623                "version": "1.0",
624                "name": "JSON Test Mission",
625                "description": "Testing JSON loading",
626                "steps": [
627                    {
628                        "id": "step1",
629                        "name": "First Step",
630                        "step_type": "noop",
631                        "parameters": {}
632                    }
633                ],
634                "config": {
635                    "max_parallel_steps": 5,
636                    "timeout_seconds": 900,
637                    "fail_fast": false
638                }
639            }"#;
640            fs::write(&mission_path, mission_json).unwrap();
641
642            let mission = MissionLoader::load_from_file(mission_path.to_str().unwrap()).unwrap();
643            assert_eq!(mission.name, "JSON Test Mission");
644            assert_eq!(mission.steps.len(), 1);
645            assert!(mission.config.is_some());
646            assert_eq!(mission.config.unwrap().max_parallel_steps.unwrap(), 5);
647        }
648
649        #[test]
650        fn test_load_mission_empty_path() {
651            let result = MissionLoader::load_from_file("");
652            assert!(result.is_err());
653            assert!(result.unwrap_err().to_string().contains("Mission path must not be empty"));
654        }
655
656        #[test]
657        fn test_load_mission_nonexistent_file() {
658            let result = MissionLoader::load_from_file("/nonexistent/path/mission.yaml");
659            assert!(result.is_err());
660        }
661
662        #[test]
663        fn test_validate_mission_success() {
664            let mission = Mission {
665                version: "1.0".to_string(),
666                name: "Valid Mission".to_string(),
667                description: None,
668                steps: vec![
669                    MissionStep {
670                        id: "step1".to_string(),
671                        name: "Step 1".to_string(),
672                        step_type: StepType::Noop,
673                        depends_on: None,
674                        timeout_seconds: None,
675                        continue_on_error: None,
676                parameters: serde_json::json!({}),
677                    },
678                    MissionStep {
679                        id: "step2".to_string(),
680                        name: "Step 2".to_string(),
681                        step_type: StepType::Noop,
682                        depends_on: Some(vec!["step1".to_string()]),
683                        timeout_seconds: None,
684                        continue_on_error: None,
685                parameters: serde_json::json!({}),
686                    },
687                ],
688                config: None,
689            };
690
691            let result = MissionLoader::validate_mission(&mission);
692            assert!(result.is_ok());
693        }
694
695        #[test]
696        fn test_validate_mission_empty_steps() {
697            let mission = Mission {
698                version: "1.0".to_string(),
699                name: "Empty Mission".to_string(),
700                description: None,
701                steps: vec![],
702                config: None,
703            };
704
705            let result = MissionLoader::validate_mission(&mission);
706            assert!(result.is_err());
707            assert!(result.unwrap_err().to_string().contains("Mission must have at least one step"));
708        }
709
710        #[test]
711        fn test_validate_mission_duplicate_ids() {
712            let mission = Mission {
713                version: "1.0".to_string(),
714                name: "Duplicate ID Mission".to_string(),
715                description: None,
716                steps: vec![
717                    MissionStep {
718                        id: "step1".to_string(),
719                        name: "First Step".to_string(),
720                        step_type: StepType::Noop,
721                        depends_on: None,
722                        timeout_seconds: None,
723                        continue_on_error: None,
724                parameters: serde_json::json!({}),
725                    },
726                    MissionStep {
727                        id: "step1".to_string(), // Duplicate ID
728                        name: "Second Step".to_string(),
729                        step_type: StepType::Noop,
730                        depends_on: None,
731                        timeout_seconds: None,
732                        continue_on_error: None,
733                parameters: serde_json::json!({}),
734                    },
735                ],
736                config: None,
737            };
738
739            let result = MissionLoader::validate_mission(&mission);
740            assert!(result.is_err());
741            assert!(result.unwrap_err().to_string().contains("Duplicate step ID"));
742        }
743
744        #[test]
745        fn test_validate_mission_missing_dependency() {
746            let mission = Mission {
747                version: "1.0".to_string(),
748                name: "Missing Dependency Mission".to_string(),
749                description: None,
750                steps: vec![
751                    MissionStep {
752                        id: "step1".to_string(),
753                        name: "Step 1".to_string(),
754                        step_type: StepType::Noop,
755                        depends_on: Some(vec!["nonexistent".to_string()]),
756                        timeout_seconds: None,
757                        continue_on_error: None,
758                parameters: serde_json::json!({}),
759                    },
760                ],
761                config: None,
762            };
763
764            let result = MissionLoader::validate_mission(&mission);
765            assert!(result.is_err());
766            assert!(result.unwrap_err().to_string().contains("depends on non-existent step"));
767        }
768    }
769
770    // ========== DAG EXECUTOR TESTS ==========
771    mod dag_executor_tests {
772        use super::*;
773
774        #[tokio::test]
775        async fn test_topological_sort_simple() {
776            /// Test basic topological sorting with linear dependencies
777            let steps = vec![
778                MissionStep {
779                    id: "step1".to_string(),
780                    name: "First".to_string(),
781                    step_type: StepType::Noop,
782                    depends_on: None,
783                    timeout_seconds: None,
784                    continue_on_error: None,
785                parameters: serde_json::json!({}),
786                },
787                MissionStep {
788                    id: "step2".to_string(),
789                    name: "Second".to_string(),
790                    step_type: StepType::Noop,
791                    depends_on: Some(vec!["step1".to_string()]),
792                    timeout_seconds: None,
793                    continue_on_error: None,
794                parameters: serde_json::json!({}),
795                },
796                MissionStep {
797                    id: "step3".to_string(),
798                    name: "Third".to_string(),
799                    step_type: StepType::Noop,
800                    depends_on: Some(vec!["step2".to_string()]),
801                    timeout_seconds: None,
802                    continue_on_error: None,
803                parameters: serde_json::json!({}),
804                },
805            ];
806
807            let order = DagExecutor::topological_sort(&steps).unwrap();
808            assert_eq!(order, vec!["step1", "step2", "step3"]);
809        }
810
811        #[tokio::test]
812        async fn test_topological_sort_circular_dependency() {
813            /// Test circular dependency detection
814            let steps = vec![
815                MissionStep {
816                    id: "step1".to_string(),
817                    name: "First".to_string(),
818                    step_type: StepType::Noop,
819                    depends_on: Some(vec!["step2".to_string()]),
820                    timeout_seconds: None,
821                    continue_on_error: None,
822                parameters: serde_json::json!({}),
823                },
824                MissionStep {
825                    id: "step2".to_string(),
826                    name: "Second".to_string(),
827                    step_type: StepType::Noop,
828                    depends_on: Some(vec!["step1".to_string()]),
829                    timeout_seconds: None,
830                    continue_on_error: None,
831                parameters: serde_json::json!({}),
832                },
833            ];
834
835            let result = DagExecutor::topological_sort(&steps);
836            assert!(result.is_err());
837            assert!(result.unwrap_err().to_string().contains("Circular dependency"));
838        }
839
840        #[tokio::test]
841        async fn test_execute_mission_simple_success() {
842            /// Test successful execution of a simple mission
843            let mission = Mission {
844                version: "1.0".to_string(),
845                name: "Simple Success Mission".to_string(),
846                description: Some("A simple successful mission".to_string()),
847                steps: vec![
848                    MissionStep {
849                        id: "noop1".to_string(),
850                        name: "First Noop".to_string(),
851                        step_type: StepType::Noop,
852                        depends_on: None,
853                        timeout_seconds: Some(10),
854                        continue_on_error: None,
855                parameters: serde_json::json!({}),
856                    },
857                    MissionStep {
858                        id: "noop2".to_string(),
859                        name: "Second Noop".to_string(),
860                        step_type: StepType::Noop,
861                        depends_on: Some(vec!["noop1".to_string()]),
862                        timeout_seconds: Some(10),
863                        continue_on_error: None,
864                parameters: serde_json::json!({}),
865                    },
866                ],
867                config: Some(MissionConfig {
868                    max_parallel_steps: Some(1),
869                    timeout_seconds: Some(60),
870                    fail_fast: Some(true),
871                }),
872            };
873
874            let result = DagExecutor::execute_mission(mission).await.unwrap();
875            assert!(matches!(result.status, MissionStatus::Completed));
876            assert_eq!(result.step_results.len(), 2);
877            assert!(result.step_results.contains_key("noop1"));
878            assert!(result.step_results.contains_key("noop2"));
879            // Duration should be tracked (may be 0 for very fast execution)
880            assert!(result.total_duration_ms >= 0);
881        }
882
883        #[tokio::test]
884        async fn test_execute_mission_empty() {
885            /// Test executing an empty mission fails
886            let mission = Mission {
887                version: "1.0".to_string(),
888                name: "Empty Mission".to_string(),
889                description: None,
890                steps: vec![],
891                config: None,
892            };
893
894            let result = DagExecutor::execute_mission(mission).await;
895            assert!(result.is_err());
896            assert!(result.unwrap_err().to_string().contains("Cannot execute empty mission"));
897        }
898
899        #[tokio::test]
900        async fn test_execute_mission_fail_fast() {
901            /// Test fail_fast behavior when a step fails
902            let mission = Mission {
903                version: "1.0".to_string(),
904                name: "Fail Fast Mission".to_string(),
905                description: None,
906                steps: vec![
907                    MissionStep {
908                        id: "failing_step".to_string(),
909                        name: "Failing Step".to_string(),
910                        step_type: StepType::Command,
911                        depends_on: None,
912                        timeout_seconds: Some(5),
913                        continue_on_error: None,
914                parameters: serde_json::json!({
915                            "command": "invalid_command_that_does_not_exist",
916                            "args": []
917                        }),
918                    },
919                    MissionStep {
920                        id: "should_not_run".to_string(),
921                        name: "Should Not Run".to_string(),
922                        step_type: StepType::Noop,
923                        depends_on: Some(vec!["failing_step".to_string()]),
924                        timeout_seconds: Some(5),
925                        continue_on_error: None,
926                parameters: serde_json::json!({}),
927                    },
928                ],
929                config: Some(MissionConfig {
930                    max_parallel_steps: Some(1),
931                    timeout_seconds: Some(30),
932                    fail_fast: Some(true),
933                }),
934            };
935
936            let result = DagExecutor::execute_mission(mission).await;
937            // Should fail fast and return error
938            assert!(result.is_err());
939        }
940    }
941
942    // ========== EXECUTION CONTEXT TESTS ==========
943    mod execution_context_tests {
944        use super::*;
945
946        #[test]
947        fn test_execution_context_creation() {
948            /// Test creating a new ExecutionContext
949            let context = ExecutionContext::new();
950            assert!(context.variables.is_empty());
951            assert!(context.environment.is_empty());
952        }
953
954        #[test]
955        fn test_execution_context_variables() {
956            /// Test setting and getting variables in ExecutionContext
957            let mut context = ExecutionContext::new();
958            
959            context.set_variable("key1", "value1");
960            context.set_variable("key2", "value2");
961            
962            assert_eq!(context.get_variable("key1"), Some(&"value1".to_string()));
963            assert_eq!(context.get_variable("key2"), Some(&"value2".to_string()));
964            assert_eq!(context.get_variable("nonexistent"), None);
965        }
966
967        #[test]
968        fn test_execution_context_variable_override() {
969            /// Test overriding existing variables
970            let mut context = ExecutionContext::new();
971            
972            context.set_variable("key", "original");
973            assert_eq!(context.get_variable("key"), Some(&"original".to_string()));
974            
975            context.set_variable("key", "updated");
976            assert_eq!(context.get_variable("key"), Some(&"updated".to_string()));
977        }
978    }
979
980    // ========== STEP EXECUTION TESTS ==========
981    mod step_execution_tests {
982        use super::*;
983        use tempfile::TempDir;
984
985        #[tokio::test]
986        async fn test_execute_noop_step() {
987            /// Test NOOP step execution
988            let step = MissionStep {
989                id: "noop_test".to_string(),
990                name: "Test Noop".to_string(),
991                step_type: StepType::Noop,
992                depends_on: None,
993                timeout_seconds: None,
994                continue_on_error: None,
995                parameters: serde_json::json!({}),
996            };
997            
998            let mut context = ExecutionContext::new();
999            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1000            
1001            assert_eq!(result.step_id, "noop_test");
1002            assert!(matches!(result.status, StepStatus::Success));
1003            assert!(result.output.get("message").is_some());
1004            assert!(result.error.is_none());
1005        }
1006
1007        #[tokio::test]
1008        async fn test_execute_create_file_step() {
1009            /// Test CreateFile step execution
1010            let temp_dir = TempDir::new().unwrap();
1011            let file_path = temp_dir.path().join("test_file.txt");
1012            
1013            let step = MissionStep {
1014                id: "create_file_test".to_string(),
1015                name: "Test Create File".to_string(),
1016                step_type: StepType::CreateFile,
1017                depends_on: None,
1018                timeout_seconds: None,
1019                continue_on_error: None,
1020                parameters: serde_json::json!({
1021                    "path": file_path.to_str().unwrap(),
1022                    "content": "Hello, World!\nThis is a test file."
1023                }),
1024            };
1025            
1026            let mut context = ExecutionContext::new();
1027            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1028            
1029            assert_eq!(result.step_id, "create_file_test");
1030            assert!(matches!(result.status, StepStatus::Success));
1031            assert!(result.error.is_none());
1032            
1033            // Verify file was created with correct content
1034            let content = std::fs::read_to_string(&file_path).unwrap();
1035            assert_eq!(content, "Hello, World!\nThis is a test file.");
1036        }
1037
1038        #[tokio::test]
1039        async fn test_execute_create_file_missing_path() {
1040            /// Test CreateFile step with missing path parameter
1041            let step = MissionStep {
1042                id: "invalid_create_file".to_string(),
1043                name: "Invalid Create File".to_string(),
1044                step_type: StepType::CreateFile,
1045                depends_on: None,
1046                timeout_seconds: None,
1047                continue_on_error: None,
1048                parameters: serde_json::json!({
1049                    "content": "Some content"
1050                    // Missing "path" parameter
1051                }),
1052            };
1053            
1054            let mut context = ExecutionContext::new();
1055            let result = DagExecutor::execute_step(&step, &mut context).await;
1056            
1057            assert!(result.is_err());
1058            assert!(result.unwrap_err().to_string().contains("Missing 'path' parameter"));
1059        }
1060
1061        #[tokio::test]
1062        async fn test_execute_command_step_success() {
1063            /// Test Command step execution with successful command
1064            let step = MissionStep {
1065                id: "command_test".to_string(),
1066                name: "Test Command".to_string(),
1067                step_type: StepType::Command,
1068                depends_on: None,
1069                timeout_seconds: None,
1070                continue_on_error: None,
1071                parameters: serde_json::json!({
1072                    "command": "echo",
1073                    "args": ["Hello", "World"]
1074                }),
1075            };
1076            
1077            let mut context = ExecutionContext::new();
1078            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1079            
1080            assert!(matches!(result.status, StepStatus::Success));
1081            assert!(result.output["stdout"].as_str().unwrap().contains("Hello World"));
1082            assert_eq!(result.output["exit_code"].as_i64().unwrap(), 0);
1083        }
1084
1085        #[tokio::test]
1086        async fn test_execute_command_step_failure() {
1087            /// Test Command step execution with failing command
1088            let step = MissionStep {
1089                id: "failing_command".to_string(),
1090                name: "Failing Command".to_string(),
1091                step_type: StepType::Command,
1092                depends_on: None,
1093                timeout_seconds: None,
1094                continue_on_error: None,
1095                parameters: serde_json::json!({
1096                    "command": "false", // Command that always fails
1097                    "args": []
1098                }),
1099            };
1100            
1101            let mut context = ExecutionContext::new();
1102            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1103            
1104            assert!(matches!(result.status, StepStatus::Failed));
1105            assert!(result.error.is_some());
1106        }
1107
1108        #[tokio::test]
1109        async fn test_execute_edit_file_step() {
1110            /// Test EditFile step execution
1111            let temp_dir = TempDir::new().unwrap();
1112            let file_path = temp_dir.path().join("edit_test.txt");
1113            
1114            // Create initial file
1115            std::fs::write(&file_path, "Initial content\n").unwrap();
1116            
1117            let step = MissionStep {
1118                id: "edit_file_test".to_string(),
1119                name: "Test Edit File".to_string(),
1120                step_type: StepType::EditFile,
1121                depends_on: None,
1122                timeout_seconds: None,
1123                continue_on_error: None,
1124                parameters: serde_json::json!({
1125                    "path": file_path.to_str().unwrap(),
1126                    "content": "New content",
1127                    "append": false
1128                }),
1129            };
1130            
1131            let mut context = ExecutionContext::new();
1132            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1133            
1134            assert!(matches!(result.status, StepStatus::Success));
1135            assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "New content");
1136        }
1137
1138        #[tokio::test]
1139        async fn test_execute_edit_file_append() {
1140            /// Test EditFile step with append mode
1141            let temp_dir = TempDir::new().unwrap();
1142            let file_path = temp_dir.path().join("append_test.txt");
1143            
1144            // Create initial file
1145            std::fs::write(&file_path, "Initial content\n").unwrap();
1146            
1147            let step = MissionStep {
1148                id: "append_file_test".to_string(),
1149                name: "Test Append File".to_string(),
1150                step_type: StepType::EditFile,
1151                depends_on: None,
1152                timeout_seconds: None,
1153                continue_on_error: None,
1154                parameters: serde_json::json!({
1155                    "path": file_path.to_str().unwrap(),
1156                    "content": "Appended content\n",
1157                    "append": true
1158                }),
1159            };
1160            
1161            let mut context = ExecutionContext::new();
1162            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1163            
1164            assert!(matches!(result.status, StepStatus::Success));
1165            let content = std::fs::read_to_string(&file_path).unwrap();
1166            assert!(content.contains("Initial content"));
1167            assert!(content.contains("Appended content"));
1168        }
1169
1170        #[tokio::test]
1171        async fn test_execute_delete_file_step() {
1172            /// Test DeleteFile step execution
1173            let temp_dir = TempDir::new().unwrap();
1174            let file_path = temp_dir.path().join("delete_test.txt");
1175            
1176            // Create file to delete
1177            std::fs::write(&file_path, "Content to delete").unwrap();
1178            assert!(file_path.exists());
1179            
1180            let step = MissionStep {
1181                id: "delete_file_test".to_string(),
1182                name: "Test Delete File".to_string(),
1183                step_type: StepType::DeleteFile,
1184                depends_on: None,
1185                timeout_seconds: None,
1186                continue_on_error: None,
1187                parameters: serde_json::json!({
1188                    "path": file_path.to_str().unwrap()
1189                }),
1190            };
1191            
1192            let mut context = ExecutionContext::new();
1193            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1194            
1195            assert!(matches!(result.status, StepStatus::Success));
1196            assert!(!file_path.exists());
1197            assert_eq!(result.output["existed"].as_bool().unwrap(), true);
1198            assert_eq!(result.output["deleted"].as_bool().unwrap(), true);
1199        }
1200
1201        #[tokio::test]
1202        async fn test_execute_delete_nonexistent_file() {
1203            /// Test DeleteFile step with nonexistent file
1204            let temp_dir = TempDir::new().unwrap();
1205            let file_path = temp_dir.path().join("nonexistent.txt");
1206            
1207            let step = MissionStep {
1208                id: "delete_nonexistent".to_string(),
1209                name: "Delete Nonexistent File".to_string(),
1210                step_type: StepType::DeleteFile,
1211                depends_on: None,
1212                timeout_seconds: None,
1213                continue_on_error: None,
1214                parameters: serde_json::json!({
1215                    "path": file_path.to_str().unwrap()
1216                }),
1217            };
1218            
1219            let mut context = ExecutionContext::new();
1220            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1221            
1222            assert!(matches!(result.status, StepStatus::Success));
1223            assert_eq!(result.output["existed"].as_bool().unwrap(), false);
1224            assert_eq!(result.output["deleted"].as_bool().unwrap(), false);
1225        }
1226
1227        #[tokio::test]
1228        async fn test_execute_command_with_working_dir() {
1229            /// Test Command step with working directory
1230            let temp_dir = TempDir::new().unwrap();
1231            
1232            // Use echo command which works on all platforms
1233            let step = MissionStep {
1234                id: "echo_command".to_string(),
1235                name: "Echo Command".to_string(),
1236                step_type: StepType::Command,
1237                depends_on: None,
1238                timeout_seconds: None,
1239                continue_on_error: None,
1240                parameters: serde_json::json!({
1241                    "command": "echo",
1242                    "args": ["working_directory_test"],
1243                    "working_dir": temp_dir.path().to_str().unwrap()
1244                }),
1245            };
1246            
1247            let mut context = ExecutionContext::new();
1248            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1249            
1250            assert!(matches!(result.status, StepStatus::Success));
1251            let stdout = result.output["stdout"].as_str().unwrap();
1252            assert!(stdout.contains("working_directory_test"));
1253        }
1254
1255        #[tokio::test]
1256        async fn test_step_result_duration_tracking() {
1257            /// Test that step execution time is properly tracked
1258            let step = MissionStep {
1259                id: "duration_test".to_string(),
1260                name: "Duration Test".to_string(),
1261                step_type: StepType::Noop,
1262                depends_on: None,
1263                timeout_seconds: None,
1264                continue_on_error: None,
1265                parameters: serde_json::json!({}),
1266            };
1267            
1268            let mut context = ExecutionContext::new();
1269            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1270            
1271            // Duration should be tracked (even for NOOP it should be >= 0)
1272            assert!(result.duration_ms >= 0);
1273        }
1274    }
1275
1276    // ========== FEATURE-GATED STEP TESTS ==========
1277    mod feature_gated_tests {
1278        use super::*;
1279
1280        #[cfg(not(feature = "llm"))]
1281        #[tokio::test]
1282        async fn test_http_step_without_llm_feature() {
1283            /// Test HTTP step behavior when llm feature is disabled
1284            let step = MissionStep {
1285                id: "http_disabled".to_string(),
1286                name: "HTTP Disabled Test".to_string(),
1287                step_type: StepType::Http,
1288                depends_on: None,
1289                timeout_seconds: None,
1290                continue_on_error: None,
1291                parameters: serde_json::json!({
1292                    "url": "https://httpbin.org/get",
1293                    "method": "GET"
1294                }),
1295            };
1296            
1297            let mut context = ExecutionContext::new();
1298            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1299            
1300            assert!(matches!(result.status, StepStatus::Skipped));
1301            assert!(result.output["message"].as_str().unwrap().contains("HTTP support requires 'llm' feature"));
1302        }
1303
1304        #[cfg(not(feature = "llm"))]
1305        #[tokio::test]
1306        async fn test_llm_step_without_llm_feature() {
1307            /// Test LLM step behavior when llm feature is disabled
1308            let step = MissionStep {
1309                id: "llm_disabled".to_string(),
1310                name: "LLM Disabled Test".to_string(),
1311                step_type: StepType::Llm,
1312                depends_on: None,
1313                timeout_seconds: None,
1314                continue_on_error: None,
1315                parameters: serde_json::json!({
1316                    "prompt": "Hello, world!"
1317                }),
1318            };
1319            
1320            let mut context = ExecutionContext::new();
1321            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1322            
1323            assert!(matches!(result.status, StepStatus::Failed));
1324            assert_eq!(result.error.as_ref().unwrap(), "LLM feature not enabled");
1325        }
1326
1327        #[cfg(not(feature = "tools"))]
1328        #[tokio::test]
1329        async fn test_tool_step_without_tools_feature() {
1330            /// Test Tool step behavior when tools feature is disabled
1331            let step = MissionStep {
1332                id: "tool_disabled".to_string(),
1333                name: "Tool Disabled Test".to_string(),
1334                step_type: StepType::Tool,
1335                depends_on: None,
1336                timeout_seconds: None,
1337                continue_on_error: None,
1338                parameters: serde_json::json!({
1339                    "tool": "test_tool",
1340                    "parameters": {}
1341                }),
1342            };
1343            
1344            let mut context = ExecutionContext::new();
1345            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1346            
1347            assert!(matches!(result.status, StepStatus::Failed));
1348            assert_eq!(result.error.as_ref().unwrap(), "Tools feature not enabled");
1349        }
1350
1351        #[cfg(not(feature = "rag"))]
1352        #[tokio::test]
1353        async fn test_rag_query_step_without_rag_feature() {
1354            /// Test RAG Query step behavior when rag feature is disabled
1355            let step = MissionStep {
1356                id: "rag_query_disabled".to_string(),
1357                name: "RAG Query Disabled Test".to_string(),
1358                step_type: StepType::RagQuery,
1359                depends_on: None,
1360                timeout_seconds: None,
1361                continue_on_error: None,
1362                parameters: serde_json::json!({
1363                    "query": "test query"
1364                }),
1365            };
1366            
1367            let mut context = ExecutionContext::new();
1368            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1369            
1370            assert!(matches!(result.status, StepStatus::Failed));
1371            assert_eq!(result.error.as_ref().unwrap(), "RAG feature not enabled");
1372        }
1373
1374        #[cfg(not(feature = "rag"))]
1375        #[tokio::test]
1376        async fn test_rag_add_step_without_rag_feature() {
1377            /// Test RAG Add step behavior when rag feature is disabled
1378            let step = MissionStep {
1379                id: "rag_add_disabled".to_string(),
1380                name: "RAG Add Disabled Test".to_string(),
1381                step_type: StepType::RagAdd,
1382                depends_on: None,
1383                timeout_seconds: None,
1384                continue_on_error: None,
1385                parameters: serde_json::json!({
1386                    "id": "doc1",
1387                    "content": "test content"
1388                }),
1389            };
1390            
1391            let mut context = ExecutionContext::new();
1392            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1393            
1394            assert!(matches!(result.status, StepStatus::Failed));
1395            assert_eq!(result.error.as_ref().unwrap(), "RAG feature not enabled");
1396        }
1397
1398        #[cfg(not(feature = "chain"))]
1399        #[tokio::test]
1400        async fn test_chain_step_without_chain_feature() {
1401            /// Test Chain step behavior when chain feature is disabled
1402            let step = MissionStep {
1403                id: "chain_disabled".to_string(),
1404                name: "Chain Disabled Test".to_string(),
1405                step_type: StepType::Chain,
1406                depends_on: None,
1407                timeout_seconds: None,
1408                continue_on_error: None,
1409                parameters: serde_json::json!({
1410                    "type": "sequential",
1411                    "prompt": "test prompt"
1412                }),
1413            };
1414            
1415            let mut context = ExecutionContext::new();
1416            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1417            
1418            assert!(matches!(result.status, StepStatus::Failed));
1419            assert_eq!(result.error.as_ref().unwrap(), "Chain feature not enabled");
1420        }
1421
1422        #[cfg(not(feature = "agent"))]
1423        #[tokio::test]
1424        async fn test_agent_step_without_agent_feature() {
1425            /// Test Agent step behavior when agent feature is disabled
1426            let step = MissionStep {
1427                id: "agent_disabled".to_string(),
1428                name: "Agent Disabled Test".to_string(),
1429                step_type: StepType::Agent,
1430                depends_on: None,
1431                timeout_seconds: None,
1432                continue_on_error: None,
1433                parameters: serde_json::json!({
1434                    "objective": "test objective",
1435                    "name": "test_agent"
1436                }),
1437            };
1438            
1439            let mut context = ExecutionContext::new();
1440            let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1441            
1442            assert!(matches!(result.status, StepStatus::Failed));
1443            assert_eq!(result.error.as_ref().unwrap(), "Agent feature not enabled");
1444        }
1445    }
1446}
1447
1448#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1449pub struct MissionConfig {
1450    pub max_parallel_steps: Option<usize>,
1451    pub timeout_seconds: Option<u64>,
1452    pub fail_fast: Option<bool>,
1453}
1454
1455pub struct MissionLoader;
1456
1457impl MissionLoader {
1458    pub fn load_from_file(path: &str) -> anyhow::Result<Mission> {
1459        if path.is_empty() {
1460            return Err(anyhow::anyhow!("Mission path must not be empty"));
1461        }
1462
1463        let content = std::fs::read_to_string(path)?;
1464
1465        // Try JSON first, then YAML
1466        let mission: Mission = if path.ends_with(".json") {
1467            serde_json::from_str(&content)?
1468        } else {
1469            serde_yaml::from_str(&content)?
1470        };
1471
1472        // Validate mission
1473        Self::validate_mission(&mission)?;
1474
1475        Ok(mission)
1476    }
1477
1478    pub fn validate_mission(mission: &Mission) -> anyhow::Result<()> {
1479        if mission.steps.is_empty() {
1480            return Err(anyhow::anyhow!("Mission must have at least one step"));
1481        }
1482
1483        // Check for duplicate step IDs
1484        let mut seen_ids = std::collections::HashSet::new();
1485        for step in &mission.steps {
1486            if !seen_ids.insert(&step.id) {
1487                return Err(anyhow::anyhow!("Duplicate step ID: {}", step.id));
1488            }
1489        }
1490
1491        // Validate dependencies exist
1492        for step in &mission.steps {
1493            if let Some(deps) = &step.depends_on {
1494                for dep in deps {
1495                    if !mission.steps.iter().any(|s| s.id == *dep) {
1496                        return Err(anyhow::anyhow!(
1497                            "Step {} depends on non-existent step: {}",
1498                            step.id,
1499                            dep
1500                        ));
1501                    }
1502                }
1503            }
1504        }
1505
1506        Ok(())
1507    }
1508}
1509
1510pub struct DagExecutor;
1511
1512impl DagExecutor {
1513    pub async fn execute_mission(mission: Mission) -> anyhow::Result<MissionResult> {
1514        if mission.steps.is_empty() {
1515            return Err(anyhow::anyhow!("Cannot execute empty mission"));
1516        }
1517
1518        let start_time = Instant::now();
1519
1520        // Build dependency graph
1521        let execution_order = Self::topological_sort(&mission.steps)?;
1522
1523        let mut results: HashMap<String, StepResult> = HashMap::new();
1524        let mut completed = std::collections::HashSet::new();
1525        let mut context = ExecutionContext::new();
1526
1527        // Get config values
1528        let fail_fast = mission
1529            .config
1530            .as_ref()
1531            .and_then(|c| c.fail_fast)
1532            .unwrap_or(true);
1533
1534        info!(
1535            "Executing mission '{}' with {} steps",
1536            mission.name,
1537            execution_order.len()
1538        );
1539
1540        for step_id in execution_order {
1541            let step = mission.steps.iter().find(|s| s.id == step_id).unwrap();
1542
1543            debug!("Executing step: {} ({})", step.id, step.name);
1544
1545            // Check if dependencies are complete
1546            if let Some(deps) = &step.depends_on {
1547                for dep in deps {
1548                    if !completed.contains(dep) {
1549                        return Err(anyhow::anyhow!(
1550                            "Dependency {} not completed for step {}",
1551                            dep,
1552                            step.id
1553                        ));
1554                    }
1555
1556                    // Check if dependency failed 
1557                    if let Some(dep_result) = results.get(dep) {
1558                        if matches!(dep_result.status, StepStatus::Failed) {
1559                            // Check step-level continue_on_error flag first, then global fail_fast
1560                            let should_continue = step.continue_on_error.unwrap_or(false);
1561                            if !should_continue && fail_fast {
1562                                warn!("Skipping step {} due to failed dependency {}", step.id, dep);
1563                                results.insert(
1564                                    step_id.clone(),
1565                                    StepResult {
1566                                        step_id: step.id.clone(),
1567                                        status: StepStatus::Skipped,
1568                                        output: serde_json::json!({"reason": "dependency failed"}),
1569                                        error: Some(format!("Dependency {} failed", dep)),
1570                                        duration_ms: 0,
1571                                    },
1572                                );
1573                                continue;
1574                            }
1575                        }
1576                    }
1577                }
1578            }
1579
1580            // Execute step with timeout
1581            let timeout = step
1582                .timeout_seconds
1583                .or(mission.config.as_ref().and_then(|c| c.timeout_seconds))
1584                .unwrap_or(300);
1585
1586            let step_start = Instant::now();
1587
1588            let result = match tokio::time::timeout(
1589                std::time::Duration::from_secs(timeout),
1590                Self::execute_step(step, &mut context),
1591            )
1592            .await
1593            {
1594                Ok(Ok(result)) => result,
1595                Ok(Err(e)) => {
1596                    error!("Step {} failed: {}", step.id, e);
1597                    
1598                    // Check step-level continue_on_error flag first, then global fail_fast
1599                    let should_continue = step.continue_on_error.unwrap_or(false);
1600                    if !should_continue && fail_fast {
1601                        return Err(e);
1602                    }
1603                    StepResult {
1604                        step_id: step.id.clone(),
1605                        status: StepStatus::Failed,
1606                        output: serde_json::json!({"error": e.to_string()}),
1607                        error: Some(e.to_string()),
1608                        duration_ms: step_start.elapsed().as_millis() as u64,
1609                    }
1610                }
1611                Err(_) => {
1612                    error!("Step {} timed out after {} seconds", step.id, timeout);
1613                    if fail_fast {
1614                        return Err(anyhow::anyhow!("Step {} timed out", step.id));
1615                    }
1616                    StepResult {
1617                        step_id: step.id.clone(),
1618                        status: StepStatus::Failed,
1619                        output: serde_json::json!({"error": "timeout"}),
1620                        error: Some(format!("Timed out after {} seconds", timeout)),
1621                        duration_ms: timeout * 1000,
1622                    }
1623                }
1624            };
1625
1626            info!(
1627                "Step {} completed with status: {:?}",
1628                step.id, result.status
1629            );
1630
1631            results.insert(step_id.clone(), result);
1632            completed.insert(step_id);
1633        }
1634
1635        // Determine overall status
1636        let has_failures = results
1637            .values()
1638            .any(|r| matches!(r.status, StepStatus::Failed));
1639        let all_skipped = results
1640            .values()
1641            .all(|r| matches!(r.status, StepStatus::Skipped));
1642
1643        let status = if has_failures {
1644            MissionStatus::Failed
1645        } else if all_skipped {
1646            MissionStatus::Cancelled
1647        } else {
1648            MissionStatus::Completed
1649        };
1650
1651        Ok(MissionResult {
1652            mission_id: Uuid::new_v4(),
1653            status,
1654            step_results: results,
1655            total_duration_ms: start_time.elapsed().as_millis() as u64,
1656        })
1657    }
1658
1659    fn topological_sort(steps: &[MissionStep]) -> anyhow::Result<Vec<String>> {
1660        let mut in_degree = HashMap::new();
1661        let mut graph = HashMap::new();
1662
1663        // Initialize
1664        for step in steps {
1665            in_degree.insert(step.id.clone(), 0);
1666            graph.insert(step.id.clone(), Vec::new());
1667        }
1668
1669        // Build graph
1670        for step in steps {
1671            if let Some(deps) = &step.depends_on {
1672                for dep in deps {
1673                    if let Some(dep_list) = graph.get_mut(dep) {
1674                        dep_list.push(step.id.clone());
1675                    } else {
1676                        return Err(anyhow::anyhow!("Dependency '{}' not found for step '{}'", dep, step.id));
1677                    }
1678                    if let Some(degree) = in_degree.get_mut(&step.id) {
1679                        *degree += 1;
1680                    } else {
1681                        return Err(anyhow::anyhow!("Step '{}' not found in dependency graph", step.id));
1682                    }
1683                }
1684            }
1685        }
1686
1687        // Kahn's algorithm
1688        let mut queue = std::collections::VecDeque::new();
1689        let mut result = Vec::new();
1690
1691        for (node, &degree) in &in_degree {
1692            if degree == 0 {
1693                queue.push_back(node.clone());
1694            }
1695        }
1696
1697        while let Some(node) = queue.pop_front() {
1698            result.push(node.clone());
1699
1700            for neighbor in &graph[&node] {
1701                let degree = in_degree.get_mut(neighbor).unwrap();
1702                *degree -= 1;
1703                if *degree == 0 {
1704                    queue.push_back(neighbor.clone());
1705                }
1706            }
1707        }
1708
1709        if result.len() != steps.len() {
1710            return Err(anyhow::anyhow!("Circular dependency detected"));
1711        }
1712
1713        Ok(result)
1714    }
1715
1716    pub async fn execute_step(
1717        step: &MissionStep,
1718        context: &mut ExecutionContext,
1719    ) -> anyhow::Result<StepResult> {
1720        let start = Instant::now();
1721
1722        let result = match step.step_type {
1723            StepType::Noop => {
1724                debug!("Executing NOOP step");
1725                Ok(StepResult {
1726                    step_id: step.id.clone(),
1727                    status: StepStatus::Success,
1728                    output: serde_json::json!({"message": "No operation performed"}),
1729                    error: None,
1730                    duration_ms: 0,
1731                })
1732            }
1733
1734            StepType::CreateFile => {
1735                let path = step
1736                    .parameters
1737                    .get("path")
1738                    .and_then(|v| v.as_str())
1739                    .ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
1740
1741                // Security: Validate path to prevent path traversal attacks
1742                let sanitized_path = sanitize_file_path(path)?;
1743
1744                let content = step
1745                    .parameters
1746                    .get("content")
1747                    .and_then(|v| v.as_str())
1748                    .unwrap_or("");
1749
1750                // Substitute variables in content
1751                let processed_content = context.substitute_variables(content);
1752
1753                debug!("Creating file: {} (content size: {} -> {})", sanitized_path, content.len(), processed_content.len());
1754
1755                // Create parent directories if needed
1756                if let Some(parent) = std::path::Path::new(&sanitized_path).parent() {
1757                    tokio::fs::create_dir_all(parent).await?;
1758                }
1759
1760                tokio::fs::write(&sanitized_path, &processed_content).await?;
1761
1762                Ok(StepResult {
1763                    step_id: step.id.clone(),
1764                    status: StepStatus::Success,
1765                    output: serde_json::json!({
1766                        "path": path,
1767                        "size": processed_content.len(),
1768                        "created": true,
1769                        "variables_substituted": processed_content != content
1770                    }),
1771                    error: None,
1772                    duration_ms: start.elapsed().as_millis() as u64,
1773                })
1774            }
1775
1776            StepType::EditFile => {
1777                let path = step
1778                    .parameters
1779                    .get("path")
1780                    .and_then(|v| v.as_str())
1781                    .ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
1782
1783                let content = step.parameters.get("content").and_then(|v| v.as_str());
1784
1785                let append = step
1786                    .parameters
1787                    .get("append")
1788                    .and_then(|v| v.as_bool())
1789                    .unwrap_or(false);
1790
1791                debug!("Editing file: {} (append: {})", path, append);
1792
1793                if append && content.is_some() {
1794                    // Append to file
1795                    let mut existing = tokio::fs::read_to_string(path).await.unwrap_or_default();
1796                    existing.push_str(content.unwrap());
1797                    tokio::fs::write(path, existing).await?;
1798                } else if let Some(content) = content {
1799                    // Overwrite file
1800                    tokio::fs::write(path, content).await?;
1801                }
1802
1803                let metadata = tokio::fs::metadata(path).await?;
1804
1805                Ok(StepResult {
1806                    step_id: step.id.clone(),
1807                    status: StepStatus::Success,
1808                    output: serde_json::json!({
1809                        "path": path,
1810                        "size": metadata.len(),
1811                        "modified": true
1812                    }),
1813                    error: None,
1814                    duration_ms: start.elapsed().as_millis() as u64,
1815                })
1816            }
1817
1818            StepType::DeleteFile => {
1819                let path = step
1820                    .parameters
1821                    .get("path")
1822                    .and_then(|v| v.as_str())
1823                    .ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
1824
1825                debug!("Deleting file: {}", path);
1826
1827                let existed = std::path::Path::new(path).exists();
1828                if existed {
1829                    tokio::fs::remove_file(path).await?;
1830                }
1831
1832                Ok(StepResult {
1833                    step_id: step.id.clone(),
1834                    status: StepStatus::Success,
1835                    output: serde_json::json!({
1836                        "path": path,
1837                        "existed": existed,
1838                        "deleted": existed
1839                    }),
1840                    error: None,
1841                    duration_ms: start.elapsed().as_millis() as u64,
1842                })
1843            }
1844
1845            StepType::Command => {
1846                let command = step
1847                    .parameters
1848                    .get("command")
1849                    .and_then(|v| v.as_str())
1850                    .ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
1851
1852                let args = step
1853                    .parameters
1854                    .get("args")
1855                    .and_then(|v| v.as_array())
1856                    .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
1857                    .unwrap_or_default();
1858
1859                // Security: Validate command and arguments
1860                sanitize_command(command, &args)?;
1861
1862                let working_dir = step.parameters.get("working_dir").and_then(|v| v.as_str());
1863
1864                debug!("Executing command: {} {:?}", command, args);
1865
1866                let mut cmd = Command::new(command);
1867                cmd.args(&args);
1868
1869                if let Some(dir) = working_dir {
1870                    cmd.current_dir(dir);
1871                }
1872
1873                // Set environment variables from context
1874                for (key, value) in &context.environment {
1875                    cmd.env(key, value);
1876                }
1877
1878                let output = cmd.output().await?;
1879
1880                let stdout = String::from_utf8_lossy(&output.stdout);
1881                let stderr = String::from_utf8_lossy(&output.stderr);
1882
1883                Ok(StepResult {
1884                    step_id: step.id.clone(),
1885                    status: if output.status.success() {
1886                        StepStatus::Success
1887                    } else {
1888                        StepStatus::Failed
1889                    },
1890                    output: serde_json::json!({
1891                        "command": command,
1892                        "args": args,
1893                        "exit_code": output.status.code(),
1894                        "stdout": stdout,
1895                        "stderr": stderr
1896                    }),
1897                    error: if !output.status.success() {
1898                        Some(format!(
1899                            "Command failed with exit code {:?}",
1900                            output.status.code()
1901                        ))
1902                    } else {
1903                        None
1904                    },
1905                    duration_ms: start.elapsed().as_millis() as u64,
1906                })
1907            }
1908
1909            StepType::Http => {
1910                #[cfg(feature = "llm")]
1911                {
1912                    let url = step
1913                        .parameters
1914                        .get("url")
1915                        .and_then(|v| v.as_str())
1916                        .ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter"))?;
1917
1918                    let method = step
1919                        .parameters
1920                        .get("method")
1921                        .and_then(|v| v.as_str())
1922                        .unwrap_or("GET");
1923
1924                    let headers = step
1925                        .parameters
1926                        .get("headers")
1927                        .and_then(|v| v.as_object())
1928                        .map(|obj| {
1929                            obj.iter()
1930                                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1931                                .collect::<HashMap<_, _>>()
1932                        });
1933
1934                    let body = step.parameters.get("body");
1935
1936                    debug!("HTTP {} to {}", method, url);
1937
1938                    let client = reqwest::Client::new();
1939                    let mut request = match method.to_uppercase().as_str() {
1940                        "GET" => client.get(url),
1941                        "POST" => client.post(url),
1942                        "PUT" => client.put(url),
1943                        "DELETE" => client.delete(url),
1944                        "PATCH" => client.patch(url),
1945                        _ => return Err(anyhow::anyhow!("Unsupported HTTP method: {}", method)),
1946                    };
1947
1948                    // Add headers
1949                    if let Some(headers) = headers {
1950                        for (key, value) in headers {
1951                            request = request.header(key, value);
1952                        }
1953                    }
1954
1955                    // Add body
1956                    if let Some(body) = body {
1957                        request = request.json(body);
1958                    }
1959
1960                    let response = request.send().await?;
1961                    let status = response.status();
1962                    let status_code = status.as_u16();
1963                    let response_text = response.text().await.unwrap_or_default();
1964
1965                    // Try to parse as JSON, fallback to text
1966                    let response_body = serde_json::from_str::<serde_json::Value>(&response_text)
1967                        .unwrap_or_else(|_| serde_json::json!({"text": response_text}));
1968
1969                    Ok(StepResult {
1970                        step_id: step.id.clone(),
1971                        status: if status.is_success() {
1972                            StepStatus::Success
1973                        } else {
1974                            StepStatus::Failed
1975                        },
1976                        output: serde_json::json!({
1977                            "url": url,
1978                            "method": method,
1979                            "status": status_code,
1980                            "response": response_body
1981                        }),
1982                        error: if !status.is_success() {
1983                            Some(format!("HTTP {} returned {}", method, status))
1984                        } else {
1985                            None
1986                        },
1987                        duration_ms: start.elapsed().as_millis() as u64,
1988                    })
1989                }
1990
1991                #[cfg(not(feature = "llm"))]
1992                {
1993                    Ok(StepResult {
1994                        step_id: step.id.clone(),
1995                        status: StepStatus::Skipped,
1996                        output: serde_json::json!({"message": "HTTP support requires 'llm' feature"}),
1997                        error: None,
1998                        duration_ms: 0,
1999                    })
2000                }
2001            }
2002
2003            StepType::Llm => {
2004                #[cfg(feature = "llm")]
2005                {
2006                    use crate::llm::{
2007                        create_default_llm_manager, ChatMessage, LLMRequest, MessageRole,
2008                    };
2009
2010                    let prompt = step
2011                        .parameters
2012                        .get("prompt")
2013                        .and_then(|v| v.as_str())
2014                        .ok_or_else(|| anyhow::anyhow!("Missing 'prompt' parameter"))?;
2015
2016                    let model = step.parameters.get("model").and_then(|v| v.as_str());
2017
2018                    let provider = step.parameters.get("provider").and_then(|v| v.as_str());
2019
2020                    let temperature = step
2021                        .parameters
2022                        .get("temperature")
2023                        .and_then(|v| v.as_f64())
2024                        .map(|t| t as f32);
2025
2026                    let max_tokens = step
2027                        .parameters
2028                        .get("max_tokens")
2029                        .and_then(|v| v.as_u64())
2030                        .map(|t| t as u32);
2031
2032                    debug!("Calling LLM with prompt: {}", prompt);
2033
2034                    let manager = create_default_llm_manager()?;
2035
2036                    let request = LLMRequest {
2037                        messages: vec![ChatMessage {
2038                            role: MessageRole::User,
2039                            content: prompt.to_string(),
2040                            name: None,
2041                            tool_calls: None,
2042                            tool_call_id: None,
2043                        }],
2044                        model: model.map(String::from),
2045                        temperature,
2046                        max_tokens,
2047                        stream: false,
2048                        tools: None,
2049                        metadata: HashMap::new(),
2050                    };
2051
2052                    let response = manager.complete(request, provider).await?;
2053
2054                    // Store response in context for use by other steps
2055                    context.set_variable(&format!("{}_response", step.id), &response.content);
2056
2057                    Ok(StepResult {
2058                        step_id: step.id.clone(),
2059                        status: StepStatus::Success,
2060                        output: serde_json::json!({
2061                            "model": response.model,
2062                            "content": response.content,
2063                            "usage": {
2064                                "prompt_tokens": response.usage.prompt_tokens,
2065                                "completion_tokens": response.usage.completion_tokens,
2066                                "total_tokens": response.usage.total_tokens
2067                            }
2068                        }),
2069                        error: None,
2070                        duration_ms: start.elapsed().as_millis() as u64,
2071                    })
2072                }
2073
2074                #[cfg(not(feature = "llm"))]
2075                {
2076                    Ok(StepResult {
2077                        step_id: step.id.clone(),
2078                        status: StepStatus::Failed,
2079                        output: serde_json::json!({"error": "LLM feature not enabled"}),
2080                        error: Some("LLM feature not enabled".to_string()),
2081                        duration_ms: 0,
2082                    })
2083                }
2084            }
2085
2086            StepType::Tool => {
2087                #[cfg(feature = "tools")]
2088                {
2089                    use crate::core::RuntimeContext;
2090                    use crate::tools::{create_default_tool_manager, ToolCall};
2091
2092                    let tool_name = step
2093                        .parameters
2094                        .get("tool")
2095                        .and_then(|v| v.as_str())
2096                        .ok_or_else(|| anyhow::anyhow!("Missing 'tool' parameter"))?;
2097
2098                    let tool_params = step
2099                        .parameters
2100                        .get("parameters")
2101                        .cloned()
2102                        .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
2103
2104                    debug!("Executing tool: {}", tool_name);
2105
2106                    let tool_manager = create_default_tool_manager();
2107                    let context = RuntimeContext::new();
2108
2109                    let call = ToolCall::new(
2110                        tool_name.to_string(),
2111                        tool_params,
2112                    );
2113
2114                    let result = tool_manager.execute_tool(call, &context).await?;
2115
2116                    Ok(StepResult {
2117                        step_id: step.id.clone(),
2118                        status: if result.success {
2119                            StepStatus::Success
2120                        } else {
2121                            StepStatus::Failed
2122                        },
2123                        output: result.output,
2124                        error: result.error,
2125                        duration_ms: result.execution_time_ms,
2126                    })
2127                }
2128
2129                #[cfg(not(feature = "tools"))]
2130                {
2131                    Ok(StepResult {
2132                        step_id: step.id.clone(),
2133                        status: StepStatus::Failed,
2134                        output: serde_json::json!({"error": "Tools feature not enabled"}),
2135                        error: Some("Tools feature not enabled".to_string()),
2136                        duration_ms: 0,
2137                    })
2138                }
2139            }
2140
2141            StepType::RagQuery => {
2142                #[cfg(feature = "rag")]
2143                {
2144                    use crate::rag::create_default_rag_system;
2145
2146                    let query = step
2147                        .parameters
2148                        .get("query")
2149                        .and_then(|v| v.as_str())
2150                        .ok_or_else(|| anyhow::anyhow!("Missing 'query' parameter"))?;
2151
2152                    let limit = step
2153                        .parameters
2154                        .get("limit")
2155                        .and_then(|v| v.as_u64())
2156                        .map(|l| l as usize);
2157
2158                    let threshold = step
2159                        .parameters
2160                        .get("threshold")
2161                        .and_then(|v| v.as_f64())
2162                        .map(|t| t as f32);
2163
2164                    debug!("Querying RAG system: {}", query);
2165
2166                    let rag_system = create_default_rag_system()?;
2167                    let results = rag_system.search(query, limit, threshold).await?;
2168
2169                    // Store context in execution context
2170                    if !results.results.is_empty() {
2171                        let context_text = results
2172                            .results
2173                            .iter()
2174                            .map(|r| r.chunk.content.clone())
2175                            .collect::<Vec<_>>()
2176                            .join("\n\n");
2177                        context.set_variable(&format!("{}_context", step.id), &context_text);
2178                    }
2179
2180                    Ok(StepResult {
2181                        step_id: step.id.clone(),
2182                        status: StepStatus::Success,
2183                        output: serde_json::json!({
2184                            "query": query,
2185                            "results_count": results.results.len(),
2186                            "results": results.results.iter().map(|r| serde_json::json!({
2187                                "document_id": r.document_id,
2188                                "chunk_id": r.chunk.id,
2189                                "score": r.similarity_score,
2190                                "content_preview": &r.chunk.content[..r.chunk.content.len().min(200)]
2191                            })).collect::<Vec<_>>(),
2192                            "processing_time_ms": results.processing_time_ms
2193                        }),
2194                        error: None,
2195                        duration_ms: start.elapsed().as_millis() as u64,
2196                    })
2197                }
2198
2199                #[cfg(not(feature = "rag"))]
2200                {
2201                    Ok(StepResult {
2202                        step_id: step.id.clone(),
2203                        status: StepStatus::Failed,
2204                        output: serde_json::json!({"error": "RAG feature not enabled"}),
2205                        error: Some("RAG feature not enabled".to_string()),
2206                        duration_ms: 0,
2207                    })
2208                }
2209            }
2210
2211            StepType::RagAdd => {
2212                #[cfg(feature = "rag")]
2213                {
2214                    use crate::rag::create_default_rag_system;
2215
2216                    let document_id = step
2217                        .parameters
2218                        .get("id")
2219                        .and_then(|v| v.as_str())
2220                        .ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter"))?;
2221
2222                    let content = step
2223                        .parameters
2224                        .get("content")
2225                        .and_then(|v| v.as_str())
2226                        .ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2227
2228                    let metadata = step
2229                        .parameters
2230                        .get("metadata")
2231                        .and_then(|v| v.as_object())
2232                        .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
2233                        .unwrap_or_default();
2234
2235                    debug!("Adding document to RAG: {}", document_id);
2236
2237                    let mut rag_system = create_default_rag_system()?;
2238                    let doc_id = rag_system
2239                        .add_document(document_id.to_string(), content.to_string(), metadata)
2240                        .await?;
2241
2242                    Ok(StepResult {
2243                        step_id: step.id.clone(),
2244                        status: StepStatus::Success,
2245                        output: serde_json::json!({
2246                            "document_id": doc_id,
2247                            "content_length": content.len(),
2248                            "added": true
2249                        }),
2250                        error: None,
2251                        duration_ms: start.elapsed().as_millis() as u64,
2252                    })
2253                }
2254
2255                #[cfg(not(feature = "rag"))]
2256                {
2257                    Ok(StepResult {
2258                        step_id: step.id.clone(),
2259                        status: StepStatus::Failed,
2260                        output: serde_json::json!({"error": "RAG feature not enabled"}),
2261                        error: Some("RAG feature not enabled".to_string()),
2262                        duration_ms: 0,
2263                    })
2264                }
2265            }
2266
2267            StepType::Chain => {
2268                #[cfg(feature = "chain")]
2269                {
2270                    use crate::core::chain::{ChainContext, SequentialChain};
2271                    #[cfg(feature = "llm")]
2272                    use crate::llm::create_default_llm_manager;
2273
2274                    let chain_type = step
2275                        .parameters
2276                        .get("type")
2277                        .and_then(|v| v.as_str())
2278                        .unwrap_or("sequential");
2279
2280                    // Check for new nested steps format
2281                    if let Some(steps_value) = step.parameters.get("steps") {
2282                        // New nested chain format
2283                        use crate::engine::chain_executor::{ChainExecutor, ChainSubStep};
2284                        
2285                        let sub_steps: Vec<ChainSubStep> = serde_json::from_value(steps_value.clone())
2286                            .map_err(|e| anyhow::anyhow!("Invalid chain steps format: {}", e))?;
2287                        
2288                        let executor = ChainExecutor::new(format!("chain_{}", step.id));
2289                        match executor.execute_chain_steps(&sub_steps, context).await {
2290                            Ok(result) => {
2291                                context.set_variable(&format!("{}_result", step.id), &result);
2292                                
2293                                return Ok(StepResult {
2294                                    step_id: step.id.clone(),
2295                                    status: StepStatus::Success,
2296                                    output: serde_json::json!({"type": "chain", "result": result}),
2297                                    error: None,
2298                                    duration_ms: start.elapsed().as_millis() as u64,
2299                                });
2300                            }
2301                            Err(e) => {
2302                                return Ok(StepResult {
2303                                    step_id: step.id.clone(),
2304                                    status: StepStatus::Failed,
2305                                    output: serde_json::json!({"error": e.to_string()}),
2306                                    error: Some(e.to_string()),
2307                                    duration_ms: start.elapsed().as_millis() as u64,
2308                                });
2309                            }
2310                        }
2311                    }
2312                    
2313                    // Legacy simple chain format
2314                    let prompt = step
2315                        .parameters
2316                        .get("prompt")
2317                        .and_then(|v| v.as_str())
2318                        .ok_or_else(|| anyhow::anyhow!("Chain step requires either 'steps' array or 'prompt' parameter"))?;
2319
2320                    debug!("Executing chain: {}", chain_type);
2321
2322                    // Create a simple chain with an LLM step
2323                    let mut chain = SequentialChain::new(format!("chain_{}", step.id));
2324
2325                    let _manager = create_default_llm_manager()?;
2326                    let llm_chain = crate::core::chain::LLMChain::new(
2327                        "llm_step".to_string(),
2328                        prompt.to_string(),
2329                    );
2330                    chain.add(Box::new(llm_chain));
2331
2332                    let mut chain_context = ChainContext::new();
2333
2334                    // Copy parameters to chain context
2335                    for (key, value) in step
2336                        .parameters
2337                        .as_object()
2338                        .unwrap_or(&serde_json::Map::new())
2339                    {
2340                        if let Some(v) = value.as_str() {
2341                            chain_context.set(key, v);
2342                        }
2343                    }
2344
2345                    match chain.run(&mut chain_context).await {
2346                        Ok(_) => {
2347                            // Store chain results in execution context
2348                            if let Some(result) = chain_context.get("result") {
2349                                context.set_variable(&format!("{}_result", step.id), &result);
2350                            }
2351
2352                            Ok(StepResult {
2353                                step_id: step.id.clone(),
2354                                status: StepStatus::Success,
2355                                output: serde_json::json!({
2356                                    "chain_type": chain_type,
2357                                    "variables": chain_context.vars,
2358                                    "events": chain_context.get_history().len()
2359                                }),
2360                                error: None,
2361                                duration_ms: start.elapsed().as_millis() as u64,
2362                            })
2363                        }
2364                        Err(e) => Ok(StepResult {
2365                            step_id: step.id.clone(),
2366                            status: StepStatus::Failed,
2367                            output: serde_json::json!({"error": e.to_string()}),
2368                            error: Some(e.to_string()),
2369                            duration_ms: start.elapsed().as_millis() as u64,
2370                        }),
2371                    }
2372                }
2373
2374                #[cfg(not(feature = "chain"))]
2375                {
2376                    Ok(StepResult {
2377                        step_id: step.id.clone(),
2378                        status: StepStatus::Failed,
2379                        output: serde_json::json!({"error": "Chain feature not enabled"}),
2380                        error: Some("Chain feature not enabled".to_string()),
2381                        duration_ms: 0,
2382                    })
2383                }
2384            }
2385
2386            StepType::Agent => {
2387                #[cfg(feature = "agent")]
2388                {
2389                    use crate::core::memory::InMemoryStore;
2390                    #[cfg(feature = "llm")]
2391                    use crate::llm::create_default_llm_manager;
2392                    #[cfg(feature = "tools")]
2393                    use crate::tools::create_default_tool_manager;
2394
2395                    let objective = step
2396                        .parameters
2397                        .get("objective")
2398                        .and_then(|v| v.as_str())
2399                        .ok_or_else(|| {
2400                            anyhow::anyhow!("Missing 'objective' parameter for agent")
2401                        })?;
2402
2403                    let agent_name = step
2404                        .parameters
2405                        .get("name")
2406                        .and_then(|v| v.as_str())
2407                        .unwrap_or(&step.id);
2408
2409                    debug!(
2410                        "Creating agent '{}' with objective: {}",
2411                        agent_name, objective
2412                    );
2413
2414                    // Create agent components
2415                    let _memory = InMemoryStore::new();
2416                    #[cfg(feature = "tools")]
2417                    let _tool_manager = create_default_tool_manager();
2418                    
2419                    #[cfg(feature = "llm")]
2420                    {
2421                        let _llm_manager = create_default_llm_manager()?;
2422
2423                        // Create agent (this is a simplified version - real implementation would need proper lifetime management)
2424                        // For now, we'll simulate agent execution with LLM
2425                        let agent_prompt = format!(
2426                            "You are an autonomous agent named '{}'. Your objective is: {}\n\nPlease think through this step by step and provide a final answer.",
2427                            agent_name, objective
2428                        );
2429
2430                        let manager = create_default_llm_manager()?;
2431                        let request = crate::llm::LLMRequest {
2432                            messages: vec![crate::llm::ChatMessage {
2433                                role: crate::llm::MessageRole::User,
2434                                content: agent_prompt,
2435                                name: None,
2436                                tool_calls: None,
2437                                tool_call_id: None,
2438                            }],
2439                            model: None,
2440                            temperature: Some(0.7),
2441                            max_tokens: Some(1000),
2442                            stream: false,
2443                            tools: None,
2444                            metadata: std::collections::HashMap::new(),
2445                        };
2446
2447                        match manager.complete(request, None).await {
2448                            Ok(response) => {
2449                                // Store agent response in context
2450                                context
2451                                    .set_variable(&format!("{}_response", step.id), &response.content);
2452
2453                                Ok(StepResult {
2454                                    step_id: step.id.clone(),
2455                                    status: StepStatus::Success,
2456                                    output: serde_json::json!({
2457                                        "agent_name": agent_name,
2458                                        "objective": objective,
2459                                        "response": response.content,
2460                                        "model": response.model,
2461                                        "usage": {
2462                                            "prompt_tokens": response.usage.prompt_tokens,
2463                                            "completion_tokens": response.usage.completion_tokens,
2464                                            "total_tokens": response.usage.total_tokens
2465                                        }
2466                                    }),
2467                                    error: None,
2468                                    duration_ms: start.elapsed().as_millis() as u64,
2469                                })
2470                            }
2471                            Err(e) => Ok(StepResult {
2472                                step_id: step.id.clone(),
2473                                status: StepStatus::Failed,
2474                                output: serde_json::json!({"error": e.to_string()}),
2475                                error: Some(e.to_string()),
2476                                duration_ms: start.elapsed().as_millis() as u64,
2477                            }),
2478                        }
2479                    }
2480                    
2481                    #[cfg(not(feature = "llm"))]
2482                    {
2483                        // Fallback when LLM feature is not available
2484                        Ok(StepResult {
2485                            step_id: step.id.clone(),
2486                            status: StepStatus::Success,
2487                            output: serde_json::json!({
2488                                "agent_name": agent_name,
2489                                "objective": objective,
2490                                "response": format!("Agent {} would work on: {}", agent_name, objective),
2491                                "note": "LLM feature not enabled - this is a simulation"
2492                            }),
2493                            error: None,
2494                            duration_ms: start.elapsed().as_millis() as u64,
2495                        })
2496                    }
2497                }
2498
2499                #[cfg(not(feature = "agent"))]
2500                {
2501                    Ok(StepResult {
2502                        step_id: step.id.clone(),
2503                        status: StepStatus::Failed,
2504                        output: serde_json::json!({"error": "Agent feature not enabled"}),
2505                        error: Some("Agent feature not enabled".to_string()),
2506                        duration_ms: 0,
2507                    })
2508                }
2509            }
2510
2511            // File Operations
2512            StepType::CopyFile => {
2513                let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
2514                let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
2515                
2516                tokio::fs::copy(source, destination).await?;
2517                let size = tokio::fs::metadata(destination).await?.len();
2518                
2519                Ok(StepResult {
2520                    step_id: step.id.clone(),
2521                    status: StepStatus::Success,
2522                    output: serde_json::json!({"source": source, "destination": destination, "size": size}),
2523                    error: None,
2524                    duration_ms: start.elapsed().as_millis() as u64,
2525                })
2526            }
2527
2528            StepType::MoveFile => {
2529                let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
2530                let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
2531                
2532                tokio::fs::rename(source, destination).await?;
2533                
2534                Ok(StepResult {
2535                    step_id: step.id.clone(),
2536                    status: StepStatus::Success,
2537                    output: serde_json::json!({"source": source, "destination": destination, "moved": true}),
2538                    error: None,
2539                    duration_ms: start.elapsed().as_millis() as u64,
2540                })
2541            }
2542
2543            StepType::ReadFile => {
2544                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2545                
2546                let content = tokio::fs::read_to_string(path).await?;
2547                let size = content.len();
2548                
2549                Ok(StepResult {
2550                    step_id: step.id.clone(),
2551                    status: StepStatus::Success,
2552                    output: serde_json::json!({"path": path, "content": content, "size": size}),
2553                    error: None,
2554                    duration_ms: start.elapsed().as_millis() as u64,
2555                })
2556            }
2557
2558            StepType::ListDirectory => {
2559                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2560                
2561                let mut entries = tokio::fs::read_dir(path).await?;
2562                let mut files = Vec::new();
2563                
2564                while let Some(entry) = entries.next_entry().await? {
2565                    let metadata = entry.metadata().await?;
2566                    files.push(serde_json::json!({
2567                        "name": entry.file_name().to_string_lossy(),
2568                        "path": entry.path().to_string_lossy(),
2569                        "is_dir": metadata.is_dir(),
2570                        "size": metadata.len()
2571                    }));
2572                }
2573                
2574                Ok(StepResult {
2575                    step_id: step.id.clone(),
2576                    status: StepStatus::Success,
2577                    output: serde_json::json!({"path": path, "entries": files, "count": files.len()}),
2578                    error: None,
2579                    duration_ms: start.elapsed().as_millis() as u64,
2580                })
2581            }
2582
2583            StepType::FileSearch => {
2584                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2585                let pattern = step.parameters.get("pattern").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'pattern' parameter"))?;
2586                
2587                let mut results = Vec::new();
2588                let mut entries = tokio::fs::read_dir(path).await?;
2589                
2590                while let Some(entry) = entries.next_entry().await? {
2591                    let name = entry.file_name().to_string_lossy().to_string();
2592                    if name.contains(pattern) {
2593                        results.push(serde_json::json!({
2594                            "name": name,
2595                            "path": entry.path().to_string_lossy()
2596                        }));
2597                    }
2598                }
2599                
2600                Ok(StepResult {
2601                    step_id: step.id.clone(),
2602                    status: StepStatus::Success,
2603                    output: serde_json::json!({"pattern": pattern, "results": results, "matches": results.len()}),
2604                    error: None,
2605                    duration_ms: start.elapsed().as_millis() as u64,
2606                })
2607            }
2608
2609            // Data Processing Operations  
2610            StepType::ParseJson => {
2611                let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2612                
2613                let parsed: serde_json::Value = serde_json::from_str(content)?;
2614                
2615                Ok(StepResult {
2616                    step_id: step.id.clone(),
2617                    status: StepStatus::Success,
2618                    output: serde_json::json!({"parsed": parsed, "valid": true}),
2619                    error: None,
2620                    duration_ms: start.elapsed().as_millis() as u64,
2621                })
2622            }
2623
2624            StepType::ParseYaml => {
2625                let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2626                
2627                let parsed: serde_yaml::Value = serde_yaml::from_str(content)?;
2628                let json_value = serde_json::to_value(parsed)?;
2629                
2630                Ok(StepResult {
2631                    step_id: step.id.clone(),
2632                    status: StepStatus::Success,
2633                    output: serde_json::json!({"parsed": json_value, "valid": true}),
2634                    error: None,
2635                    duration_ms: start.elapsed().as_millis() as u64,
2636                })
2637            }
2638
2639            StepType::ParseXml => {
2640                let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2641                
2642                use xml::reader::{EventReader, XmlEvent};
2643                let parser = EventReader::from_str(content);
2644                let mut elements = Vec::new();
2645                let mut current_element = String::new();
2646                
2647                for event in parser {
2648                    match event? {
2649                        XmlEvent::StartElement { name, .. } => {
2650                            current_element = name.local_name;
2651                        },
2652                        XmlEvent::Characters(text) => {
2653                            if !current_element.is_empty() {
2654                                elements.push(serde_json::json!({
2655                                    "element": current_element.clone(),
2656                                    "content": text
2657                                }));
2658                            }
2659                        },
2660                        _ => {}
2661                    }
2662                }
2663                
2664                Ok(StepResult {
2665                    step_id: step.id.clone(),
2666                    status: StepStatus::Success,
2667                    output: serde_json::json!({"elements": elements, "element_count": elements.len(), "valid": true}),
2668                    error: None,
2669                    duration_ms: start.elapsed().as_millis() as u64,
2670                })
2671            }
2672
2673            StepType::ValidateSchema => {
2674                let _data = step.parameters.get("data").ok_or_else(|| anyhow::anyhow!("Missing 'data' parameter"))?;
2675                let _schema = step.parameters.get("schema").ok_or_else(|| anyhow::anyhow!("Missing 'schema' parameter"))?;
2676                
2677                // Basic validation - validate that data is valid JSON
2678                let data_str = _data.as_str().ok_or_else(|| anyhow::anyhow!("Data must be string"))?;
2679                let _parsed: serde_json::Value = serde_json::from_str(data_str)?;
2680                
2681                Ok(StepResult {
2682                    step_id: step.id.clone(),
2683                    status: StepStatus::Success,
2684                    output: serde_json::json!({"valid": true, "validated": "json_syntax", "note": "Full JSON schema validation requires jsonschema crate"}),
2685                    error: None,
2686                    duration_ms: start.elapsed().as_millis() as u64,
2687                })
2688            }
2689
2690            StepType::CsvProcess => {
2691                let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2692                
2693                let mut reader = csv::Reader::from_reader(content.as_bytes());
2694                let headers: Vec<String> = reader.headers()?.iter().map(|h| h.to_string()).collect();
2695                let mut records = Vec::new();
2696                
2697                for result in reader.records() {
2698                    let record = result?;
2699                    let row: Vec<String> = record.iter().map(|field| field.to_string()).collect();
2700                    records.push(row);
2701                }
2702                
2703                Ok(StepResult {
2704                    step_id: step.id.clone(),
2705                    status: StepStatus::Success,
2706                    output: serde_json::json!({"headers": headers, "records": records, "row_count": records.len()}),
2707                    error: None,
2708                    duration_ms: start.elapsed().as_millis() as u64,
2709                })
2710            }
2711
2712            // Code Development Operations
2713            StepType::CompileCode => {
2714                let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2715                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2716                
2717                let output = match language {
2718                    "rust" => Command::new("cargo").args(&["check"]).current_dir(path).output().await?,
2719                    "go" => Command::new("go").args(&["build", "."]).current_dir(path).output().await?,
2720                    "node" => Command::new("npm").args(&["run", "build"]).current_dir(path).output().await?,
2721                    _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2722                };
2723                
2724                let success = output.status.success();
2725                let stdout = String::from_utf8_lossy(&output.stdout);
2726                let stderr = String::from_utf8_lossy(&output.stderr);
2727                
2728                Ok(StepResult {
2729                    step_id: step.id.clone(),
2730                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2731                    output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
2732                    error: if success { None } else { Some(format!("Compilation failed: {}", stderr)) },
2733                    duration_ms: start.elapsed().as_millis() as u64,
2734                })
2735            }
2736
2737            StepType::RunTests => {
2738                let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2739                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2740                
2741                let output = match language {
2742                    "rust" => Command::new("cargo").args(&["test"]).current_dir(path).output().await?,
2743                    "go" => Command::new("go").args(&["test", "./..."]).current_dir(path).output().await?,
2744                    "node" => Command::new("npm").args(&["test"]).current_dir(path).output().await?,
2745                    _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2746                };
2747                
2748                let success = output.status.success();
2749                let stdout = String::from_utf8_lossy(&output.stdout);
2750                let stderr = String::from_utf8_lossy(&output.stderr);
2751                
2752                Ok(StepResult {
2753                    step_id: step.id.clone(),
2754                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2755                    output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
2756                    error: if success { None } else { Some(format!("Tests failed: {}", stderr)) },
2757                    duration_ms: start.elapsed().as_millis() as u64,
2758                })
2759            }
2760
2761            StepType::FormatCode => {
2762                let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2763                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2764                
2765                let output = match language {
2766                    "rust" => Command::new("cargo").args(&["fmt"]).current_dir(path).output().await?,
2767                    "go" => Command::new("gofmt").args(&["-w", "."]).current_dir(path).output().await?,
2768                    "node" => Command::new("npx").args(&["prettier", "--write", "."]).current_dir(path).output().await?,
2769                    _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2770                };
2771                
2772                let success = output.status.success();
2773                
2774                Ok(StepResult {
2775                    step_id: step.id.clone(),
2776                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2777                    output: serde_json::json!({"formatted": success, "language": language}),
2778                    error: if success { None } else { Some("Formatting failed".to_string()) },
2779                    duration_ms: start.elapsed().as_millis() as u64,
2780                })
2781            }
2782
2783            StepType::LintCode => {
2784                let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2785                let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2786                
2787                let output = match language {
2788                    "rust" => Command::new("cargo").args(&["clippy"]).current_dir(path).output().await?,
2789                    "go" => Command::new("golint").args(&["./..."]).current_dir(path).output().await?,
2790                    "node" => Command::new("npx").args(&["eslint", "."]).current_dir(path).output().await?,
2791                    _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2792                };
2793                
2794                let success = output.status.success();
2795                let stdout = String::from_utf8_lossy(&output.stdout);
2796                let stderr = String::from_utf8_lossy(&output.stderr);
2797                
2798                Ok(StepResult {
2799                    step_id: step.id.clone(),
2800                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2801                    output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
2802                    error: None,
2803                    duration_ms: start.elapsed().as_millis() as u64,
2804                })
2805            }
2806
2807            StepType::ExtractFunctions | StepType::GenerateDocs => {
2808                Ok(StepResult {
2809                    step_id: step.id.clone(),
2810                    status: StepStatus::Success,
2811                    output: serde_json::json!({"note": "Implementation pending"}),
2812                    error: None,
2813                    duration_ms: start.elapsed().as_millis() as u64,
2814                })
2815            }
2816
2817            // Git Operations
2818            StepType::GitCommit => {
2819                let message = step.parameters.get("message").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'message' parameter"))?;
2820                let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2821                
2822                let _add_output = Command::new("git").args(&["add", "."]).current_dir(path).output().await?;
2823                let commit_output = Command::new("git").args(&["commit", "-m", message]).current_dir(path).output().await?;
2824                
2825                let success = commit_output.status.success();
2826                
2827                Ok(StepResult {
2828                    step_id: step.id.clone(),
2829                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2830                    output: serde_json::json!({"committed": success, "message": message}),
2831                    error: if success { None } else { Some("Git commit failed".to_string()) },
2832                    duration_ms: start.elapsed().as_millis() as u64,
2833                })
2834            }
2835
2836            StepType::GitBranch => {
2837                let branch_name = step.parameters.get("branch").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'branch' parameter"))?;
2838                let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2839                
2840                let output = Command::new("git").args(&["checkout", "-b", branch_name]).current_dir(path).output().await?;
2841                let success = output.status.success();
2842                
2843                Ok(StepResult {
2844                    step_id: step.id.clone(),
2845                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2846                    output: serde_json::json!({"branch": branch_name, "created": success}),
2847                    error: if success { None } else { Some("Branch creation failed".to_string()) },
2848                    duration_ms: start.elapsed().as_millis() as u64,
2849                })
2850            }
2851
2852            StepType::GitMerge => {
2853                let branch = step.parameters.get("branch").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'branch' parameter"))?;
2854                let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2855                
2856                let output = Command::new("git").args(&["merge", branch]).current_dir(path).output().await?;
2857                let success = output.status.success();
2858                
2859                Ok(StepResult {
2860                    step_id: step.id.clone(),
2861                    status: if success { StepStatus::Success } else { StepStatus::Failed },
2862                    output: serde_json::json!({"merged_branch": branch, "success": success}),
2863                    error: if success { None } else { Some("Merge failed".to_string()) },
2864                    duration_ms: start.elapsed().as_millis() as u64,
2865                })
2866            }
2867
2868            StepType::GitStatus => {
2869                let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2870                
2871                let output = Command::new("git").args(&["status", "--porcelain"]).current_dir(path).output().await?;
2872                let stdout = String::from_utf8_lossy(&output.stdout);
2873                
2874                Ok(StepResult {
2875                    step_id: step.id.clone(),
2876                    status: StepStatus::Success,
2877                    output: serde_json::json!({"status": stdout, "clean": stdout.trim().is_empty()}),
2878                    error: None,
2879                    duration_ms: start.elapsed().as_millis() as u64,
2880                })
2881            }
2882
2883            StepType::GitDiff => {
2884                let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2885                
2886                let output = Command::new("git").args(&["diff"]).current_dir(path).output().await?;
2887                let stdout = String::from_utf8_lossy(&output.stdout);
2888                
2889                Ok(StepResult {
2890                    step_id: step.id.clone(),
2891                    status: StepStatus::Success,
2892                    output: serde_json::json!({"diff": stdout, "has_changes": !stdout.trim().is_empty()}),
2893                    error: None,
2894                    duration_ms: start.elapsed().as_millis() as u64,
2895                })
2896            }
2897
2898            // System Operations
2899            StepType::ProcessStart => {
2900                let command = step.parameters.get("command").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
2901                let empty_args = Vec::new();
2902                let args = step.parameters.get("args").and_then(|v| v.as_array()).unwrap_or(&empty_args);
2903                let working_dir = step.parameters.get("working_dir").and_then(|v| v.as_str()).unwrap_or(".");
2904                
2905                let mut cmd = Command::new(command);
2906                for arg in args {
2907                    if let Some(arg_str) = arg.as_str() {
2908                        cmd.arg(arg_str);
2909                    }
2910                }
2911                cmd.current_dir(working_dir);
2912                
2913                let child = cmd.spawn()?;
2914                let pid = child.id().unwrap_or(0);
2915                
2916                Ok(StepResult {
2917                    step_id: step.id.clone(),
2918                    status: StepStatus::Success,
2919                    output: serde_json::json!({"command": command, "pid": pid, "started": true}),
2920                    error: None,
2921                    duration_ms: start.elapsed().as_millis() as u64,
2922                })
2923            }
2924
2925            StepType::ProcessKill => {
2926                let pid = step.parameters.get("pid").and_then(|v| v.as_u64()).ok_or_else(|| anyhow::anyhow!("Missing 'pid' parameter"))?;
2927                
2928                #[cfg(unix)]
2929                {
2930                    use std::process::Command;
2931                    let output = Command::new("kill").args(&["-9", &pid.to_string()]).output()?;
2932                    let success = output.status.success();
2933                    
2934                    Ok(StepResult {
2935                        step_id: step.id.clone(),
2936                        status: if success { StepStatus::Success } else { StepStatus::Failed },
2937                        output: serde_json::json!({"pid": pid, "killed": success}),
2938                        error: if success { None } else { Some("Failed to kill process".to_string()) },
2939                        duration_ms: start.elapsed().as_millis() as u64,
2940                    })
2941                }
2942                
2943                #[cfg(windows)]
2944                {
2945                    use std::process::Command;
2946                    let output = Command::new("taskkill").args(&["/F", "/PID", &pid.to_string()]).output()?;
2947                    let success = output.status.success();
2948                    
2949                    Ok(StepResult {
2950                        step_id: step.id.clone(),
2951                        status: if success { StepStatus::Success } else { StepStatus::Failed },
2952                        output: serde_json::json!({"pid": pid, "killed": success}),
2953                        error: if success { None } else { Some("Failed to kill process".to_string()) },
2954                        duration_ms: start.elapsed().as_millis() as u64,
2955                    })
2956                }
2957            }
2958
2959            StepType::MonitorResources => {
2960                use sysinfo::System;
2961                let mut sys = System::new_all();
2962                sys.refresh_all();
2963                
2964                let cpu_usage = sys.global_cpu_info().cpu_usage();
2965                let memory_total = sys.total_memory();
2966                let memory_used = sys.used_memory();
2967                let memory_free = sys.free_memory();
2968                
2969                Ok(StepResult {
2970                    step_id: step.id.clone(),
2971                    status: StepStatus::Success,
2972                    output: serde_json::json!({
2973                        "cpu_usage_percent": cpu_usage,
2974                        "memory": {
2975                            "total_bytes": memory_total,
2976                            "used_bytes": memory_used,
2977                            "free_bytes": memory_free,
2978                            "usage_percent": (memory_used as f64 / memory_total as f64) * 100.0
2979                        }
2980                    }),
2981                    error: None,
2982                    duration_ms: start.elapsed().as_millis() as u64,
2983                })
2984            }
2985
2986            StepType::ServiceHealth => {
2987                let service_name = step.parameters.get("service").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'service' parameter"))?;
2988                let port = step.parameters.get("port").and_then(|v| v.as_u64()).unwrap_or(80);
2989                
2990                // Basic health check - attempt TCP connection
2991                use std::net::TcpStream;
2992                use std::time::Duration;
2993                
2994                let addr = format!("localhost:{}", port);
2995                let health = TcpStream::connect_timeout(
2996                    &addr.parse().map_err(|_| anyhow::anyhow!("Invalid address"))?,
2997                    Duration::from_secs(5)
2998                ).is_ok();
2999                
3000                Ok(StepResult {
3001                    step_id: step.id.clone(),
3002                    status: StepStatus::Success,
3003                    output: serde_json::json!({"service": service_name, "port": port, "healthy": health}),
3004                    error: None,
3005                    duration_ms: start.elapsed().as_millis() as u64,
3006                })
3007            }
3008
3009            StepType::Compress => {
3010                let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
3011                let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
3012                let format = step.parameters.get("format").and_then(|v| v.as_str()).unwrap_or("tar");
3013                
3014                match format {
3015                    "tar" => {
3016                        use tar::Builder;
3017                        use std::fs::File;
3018                        
3019                        let tar_file = File::create(destination)?;
3020                        let mut tar = Builder::new(tar_file);
3021                        tar.append_dir_all(".", source)?;
3022                        tar.finish()?;
3023                        
3024                        Ok(StepResult {
3025                            step_id: step.id.clone(),
3026                            status: StepStatus::Success,
3027                            output: serde_json::json!({"source": source, "destination": destination, "format": format, "compressed": true}),
3028                            error: None,
3029                            duration_ms: start.elapsed().as_millis() as u64,
3030                        })
3031                    },
3032                    "zip" => {
3033                        use std::fs::File;
3034                        use std::io::Write;
3035                        use walkdir::WalkDir;
3036                        use zip::write::FileOptions;
3037                        
3038                        let file = File::create(destination)?;
3039                        let mut zip = zip::ZipWriter::new(file);
3040                        let options = FileOptions::default().compression_method(zip::CompressionMethod::Stored);
3041                        
3042                        for entry in WalkDir::new(source) {
3043                            let entry = entry?;
3044                            let path = entry.path();
3045                            let name = path.strip_prefix(source).unwrap();
3046                            
3047                            if path.is_file() {
3048                                zip.start_file(name.to_string_lossy().as_ref(), options)?;
3049                                let file_content = std::fs::read(path)?;
3050                                zip.write_all(&file_content)?;
3051                            }
3052                        }
3053                        
3054                        zip.finish()?;
3055                        
3056                        Ok(StepResult {
3057                            step_id: step.id.clone(),
3058                            status: StepStatus::Success,
3059                            output: serde_json::json!({"source": source, "destination": destination, "format": format, "compressed": true}),
3060                            error: None,
3061                            duration_ms: start.elapsed().as_millis() as u64,
3062                        })
3063                    },
3064                    _ => Err(anyhow::anyhow!("Unsupported compression format: {}", format))
3065                }
3066            }
3067
3068            // Database Operations
3069            StepType::SqlQuery => {
3070                let query = step.parameters.get("query").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'query' parameter"))?;
3071                let database_url = step.parameters.get("database_url").and_then(|v| v.as_str()).unwrap_or("sqlite://memory:");
3072                
3073                // SQL support temporarily disabled due to RSA security vulnerability
3074                let _ = query;
3075                let _ = database_url;
3076                
3077                Ok(StepResult {
3078                    step_id: step.id.clone(),
3079                    status: StepStatus::Failed,
3080                    output: serde_json::json!({"error": "SQL feature disabled due to security vulnerabilities. Use alternative database solutions."}),
3081                    error: Some("SQL feature disabled due to security vulnerabilities. Use alternative database solutions.".to_string()),
3082                    duration_ms: start.elapsed().as_millis() as u64,
3083                })
3084            }
3085
3086            StepType::RedisSet => {
3087                let key = step.parameters.get("key").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'key' parameter"))?;
3088                let value = step.parameters.get("value").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'value' parameter"))?;
3089                let redis_url = step.parameters.get("redis_url").and_then(|v| v.as_str()).unwrap_or("redis://127.0.0.1:6379");
3090                
3091                // Variables used in feature-gated code
3092                #[cfg(not(feature = "redis"))]
3093                {
3094                    let _ = key;
3095                    let _ = value;
3096                    let _ = redis_url;
3097                }
3098                
3099                #[cfg(feature = "redis")]
3100                {
3101                    use redis::{Commands, Connection};
3102                    let client = redis::Client::open(redis_url)?;
3103                    let mut con: Connection = client.get_connection()?;
3104                    let _: () = con.set(key, value)?;
3105                    
3106                    Ok(StepResult {
3107                        step_id: step.id.clone(),
3108                        status: StepStatus::Success,
3109                        output: serde_json::json!({"key": key, "value": value, "set": true}),
3110                        error: None,
3111                        duration_ms: start.elapsed().as_millis() as u64,
3112                    })
3113                }
3114                
3115                #[cfg(not(feature = "redis"))]
3116                {
3117                    Ok(StepResult {
3118                        step_id: step.id.clone(),
3119                        status: StepStatus::Failed,
3120                        output: serde_json::json!({"error": "Redis feature not enabled"}),
3121                        error: Some("Redis feature not enabled".to_string()),
3122                        duration_ms: start.elapsed().as_millis() as u64,
3123                    })
3124                }
3125            }
3126
3127            StepType::RedisGet => {
3128                let key = step.parameters.get("key").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'key' parameter"))?;
3129                let redis_url = step.parameters.get("redis_url").and_then(|v| v.as_str()).unwrap_or("redis://127.0.0.1:6379");
3130                
3131                // Variables used in feature-gated code
3132                #[cfg(not(feature = "redis"))]
3133                {
3134                    let _ = key;
3135                    let _ = redis_url;
3136                }
3137                
3138                #[cfg(feature = "redis")]
3139                {
3140                    use redis::{Commands, Connection};
3141                    let client = redis::Client::open(redis_url)?;
3142                    let mut con: Connection = client.get_connection()?;
3143                    let value: Option<String> = con.get(key).ok();
3144                    
3145                    Ok(StepResult {
3146                        step_id: step.id.clone(),
3147                        status: StepStatus::Success,
3148                        output: serde_json::json!({"key": key, "value": value, "exists": value.is_some()}),
3149                        error: None,
3150                        duration_ms: start.elapsed().as_millis() as u64,
3151                    })
3152                }
3153                
3154                #[cfg(not(feature = "redis"))]
3155                {
3156                    Ok(StepResult {
3157                        step_id: step.id.clone(),
3158                        status: StepStatus::Failed,
3159                        output: serde_json::json!({"error": "Redis feature not enabled"}),
3160                        error: Some("Redis feature not enabled".to_string()),
3161                        duration_ms: start.elapsed().as_millis() as u64,
3162                    })
3163                }
3164            }
3165
3166            StepType::DbBackup => {
3167                let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
3168                let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
3169                
3170                // Simple file-based backup for SQLite
3171                tokio::fs::copy(source, destination).await?;
3172                
3173                Ok(StepResult {
3174                    step_id: step.id.clone(),
3175                    status: StepStatus::Success,
3176                    output: serde_json::json!({"source": source, "destination": destination, "backed_up": true}),
3177                    error: None,
3178                    duration_ms: start.elapsed().as_millis() as u64,
3179                })
3180            }
3181
3182            StepType::DbMigrate => {
3183                let migration_dir = step.parameters.get("migration_dir").and_then(|v| v.as_str()).unwrap_or("migrations");
3184                
3185                Ok(StepResult {
3186                    step_id: step.id.clone(),
3187                    status: StepStatus::Success,
3188                    output: serde_json::json!({"migration_dir": migration_dir, "note": "Migration implementation requires sqlx migration framework"}),
3189                    error: None,
3190                    duration_ms: start.elapsed().as_millis() as u64,
3191                })
3192            }
3193
3194            // Network Operations  
3195            StepType::WebsocketConnect => {
3196                let url = step.parameters.get("url").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter"))?;
3197                let message = step.parameters.get("message").and_then(|v| v.as_str()).unwrap_or("");
3198                
3199                // Variables used in feature-gated code
3200                #[cfg(not(feature = "tokio-tungstenite"))]
3201                {
3202                    let _ = url;
3203                    let _ = message;
3204                }
3205                
3206                #[cfg(feature = "tokio-tungstenite")]
3207                {
3208                    use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
3209                    
3210                    let (ws_stream, _) = connect_async(url).await?;
3211                    let (mut write, _read) = ws_stream.split();
3212                    
3213                    if !message.is_empty() {
3214                        write.send(Message::Text(message.to_string())).await?;
3215                    }
3216                    
3217                    Ok(StepResult {
3218                        step_id: step.id.clone(),
3219                        status: StepStatus::Success,
3220                        output: serde_json::json!({"url": url, "connected": true, "message_sent": !message.is_empty()}),
3221                        error: None,
3222                        duration_ms: start.elapsed().as_millis() as u64,
3223                    })
3224                }
3225                
3226                #[cfg(not(feature = "tokio-tungstenite"))]
3227                {
3228                    Ok(StepResult {
3229                        step_id: step.id.clone(),
3230                        status: StepStatus::Failed,
3231                        output: serde_json::json!({"error": "WebSocket feature not enabled"}),
3232                        error: Some("WebSocket feature not enabled".to_string()),
3233                        duration_ms: start.elapsed().as_millis() as u64,
3234                    })
3235                }
3236            }
3237
3238            StepType::FtpUpload => {
3239                let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3240                let username = step.parameters.get("username").and_then(|v| v.as_str()).unwrap_or("anonymous");
3241                let password = step.parameters.get("password").and_then(|v| v.as_str()).unwrap_or("");
3242                let local_file = step.parameters.get("local_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'local_file' parameter"))?;
3243                let remote_file = step.parameters.get("remote_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'remote_file' parameter"))?;
3244                
3245                // FTP support removed due to security vulnerabilities
3246                let _ = host;
3247                let _ = username;
3248                let _ = password;
3249                let _ = local_file;
3250                let _ = remote_file;
3251                
3252                Ok(StepResult {
3253                    step_id: step.id.clone(),
3254                    status: StepStatus::Failed,
3255                    output: serde_json::json!({"error": "FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives."}),
3256                    error: Some("FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives.".to_string()),
3257                    duration_ms: start.elapsed().as_millis() as u64,
3258                })
3259            }
3260
3261            StepType::FtpDownload => {
3262                let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3263                let username = step.parameters.get("username").and_then(|v| v.as_str()).unwrap_or("anonymous");
3264                let password = step.parameters.get("password").and_then(|v| v.as_str()).unwrap_or("");
3265                let remote_file = step.parameters.get("remote_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'remote_file' parameter"))?;
3266                let local_file = step.parameters.get("local_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'local_file' parameter"))?;
3267                
3268                // FTP support removed due to security vulnerabilities
3269                let _ = host;
3270                let _ = username;
3271                let _ = password;
3272                let _ = remote_file;
3273                let _ = local_file;
3274                
3275                Ok(StepResult {
3276                    step_id: step.id.clone(),
3277                    status: StepStatus::Failed,
3278                    output: serde_json::json!({"error": "FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives."}),
3279                    error: Some("FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives.".to_string()),
3280                    duration_ms: start.elapsed().as_millis() as u64,
3281                })
3282            }
3283
3284            StepType::SshExecute => {
3285                let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3286                let username = step.parameters.get("username").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'username' parameter"))?;
3287                let command = step.parameters.get("command").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
3288                
3289                // Variables used in feature-gated code
3290                #[cfg(all(feature = "openssh", unix))]
3291                {
3292                    use openssh::{Session, KnownHosts};
3293                    
3294                    let session = Session::connect(format!("{}@{}", username, host), KnownHosts::Strict).await?;
3295                    let output = session.command(command).output().await?;
3296                    
3297                    let stdout = String::from_utf8_lossy(&output.stdout);
3298                    let stderr = String::from_utf8_lossy(&output.stderr);
3299                    
3300                    Ok(StepResult {
3301                        step_id: step.id.clone(),
3302                        status: if output.status.success() { StepStatus::Success } else { StepStatus::Failed },
3303                        output: serde_json::json!({"command": command, "stdout": stdout, "stderr": stderr, "exit_code": output.status.code()}),
3304                        error: None,
3305                        duration_ms: start.elapsed().as_millis() as u64,
3306                    })
3307                }
3308                
3309                #[cfg(not(all(feature = "openssh", unix)))]
3310                {
3311                    let _ = host;
3312                    let _ = username;
3313                    let _ = command;
3314                    
3315                    Ok(StepResult {
3316                        step_id: step.id.clone(),
3317                        status: StepStatus::Failed,
3318                        output: serde_json::json!({"error": "SSH feature not enabled or not on Unix"}),
3319                        error: Some("SSH feature not enabled or not on Unix".to_string()),
3320                        duration_ms: start.elapsed().as_millis() as u64,
3321                    })
3322                }
3323            }
3324
3325            StepType::PingHost => {
3326                let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3327                let count = step.parameters.get("count").and_then(|v| v.as_u64()).unwrap_or(4);
3328                
3329                let output = Command::new("ping")
3330                    .args(&["-c", &count.to_string(), host])
3331                    .output()
3332                    .await?;
3333                
3334                let stdout = String::from_utf8_lossy(&output.stdout);
3335                let success = output.status.success();
3336                
3337                Ok(StepResult {
3338                    step_id: step.id.clone(),
3339                    status: if success { StepStatus::Success } else { StepStatus::Failed },
3340                    output: serde_json::json!({"host": host, "count": count, "success": success, "output": stdout}),
3341                    error: None,
3342                    duration_ms: start.elapsed().as_millis() as u64,
3343                })
3344            }
3345
3346            // AI/ML Operations
3347            StepType::GenerateEmbedding => {
3348                let text = step.parameters.get("text").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'text' parameter"))?;
3349                let model = step.parameters.get("model").and_then(|v| v.as_str()).unwrap_or("text-embedding-ada-002");
3350                
3351                // Variables used in feature-gated code
3352                #[cfg(not(feature = "llm"))]
3353                {
3354                    let _ = text;
3355                    let _ = model;
3356                }
3357                
3358                #[cfg(feature = "llm")]
3359                {
3360                    // Use actual LLM service for embeddings
3361                    use crate::llm::{create_default_llm_manager};
3362                    
3363                    let _manager = create_default_llm_manager()
3364                        .map_err(|e| anyhow::anyhow!("Failed to create LLM manager: {}", e))?;
3365
3366                    // For now, generate deterministic mock embeddings based on text content
3367                    // Using deterministic mock embeddings - production embedding API integration planned
3368                    // Real embedding support requires dedicated embedding models (text-embedding-ada-002, etc.)
3369                    let text_hash = text.chars()
3370                        .enumerate()
3371                        .map(|(i, c)| (c as u32 as f32 + i as f32 * 0.001) % 1.0)
3372                        .collect::<Vec<f32>>();
3373                    
3374                    // Pad or truncate to standard embedding size (1536 dimensions)
3375                    let mut embedding = vec![0.0; 1536];
3376                    for (i, &val) in text_hash.iter().take(1536).enumerate() {
3377                        embedding[i] = val;
3378                    }
3379                    
3380                    // Fill remaining with deterministic values based on text length
3381                    for i in text_hash.len()..1536 {
3382                        embedding[i] = ((text.len() * (i + 1)) as f32 * 0.001) % 1.0;
3383                    }
3384                    
3385                    Ok(StepResult {
3386                        step_id: step.id.clone(),
3387                        status: StepStatus::Success,
3388                        output: serde_json::json!({
3389                            "text": text,
3390                            "model": model,
3391                            "embedding": embedding,
3392                            "dimensions": embedding.len(),
3393                            "note": "Deterministic embedding generation - ready for embedding API integration"
3394                        }),
3395                        error: None,
3396                        duration_ms: start.elapsed().as_millis() as u64,
3397                    })
3398                }
3399                
3400                #[cfg(not(feature = "llm"))]
3401                {
3402                    Ok(StepResult {
3403                        step_id: step.id.clone(),
3404                        status: StepStatus::Failed,
3405                        output: serde_json::json!({"error": "LLM feature not enabled"}),
3406                        error: Some("LLM feature not enabled".to_string()),
3407                        duration_ms: start.elapsed().as_millis() as u64,
3408                    })
3409                }
3410            }
3411
3412            StepType::SimilaritySearch => {
3413                let _query_embedding = step.parameters.get("query_embedding").and_then(|v| v.as_array());
3414                let database = step.parameters.get("database").and_then(|v| v.as_str()).unwrap_or("default");
3415                let top_k = step.parameters.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5);
3416                
3417                // Variables used in feature-gated code
3418                #[cfg(not(feature = "rag"))]
3419                {
3420                    let _ = _query_embedding;
3421                    let _ = database;
3422                    let _ = top_k;
3423                }
3424                
3425                #[cfg(feature = "rag")]
3426                {
3427                    // Deterministic similarity search results based on database name
3428                    // Using deterministic similarity search results - production vector DB integration planned
3429                    // Real vector database integration requires Pinecone, Chroma, or similar embedding store
3430                    let database_hash = database.chars().map(|c| c as u32).sum::<u32>();
3431                    let mut results = Vec::new();
3432                    
3433                    for i in 0..(top_k.min(10)) {
3434                        let doc_id = format!("doc_{}_{}_{}", database, i + 1, database_hash);
3435                        let score = 0.95 - (i as f64 * 0.08); // Decreasing scores
3436                        let text = format!("Document {} from {} database - content hash {}", i + 1, database, database_hash + i as u32);
3437                        
3438                        results.push(serde_json::json!({
3439                            "id": doc_id,
3440                            "score": score,
3441                            "text": text,
3442                            "database": database,
3443                            "rank": i + 1
3444                        }));
3445                    }
3446                    
3447                    Ok(StepResult {
3448                        step_id: step.id.clone(),
3449                        status: StepStatus::Success,
3450                        output: serde_json::json!({
3451                            "database": database,
3452                            "top_k": top_k,
3453                            "results": results,
3454                            "count": results.len(),
3455                            "note": "Deterministic similarity search - ready for vector database integration"
3456                        }),
3457                        error: None,
3458                        duration_ms: start.elapsed().as_millis() as u64,
3459                    })
3460                }
3461                
3462                #[cfg(not(feature = "rag"))]
3463                {
3464                    Ok(StepResult {
3465                        step_id: step.id.clone(),
3466                        status: StepStatus::Failed,
3467                        output: serde_json::json!({"error": "RAG feature not enabled"}),
3468                        error: Some("RAG feature not enabled".to_string()),
3469                        duration_ms: start.elapsed().as_millis() as u64,
3470                    })
3471                }
3472            }
3473
3474            StepType::ModelInference => {
3475                let prompt = step.parameters.get("prompt").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'prompt' parameter"))?;
3476                let model = step.parameters.get("model").and_then(|v| v.as_str()).unwrap_or("gpt-3.5-turbo");
3477                let max_tokens = step.parameters.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(100);
3478                
3479                // Variables used in feature-gated code
3480                #[cfg(not(feature = "llm"))]
3481                {
3482                    let _ = prompt;
3483                    let _ = model;
3484                    let _ = max_tokens;
3485                }
3486                
3487                #[cfg(feature = "llm")]
3488                {
3489                    // Use actual LLM service for model inference
3490                    use crate::llm::{create_default_llm_manager, ChatMessage, LLMRequest, MessageRole};
3491                    
3492                    let manager = create_default_llm_manager()
3493                        .map_err(|e| anyhow::anyhow!("Failed to create LLM manager: {}", e))?;
3494
3495                    let request = LLMRequest {
3496                        messages: vec![ChatMessage {
3497                            role: MessageRole::User,
3498                            content: prompt.to_string(),
3499                            name: None,
3500                            tool_calls: None,
3501                            tool_call_id: None,
3502                        }],
3503                        model: Some(model.to_string()),
3504                        temperature: None,
3505                        max_tokens: Some(max_tokens as u32),
3506                        stream: false,
3507                        tools: None,
3508                        metadata: HashMap::new(),
3509                    };
3510
3511                    let response = manager
3512                        .complete(request, None)
3513                        .await
3514                        .map_err(|e| anyhow::anyhow!("LLM inference failed: {}", e))?;
3515                    
3516                    Ok(StepResult {
3517                        step_id: step.id.clone(),
3518                        status: StepStatus::Success,
3519                        output: serde_json::json!({
3520                            "prompt": prompt,
3521                            "model": model,
3522                            "response": response.content,
3523                            "max_tokens": max_tokens,
3524                            "tokens_used": response.usage.total_tokens,
3525                            "finish_reason": format!("{:?}", response.finish_reason)
3526                        }),
3527                        error: None,
3528                        duration_ms: start.elapsed().as_millis() as u64,
3529                    })
3530                }
3531                
3532                #[cfg(not(feature = "llm"))]
3533                {
3534                    Ok(StepResult {
3535                        step_id: step.id.clone(),
3536                        status: StepStatus::Failed,
3537                        output: serde_json::json!({"error": "LLM feature not enabled"}),
3538                        error: Some("LLM feature not enabled".to_string()),
3539                        duration_ms: start.elapsed().as_millis() as u64,
3540                    })
3541                }
3542            }
3543        };
3544
3545        result
3546    }
3547}
3548
3549/// Execution context that carries state between steps
3550pub struct ExecutionContext {
3551    pub variables: HashMap<String, String>,
3552    pub environment: HashMap<String, String>,
3553}
3554
3555impl Default for ExecutionContext {
3556    fn default() -> Self {
3557        Self::new()
3558    }
3559}
3560
3561impl ExecutionContext {
3562    pub fn new() -> Self {
3563        Self {
3564            variables: HashMap::new(),
3565            environment: HashMap::new(),
3566        }
3567    }
3568
3569    pub fn set_variable(&mut self, key: &str, value: &str) {
3570        self.variables.insert(key.to_string(), value.to_string());
3571    }
3572
3573    pub fn get_variable(&self, key: &str) -> Option<&String> {
3574        self.variables.get(key)
3575    }
3576
3577    /// Substitute variables in text using {variable_name} syntax
3578    pub fn substitute_variables(&self, text: &str) -> String {
3579        let mut result = text.to_string();
3580        
3581        // DEBUG: Log available variables for debugging
3582        debug!("Variable substitution - Available variables: {:?}", self.variables.keys().collect::<Vec<_>>());
3583        debug!("Variable substitution - Input text: {}", text);
3584        
3585        // Handle {previous_result} - look for the most recent step result
3586        if result.contains("{previous_result}") {
3587            if let Some(last_result) = self.get_last_result() {
3588                result = result.replace("{previous_result}", &last_result);
3589            }
3590        }
3591        
3592        // Handle both {step_id} and {step_id_response} patterns
3593        for (key, value) in &self.variables {
3594            let placeholder = format!("{{{}}}", key);
3595            result = result.replace(&placeholder, value);
3596            
3597            // Also support {step_id} when variable is {step_id_response}
3598            if key.ends_with("_response") {
3599                let step_id = key.strip_suffix("_response").unwrap();
3600                let step_placeholder = format!("{{{}}}", step_id);
3601                result = result.replace(&step_placeholder, value);
3602            }
3603            
3604            // Also support {step_id} when variable is {step_id_result}
3605            if key.ends_with("_result") {
3606                let step_id = key.strip_suffix("_result").unwrap();
3607                let step_placeholder = format!("{{{}}}", step_id);
3608                result = result.replace(&step_placeholder, value);
3609            }
3610            
3611            // Also support {step_id_result} when variable is {step_id_response}
3612            if key.ends_with("_response") {
3613                let step_id = key.strip_suffix("_response").unwrap();
3614                let result_placeholder = format!("{{{}_result}}", step_id);
3615                debug!("Checking result pattern: '{}' -> '{}'", result_placeholder, value);
3616                if result.contains(&result_placeholder) {
3617                    debug!("Found result pattern match, substituting");
3618                    result = result.replace(&result_placeholder, value);
3619                }
3620            }
3621        }
3622        
3623        result
3624    }
3625    
3626    /// Get the most recent step result (for {previous_result})
3627    fn get_last_result(&self) -> Option<String> {
3628        // Look for variables ending with _response or _result
3629        // Try _response first (LLM results), then _result (other step results)
3630        let mut candidates: Vec<(&String, &String)> = self.variables.iter()
3631            .filter(|(key, _)| key.ends_with("_response") || key.ends_with("_result"))
3632            .collect();
3633        
3634        // Sort to get consistent ordering - prefer _response over _result
3635        candidates.sort_by(|a, b| {
3636            if a.0.ends_with("_response") && b.0.ends_with("_result") {
3637                std::cmp::Ordering::Less
3638            } else if a.0.ends_with("_result") && b.0.ends_with("_response") {
3639                std::cmp::Ordering::Greater  
3640            } else {
3641                a.0.cmp(b.0)
3642            }
3643        });
3644        
3645        candidates.first().map(|(_, value)| (*value).clone())
3646    }
3647}
3648
3649#[derive(Debug, Clone, Serialize, Deserialize)]
3650pub struct MissionResult {
3651    pub mission_id: Uuid,
3652    pub status: MissionStatus,
3653    pub step_results: HashMap<String, StepResult>,
3654    pub total_duration_ms: u64,
3655}
3656
3657#[derive(Debug, Clone, Serialize, Deserialize)]
3658pub enum MissionStatus {
3659    Running,
3660    Completed,
3661    Failed,
3662    Cancelled,
3663}
3664
3665#[derive(Debug, Clone, Serialize, Deserialize)]
3666pub struct StepResult {
3667    pub step_id: String,
3668    pub status: StepStatus,
3669    pub output: serde_json::Value,
3670    pub error: Option<String>,
3671    pub duration_ms: u64,
3672}
3673
3674#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
3675pub enum StepStatus {
3676    Pending,
3677    Running,
3678    Success,
3679    Failed,
3680    Skipped,
3681}