1use schemars::JsonSchema;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::time::Instant;
6use tokio::process::Command;
7use tracing::{debug, error, info, warn};
8use uuid::Uuid;
9
10#[cfg(feature = "tokio-tungstenite")]
14use futures::{SinkExt, StreamExt};
15
16pub mod chain_executor;
17
18fn sanitize_file_path(path: &str) -> anyhow::Result<String> {
20 use std::path::{Path, Component};
21
22 if path.contains("..") || path.contains("~") {
24 return Err(anyhow::anyhow!("Path traversal detected: {}", path));
25 }
26
27 let windows_reserved = ["CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4",
29 "COM5", "COM6", "COM7", "COM8", "COM9", "LPT1", "LPT2",
30 "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"];
31
32 let file_name = Path::new(path).file_name()
33 .and_then(|n| n.to_str())
34 .unwrap_or("");
35
36 if windows_reserved.iter().any(|&reserved| file_name.eq_ignore_ascii_case(reserved)) {
37 return Err(anyhow::anyhow!("Windows reserved filename: {}", path));
38 }
39
40 let normalized = Path::new(path).components()
42 .filter(|component| match component {
43 Component::Normal(_) => true,
44 Component::CurDir => false,
45 Component::ParentDir => false,
46 _ => true, })
48 .collect::<std::path::PathBuf>();
49
50 let sanitized = normalized.to_string_lossy().to_string();
52
53 if cfg!(unix) && (sanitized.starts_with("/etc/") || sanitized.starts_with("/sys/") ||
55 sanitized.starts_with("/proc/") || sanitized.starts_with("/dev/")) {
56 return Err(anyhow::anyhow!("Access to system directory denied: {}", sanitized));
57 }
58
59 if cfg!(windows) && (sanitized.to_lowercase().starts_with("c:\\windows\\") ||
60 sanitized.to_lowercase().starts_with("c:\\system32\\")) {
61 return Err(anyhow::anyhow!("Access to system directory denied: {}", sanitized));
62 }
63
64 Ok(sanitized)
65}
66
67fn sanitize_command(command: &str, args: &[&str]) -> anyhow::Result<()> {
69 let dangerous_patterns = [
71 "&&", "||", ";", "|", "`", "$", "&", ">", "<",
72 "$(", "${", "rm -rf", "del /f", "format", "shutdown", "reboot"
73 ];
74
75 for pattern in dangerous_patterns {
76 if command.contains(pattern) {
77 return Err(anyhow::anyhow!("Dangerous command pattern detected: {}", pattern));
78 }
79
80 for arg in args {
81 if arg.contains(pattern) {
82 return Err(anyhow::anyhow!("Dangerous argument pattern detected: {}", pattern));
83 }
84 }
85 }
86
87 let dangerous_commands = [
89 "rm", "rmdir", "del", "deltree", "format", "fdisk", "mkfs",
90 "sudo", "su", "passwd", "useradd", "userdel", "chmod", "chown",
91 "shutdown", "reboot", "halt", "init", "systemctl", "service",
92 "curl", "wget", "nc", "netcat", "telnet", "ssh", "ftp", "sftp"
93 ];
94
95 for dangerous_cmd in dangerous_commands {
96 if command.eq_ignore_ascii_case(dangerous_cmd) {
97 return Err(anyhow::anyhow!("Dangerous command blocked: {}", dangerous_cmd));
98 }
99 }
100
101 let script_interpreters = ["bash", "sh", "powershell", "python", "perl", "ruby"];
103
104 if script_interpreters.iter().any(|&interp| command.eq_ignore_ascii_case(interp)) {
105 for arg in args {
107 if arg.contains("-c") || arg.contains("-e") {
108 return Err(anyhow::anyhow!("Inline script execution blocked"));
109 }
110 }
111 }
112
113 if command.eq_ignore_ascii_case("cmd") {
115 for (i, arg) in args.iter().enumerate() {
116 if *arg == "/c" && i + 1 < args.len() {
117 let next_cmd = args[i + 1];
119 for dangerous_cmd in ["del", "rmdir", "format", "shutdown", "reboot"] {
120 if next_cmd.eq_ignore_ascii_case(dangerous_cmd) {
121 return Err(anyhow::anyhow!("Dangerous Windows command blocked: {}", next_cmd));
122 }
123 }
124 }
125 }
126 }
127
128 Ok(())
129}
130
// A mission definition: a named, versioned list of steps plus optional
// mission-wide settings. Serializable with serde and described by a
// schemars-generated JSON Schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct Mission {
    // Version string of the mission document (the tests in this file use "1.0").
    pub version: String,
    // Human-readable mission name.
    pub name: String,
    // Optional free-form description.
    pub description: Option<String>,
    // The steps that make up the mission.
    pub steps: Vec<MissionStep>,
    // Optional mission-wide configuration.
    pub config: Option<MissionConfig>,
}
139
// A single step of a mission.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MissionStep {
    // Identifier of this step; other steps refer to it through `depends_on`.
    pub id: String,
    // Human-readable step name.
    pub name: String,
    // Which kind of operation this step performs.
    pub step_type: StepType,
    // IDs of the steps this step depends on, if any.
    pub depends_on: Option<Vec<String>>,
    // Optional per-step timeout, in seconds.
    pub timeout_seconds: Option<u64>,
    // NOTE(review): presumably lets the mission proceed when this step fails —
    // confirm against the executor.
    pub continue_on_error: Option<bool>,
    // Step-type-specific parameters as arbitrary JSON (for example `path` and
    // `content` for a `CreateFile` step, as used by the tests in this file).
    pub parameters: serde_json::Value,
}
150
151#[derive(Debug, Clone, JsonSchema)]
152pub enum StepType {
153 CreateFile,
155 EditFile,
156 DeleteFile,
157 CopyFile,
158 MoveFile,
159 ReadFile,
160 ListDirectory,
161 FileSearch,
162 ParseJson,
164 ParseYaml,
165 ParseXml,
166 ValidateSchema,
167 CsvProcess,
168 CompileCode,
170 RunTests,
171 FormatCode,
172 LintCode,
173 ExtractFunctions,
174 GenerateDocs,
175 GitCommit,
177 GitBranch,
178 GitMerge,
179 GitStatus,
180 GitDiff,
181 ProcessStart,
183 ProcessKill,
184 MonitorResources,
185 ServiceHealth,
186 Compress,
187 SqlQuery,
189 RedisSet,
190 RedisGet,
191 DbBackup,
192 DbMigrate,
193 WebsocketConnect,
195 FtpUpload,
196 FtpDownload,
197 SshExecute,
198 PingHost,
199 GenerateEmbedding,
201 SimilaritySearch,
202 ModelInference,
203 Command,
205 Http,
206 Noop,
207 Llm,
208 Tool,
209 RagQuery,
210 RagAdd,
211 Chain,
212 Agent,
213}
214
215impl Serialize for StepType {
217 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
218 where
219 S: serde::Serializer,
220 {
221 let s = match self {
222 StepType::CreateFile => "create_file",
224 StepType::EditFile => "edit_file",
225 StepType::DeleteFile => "delete_file",
226 StepType::CopyFile => "copy_file",
227 StepType::MoveFile => "move_file",
228 StepType::ReadFile => "read_file",
229 StepType::ListDirectory => "list_directory",
230 StepType::FileSearch => "file_search",
231 StepType::ParseJson => "parse_json",
233 StepType::ParseYaml => "parse_yaml",
234 StepType::ParseXml => "parse_xml",
235 StepType::ValidateSchema => "validate_schema",
236 StepType::CsvProcess => "csv_process",
237 StepType::CompileCode => "compile_code",
239 StepType::RunTests => "run_tests",
240 StepType::FormatCode => "format_code",
241 StepType::LintCode => "lint_code",
242 StepType::ExtractFunctions => "extract_functions",
243 StepType::GenerateDocs => "generate_docs",
244 StepType::GitCommit => "git_commit",
246 StepType::GitBranch => "git_branch",
247 StepType::GitMerge => "git_merge",
248 StepType::GitStatus => "git_status",
249 StepType::GitDiff => "git_diff",
250 StepType::ProcessStart => "process_start",
252 StepType::ProcessKill => "process_kill",
253 StepType::MonitorResources => "monitor_resources",
254 StepType::ServiceHealth => "service_health",
255 StepType::Compress => "compress",
256 StepType::SqlQuery => "sql_query",
258 StepType::RedisSet => "redis_set",
259 StepType::RedisGet => "redis_get",
260 StepType::DbBackup => "db_backup",
261 StepType::DbMigrate => "db_migrate",
262 StepType::WebsocketConnect => "websocket_connect",
264 StepType::FtpUpload => "ftp_upload",
265 StepType::FtpDownload => "ftp_download",
266 StepType::SshExecute => "ssh_execute",
267 StepType::PingHost => "ping_host",
268 StepType::GenerateEmbedding => "generate_embedding",
270 StepType::SimilaritySearch => "similarity_search",
271 StepType::ModelInference => "model_inference",
272 StepType::Command => "command",
274 StepType::Http => "http",
275 StepType::Noop => "noop",
276 StepType::Llm => "llm",
277 StepType::Tool => "tool",
278 StepType::RagQuery => "rag_query",
279 StepType::RagAdd => "rag_add",
280 StepType::Chain => "chain",
281 StepType::Agent => "agent",
282 };
283 serializer.serialize_str(s)
284 }
285}
286
287impl<'de> Deserialize<'de> for StepType {
288 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
289 where
290 D: serde::Deserializer<'de>,
291 {
292 use serde::de::{self, Visitor};
293 use std::fmt;
294
295 struct StepTypeVisitor;
296
297 impl<'de> Visitor<'de> for StepTypeVisitor {
298 type Value = StepType;
299
300 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
301 formatter.write_str("a string or object representing a step type")
302 }
303
304 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
306 where
307 E: de::Error,
308 {
309 match value {
310 "create_file" => Ok(StepType::CreateFile),
312 "edit_file" => Ok(StepType::EditFile),
313 "delete_file" => Ok(StepType::DeleteFile),
314 "copy_file" => Ok(StepType::CopyFile),
315 "move_file" => Ok(StepType::MoveFile),
316 "read_file" => Ok(StepType::ReadFile),
317 "list_directory" => Ok(StepType::ListDirectory),
318 "file_search" => Ok(StepType::FileSearch),
319 "parse_json" => Ok(StepType::ParseJson),
321 "parse_yaml" => Ok(StepType::ParseYaml),
322 "parse_xml" => Ok(StepType::ParseXml),
323 "validate_schema" => Ok(StepType::ValidateSchema),
324 "csv_process" => Ok(StepType::CsvProcess),
325 "compile_code" => Ok(StepType::CompileCode),
327 "run_tests" => Ok(StepType::RunTests),
328 "format_code" => Ok(StepType::FormatCode),
329 "lint_code" => Ok(StepType::LintCode),
330 "extract_functions" => Ok(StepType::ExtractFunctions),
331 "generate_docs" => Ok(StepType::GenerateDocs),
332 "git_commit" => Ok(StepType::GitCommit),
334 "git_branch" => Ok(StepType::GitBranch),
335 "git_merge" => Ok(StepType::GitMerge),
336 "git_status" => Ok(StepType::GitStatus),
337 "git_diff" => Ok(StepType::GitDiff),
338 "process_start" => Ok(StepType::ProcessStart),
340 "process_kill" => Ok(StepType::ProcessKill),
341 "monitor_resources" => Ok(StepType::MonitorResources),
342 "service_health" => Ok(StepType::ServiceHealth),
343 "compress" => Ok(StepType::Compress),
344 "sql_query" => Ok(StepType::SqlQuery),
346 "redis_set" => Ok(StepType::RedisSet),
347 "redis_get" => Ok(StepType::RedisGet),
348 "db_backup" => Ok(StepType::DbBackup),
349 "db_migrate" => Ok(StepType::DbMigrate),
350 "websocket_connect" => Ok(StepType::WebsocketConnect),
352 "ftp_upload" => Ok(StepType::FtpUpload),
353 "ftp_download" => Ok(StepType::FtpDownload),
354 "ssh_execute" => Ok(StepType::SshExecute),
355 "ping_host" => Ok(StepType::PingHost),
356 "generate_embedding" => Ok(StepType::GenerateEmbedding),
358 "similarity_search" => Ok(StepType::SimilaritySearch),
359 "model_inference" => Ok(StepType::ModelInference),
360 "command" => Ok(StepType::Command),
362 "http" => Ok(StepType::Http),
363 "noop" => Ok(StepType::Noop),
364 "llm" => Ok(StepType::Llm),
365 "tool" => Ok(StepType::Tool),
366 "rag_query" => Ok(StepType::RagQuery),
367 "rag_add" => Ok(StepType::RagAdd),
368 "chain" => Ok(StepType::Chain),
369 "agent" => Ok(StepType::Agent),
370 "Tool" => Ok(StepType::Tool),
372 other => Err(E::unknown_variant(
373 other,
374 &[
375 "create_file",
376 "edit_file",
377 "delete_file",
378 "command",
379 "http",
380 "noop",
381 "llm",
382 "tool",
383 "rag_query",
384 "rag_add",
385 "chain",
386 "agent",
387 ],
388 )),
389 }
390 }
391
392 fn visit_map<V>(self, mut map: V) -> Result<Self::Value, V::Error>
394 where
395 V: de::MapAccess<'de>,
396 {
397 let mut step_type: Option<String> = None;
398
399 while let Some(key) = map.next_key::<String>()? {
400 match key.as_str() {
401 "type" => {
402 if step_type.is_some() {
403 return Err(de::Error::duplicate_field("type"));
404 }
405 step_type = Some(map.next_value()?);
406 }
407 _ => {
409 let _: serde_json::Value = map.next_value()?;
410 }
411 }
412 }
413
414 let step_type = step_type.ok_or_else(|| de::Error::missing_field("type"))?;
415 self.visit_str(&step_type)
416 }
417 }
418
419 deserializer.deserialize_any(StepTypeVisitor)
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 use super::*;
426 use tempfile::TempDir;
427 use tokio_test;
428 use std::io::Write;
429
430 mod steptype_tests {
432 use super::*;
433
434 #[test]
435 fn test_steptype_simple_string_serialization() {
436 let step_type = StepType::CreateFile;
437 let serialized = serde_json::to_string(&step_type).unwrap();
438 assert_eq!(serialized, "\"create_file\"");
439
440 let deserialized: StepType = serde_json::from_str(&serialized).unwrap();
441 assert!(matches!(deserialized, StepType::CreateFile));
442 }
443
444 #[test]
445 fn test_steptype_legacy_format_support() {
446 let json = "\"Tool\"";
447 let deserialized: StepType = serde_json::from_str(json).unwrap();
448 assert!(matches!(deserialized, StepType::Tool));
449 }
450
451 #[test]
452 fn test_steptype_future_object_format() {
453 let json = r#"{"type": "create_file", "metadata": {"version": "1.2"}}"#;
455 let deserialized: StepType = serde_json::from_str(json).unwrap();
456 assert!(matches!(deserialized, StepType::CreateFile));
457 }
458
459 #[test]
460 fn test_steptype_all_variants() {
461 let variants = vec![
462 (StepType::CreateFile, "create_file"),
463 (StepType::EditFile, "edit_file"),
464 (StepType::DeleteFile, "delete_file"),
465 (StepType::Command, "command"),
466 (StepType::Http, "http"),
467 (StepType::Noop, "noop"),
468 (StepType::Llm, "llm"),
469 (StepType::Tool, "tool"),
470 (StepType::RagQuery, "rag_query"),
471 (StepType::RagAdd, "rag_add"),
472 (StepType::Chain, "chain"),
473 (StepType::Agent, "agent"),
474 ];
475
476 for (step_type, expected_str) in variants {
477 let serialized = serde_json::to_string(&step_type).unwrap();
479 assert_eq!(serialized, format!("\"{}\"", expected_str));
480
481 let deserialized: StepType = serde_json::from_str(&serialized).unwrap();
483 assert!(std::mem::discriminant(&step_type) == std::mem::discriminant(&deserialized));
484 }
485 }
486
487 #[test]
488 fn test_steptype_invalid_string() {
489 let json = "\"invalid_step\"";
490 let result: Result<StepType, _> = serde_json::from_str(json);
491 assert!(result.is_err());
492 }
493
494 #[test]
495 fn test_steptype_object_missing_type() {
496 let json = r#"{"metadata": {"version": "1.2"}}"#;
497 let result: Result<StepType, _> = serde_json::from_str(json);
498 assert!(result.is_err());
499 }
500
501 #[test]
502 fn test_steptype_object_duplicate_type() {
503 let json = r#"{"type": "create_file", "type": "edit_file"}"#;
504 let result: Result<StepType, _> = serde_json::from_str(json);
505 assert!(result.is_err());
506 }
507 }
508
509 mod mission_tests {
511 use super::*;
512
513 #[test]
514 fn test_mission_creation() {
515 let mission = Mission {
516 version: "1.0".to_string(),
517 name: "Test Mission".to_string(),
518 description: Some("A test mission".to_string()),
519 steps: vec![
520 MissionStep {
521 id: "step1".to_string(),
522 name: "First Step".to_string(),
523 step_type: StepType::Noop,
524 depends_on: None,
525 timeout_seconds: Some(60),
526 continue_on_error: None,
527 parameters: serde_json::json!({"key": "value"}),
528 }
529 ],
530 config: Some(MissionConfig {
531 max_parallel_steps: Some(2),
532 timeout_seconds: Some(300),
533 fail_fast: Some(true),
534 }),
535 };
536
537 assert_eq!(mission.name, "Test Mission");
538 assert_eq!(mission.steps.len(), 1);
539 assert!(mission.config.is_some());
540 }
541
542 #[test]
543 fn test_mission_step_serialization() {
544 let step = MissionStep {
545 id: "test_step".to_string(),
546 name: "Test Step".to_string(),
547 step_type: StepType::CreateFile,
548 depends_on: Some(vec!["dep1".to_string(), "dep2".to_string()]),
549 timeout_seconds: Some(120),
550 continue_on_error: None,
551 parameters: serde_json::json!({
552 "path": "/tmp/test.txt",
553 "content": "Hello, World!"
554 }),
555 };
556
557 let serialized = serde_json::to_string(&step).unwrap();
558 let deserialized: MissionStep = serde_json::from_str(&serialized).unwrap();
559
560 assert_eq!(deserialized.id, "test_step");
561 assert_eq!(deserialized.name, "Test Step");
562 assert!(matches!(deserialized.step_type, StepType::CreateFile));
563 assert_eq!(deserialized.depends_on.unwrap().len(), 2);
564 assert_eq!(deserialized.timeout_seconds.unwrap(), 120);
565 }
566
567 #[test]
568 fn test_mission_config_defaults() {
569 let config = MissionConfig {
570 max_parallel_steps: None,
571 timeout_seconds: None,
572 fail_fast: None,
573 };
574
575 let serialized = serde_json::to_string(&config).unwrap();
576 let deserialized: MissionConfig = serde_json::from_str(&serialized).unwrap();
577
578 assert!(deserialized.max_parallel_steps.is_none());
579 assert!(deserialized.timeout_seconds.is_none());
580 assert!(deserialized.fail_fast.is_none());
581 }
582 }
583
584 mod mission_loader_tests {
586 use super::*;
587 use std::fs;
588
589 #[test]
590 fn test_load_mission_yaml() {
591 let temp_dir = TempDir::new().unwrap();
592 let mission_path = temp_dir.path().join("test_mission.yaml");
593
594 let mission_yaml = r#"
595version: "1.0"
596name: "YAML Test Mission"
597description: "Testing YAML loading"
598steps:
599 - id: "step1"
600 name: "First Step"
601 step_type: "noop"
602 parameters: {}
603config:
604 max_parallel_steps: 3
605 timeout_seconds: 600
606 fail_fast: true
607"#;
608 fs::write(&mission_path, mission_yaml).unwrap();
609
610 let mission = MissionLoader::load_from_file(mission_path.to_str().unwrap()).unwrap();
611 assert_eq!(mission.name, "YAML Test Mission");
612 assert_eq!(mission.steps.len(), 1);
613 assert!(mission.config.is_some());
614 assert_eq!(mission.config.unwrap().max_parallel_steps.unwrap(), 3);
615 }
616
617 #[test]
618 fn test_load_mission_json() {
619 let temp_dir = TempDir::new().unwrap();
620 let mission_path = temp_dir.path().join("test_mission.json");
621
622 let mission_json = r#"{
623 "version": "1.0",
624 "name": "JSON Test Mission",
625 "description": "Testing JSON loading",
626 "steps": [
627 {
628 "id": "step1",
629 "name": "First Step",
630 "step_type": "noop",
631 "parameters": {}
632 }
633 ],
634 "config": {
635 "max_parallel_steps": 5,
636 "timeout_seconds": 900,
637 "fail_fast": false
638 }
639 }"#;
640 fs::write(&mission_path, mission_json).unwrap();
641
642 let mission = MissionLoader::load_from_file(mission_path.to_str().unwrap()).unwrap();
643 assert_eq!(mission.name, "JSON Test Mission");
644 assert_eq!(mission.steps.len(), 1);
645 assert!(mission.config.is_some());
646 assert_eq!(mission.config.unwrap().max_parallel_steps.unwrap(), 5);
647 }
648
649 #[test]
650 fn test_load_mission_empty_path() {
651 let result = MissionLoader::load_from_file("");
652 assert!(result.is_err());
653 assert!(result.unwrap_err().to_string().contains("Mission path must not be empty"));
654 }
655
656 #[test]
657 fn test_load_mission_nonexistent_file() {
658 let result = MissionLoader::load_from_file("/nonexistent/path/mission.yaml");
659 assert!(result.is_err());
660 }
661
662 #[test]
663 fn test_validate_mission_success() {
664 let mission = Mission {
665 version: "1.0".to_string(),
666 name: "Valid Mission".to_string(),
667 description: None,
668 steps: vec![
669 MissionStep {
670 id: "step1".to_string(),
671 name: "Step 1".to_string(),
672 step_type: StepType::Noop,
673 depends_on: None,
674 timeout_seconds: None,
675 continue_on_error: None,
676 parameters: serde_json::json!({}),
677 },
678 MissionStep {
679 id: "step2".to_string(),
680 name: "Step 2".to_string(),
681 step_type: StepType::Noop,
682 depends_on: Some(vec!["step1".to_string()]),
683 timeout_seconds: None,
684 continue_on_error: None,
685 parameters: serde_json::json!({}),
686 },
687 ],
688 config: None,
689 };
690
691 let result = MissionLoader::validate_mission(&mission);
692 assert!(result.is_ok());
693 }
694
695 #[test]
696 fn test_validate_mission_empty_steps() {
697 let mission = Mission {
698 version: "1.0".to_string(),
699 name: "Empty Mission".to_string(),
700 description: None,
701 steps: vec![],
702 config: None,
703 };
704
705 let result = MissionLoader::validate_mission(&mission);
706 assert!(result.is_err());
707 assert!(result.unwrap_err().to_string().contains("Mission must have at least one step"));
708 }
709
710 #[test]
711 fn test_validate_mission_duplicate_ids() {
712 let mission = Mission {
713 version: "1.0".to_string(),
714 name: "Duplicate ID Mission".to_string(),
715 description: None,
716 steps: vec![
717 MissionStep {
718 id: "step1".to_string(),
719 name: "First Step".to_string(),
720 step_type: StepType::Noop,
721 depends_on: None,
722 timeout_seconds: None,
723 continue_on_error: None,
724 parameters: serde_json::json!({}),
725 },
726 MissionStep {
727 id: "step1".to_string(), name: "Second Step".to_string(),
729 step_type: StepType::Noop,
730 depends_on: None,
731 timeout_seconds: None,
732 continue_on_error: None,
733 parameters: serde_json::json!({}),
734 },
735 ],
736 config: None,
737 };
738
739 let result = MissionLoader::validate_mission(&mission);
740 assert!(result.is_err());
741 assert!(result.unwrap_err().to_string().contains("Duplicate step ID"));
742 }
743
744 #[test]
745 fn test_validate_mission_missing_dependency() {
746 let mission = Mission {
747 version: "1.0".to_string(),
748 name: "Missing Dependency Mission".to_string(),
749 description: None,
750 steps: vec![
751 MissionStep {
752 id: "step1".to_string(),
753 name: "Step 1".to_string(),
754 step_type: StepType::Noop,
755 depends_on: Some(vec!["nonexistent".to_string()]),
756 timeout_seconds: None,
757 continue_on_error: None,
758 parameters: serde_json::json!({}),
759 },
760 ],
761 config: None,
762 };
763
764 let result = MissionLoader::validate_mission(&mission);
765 assert!(result.is_err());
766 assert!(result.unwrap_err().to_string().contains("depends on non-existent step"));
767 }
768 }
769
770 mod dag_executor_tests {
772 use super::*;
773
774 #[tokio::test]
775 async fn test_topological_sort_simple() {
776 let steps = vec![
778 MissionStep {
779 id: "step1".to_string(),
780 name: "First".to_string(),
781 step_type: StepType::Noop,
782 depends_on: None,
783 timeout_seconds: None,
784 continue_on_error: None,
785 parameters: serde_json::json!({}),
786 },
787 MissionStep {
788 id: "step2".to_string(),
789 name: "Second".to_string(),
790 step_type: StepType::Noop,
791 depends_on: Some(vec!["step1".to_string()]),
792 timeout_seconds: None,
793 continue_on_error: None,
794 parameters: serde_json::json!({}),
795 },
796 MissionStep {
797 id: "step3".to_string(),
798 name: "Third".to_string(),
799 step_type: StepType::Noop,
800 depends_on: Some(vec!["step2".to_string()]),
801 timeout_seconds: None,
802 continue_on_error: None,
803 parameters: serde_json::json!({}),
804 },
805 ];
806
807 let order = DagExecutor::topological_sort(&steps).unwrap();
808 assert_eq!(order, vec!["step1", "step2", "step3"]);
809 }
810
811 #[tokio::test]
812 async fn test_topological_sort_circular_dependency() {
813 let steps = vec![
815 MissionStep {
816 id: "step1".to_string(),
817 name: "First".to_string(),
818 step_type: StepType::Noop,
819 depends_on: Some(vec!["step2".to_string()]),
820 timeout_seconds: None,
821 continue_on_error: None,
822 parameters: serde_json::json!({}),
823 },
824 MissionStep {
825 id: "step2".to_string(),
826 name: "Second".to_string(),
827 step_type: StepType::Noop,
828 depends_on: Some(vec!["step1".to_string()]),
829 timeout_seconds: None,
830 continue_on_error: None,
831 parameters: serde_json::json!({}),
832 },
833 ];
834
835 let result = DagExecutor::topological_sort(&steps);
836 assert!(result.is_err());
837 assert!(result.unwrap_err().to_string().contains("Circular dependency"));
838 }
839
840 #[tokio::test]
841 async fn test_execute_mission_simple_success() {
842 let mission = Mission {
844 version: "1.0".to_string(),
845 name: "Simple Success Mission".to_string(),
846 description: Some("A simple successful mission".to_string()),
847 steps: vec![
848 MissionStep {
849 id: "noop1".to_string(),
850 name: "First Noop".to_string(),
851 step_type: StepType::Noop,
852 depends_on: None,
853 timeout_seconds: Some(10),
854 continue_on_error: None,
855 parameters: serde_json::json!({}),
856 },
857 MissionStep {
858 id: "noop2".to_string(),
859 name: "Second Noop".to_string(),
860 step_type: StepType::Noop,
861 depends_on: Some(vec!["noop1".to_string()]),
862 timeout_seconds: Some(10),
863 continue_on_error: None,
864 parameters: serde_json::json!({}),
865 },
866 ],
867 config: Some(MissionConfig {
868 max_parallel_steps: Some(1),
869 timeout_seconds: Some(60),
870 fail_fast: Some(true),
871 }),
872 };
873
874 let result = DagExecutor::execute_mission(mission).await.unwrap();
875 assert!(matches!(result.status, MissionStatus::Completed));
876 assert_eq!(result.step_results.len(), 2);
877 assert!(result.step_results.contains_key("noop1"));
878 assert!(result.step_results.contains_key("noop2"));
879 assert!(result.total_duration_ms >= 0);
881 }
882
883 #[tokio::test]
884 async fn test_execute_mission_empty() {
885 let mission = Mission {
887 version: "1.0".to_string(),
888 name: "Empty Mission".to_string(),
889 description: None,
890 steps: vec![],
891 config: None,
892 };
893
894 let result = DagExecutor::execute_mission(mission).await;
895 assert!(result.is_err());
896 assert!(result.unwrap_err().to_string().contains("Cannot execute empty mission"));
897 }
898
899 #[tokio::test]
900 async fn test_execute_mission_fail_fast() {
901 let mission = Mission {
903 version: "1.0".to_string(),
904 name: "Fail Fast Mission".to_string(),
905 description: None,
906 steps: vec![
907 MissionStep {
908 id: "failing_step".to_string(),
909 name: "Failing Step".to_string(),
910 step_type: StepType::Command,
911 depends_on: None,
912 timeout_seconds: Some(5),
913 continue_on_error: None,
914 parameters: serde_json::json!({
915 "command": "invalid_command_that_does_not_exist",
916 "args": []
917 }),
918 },
919 MissionStep {
920 id: "should_not_run".to_string(),
921 name: "Should Not Run".to_string(),
922 step_type: StepType::Noop,
923 depends_on: Some(vec!["failing_step".to_string()]),
924 timeout_seconds: Some(5),
925 continue_on_error: None,
926 parameters: serde_json::json!({}),
927 },
928 ],
929 config: Some(MissionConfig {
930 max_parallel_steps: Some(1),
931 timeout_seconds: Some(30),
932 fail_fast: Some(true),
933 }),
934 };
935
936 let result = DagExecutor::execute_mission(mission).await;
937 assert!(result.is_err());
939 }
940 }
941
942 mod execution_context_tests {
944 use super::*;
945
946 #[test]
947 fn test_execution_context_creation() {
948 let context = ExecutionContext::new();
950 assert!(context.variables.is_empty());
951 assert!(context.environment.is_empty());
952 }
953
954 #[test]
955 fn test_execution_context_variables() {
956 let mut context = ExecutionContext::new();
958
959 context.set_variable("key1", "value1");
960 context.set_variable("key2", "value2");
961
962 assert_eq!(context.get_variable("key1"), Some(&"value1".to_string()));
963 assert_eq!(context.get_variable("key2"), Some(&"value2".to_string()));
964 assert_eq!(context.get_variable("nonexistent"), None);
965 }
966
967 #[test]
968 fn test_execution_context_variable_override() {
969 let mut context = ExecutionContext::new();
971
972 context.set_variable("key", "original");
973 assert_eq!(context.get_variable("key"), Some(&"original".to_string()));
974
975 context.set_variable("key", "updated");
976 assert_eq!(context.get_variable("key"), Some(&"updated".to_string()));
977 }
978 }
979
980 mod step_execution_tests {
982 use super::*;
983 use tempfile::TempDir;
984
985 #[tokio::test]
986 async fn test_execute_noop_step() {
987 let step = MissionStep {
989 id: "noop_test".to_string(),
990 name: "Test Noop".to_string(),
991 step_type: StepType::Noop,
992 depends_on: None,
993 timeout_seconds: None,
994 continue_on_error: None,
995 parameters: serde_json::json!({}),
996 };
997
998 let mut context = ExecutionContext::new();
999 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1000
1001 assert_eq!(result.step_id, "noop_test");
1002 assert!(matches!(result.status, StepStatus::Success));
1003 assert!(result.output.get("message").is_some());
1004 assert!(result.error.is_none());
1005 }
1006
1007 #[tokio::test]
1008 async fn test_execute_create_file_step() {
1009 let temp_dir = TempDir::new().unwrap();
1011 let file_path = temp_dir.path().join("test_file.txt");
1012
1013 let step = MissionStep {
1014 id: "create_file_test".to_string(),
1015 name: "Test Create File".to_string(),
1016 step_type: StepType::CreateFile,
1017 depends_on: None,
1018 timeout_seconds: None,
1019 continue_on_error: None,
1020 parameters: serde_json::json!({
1021 "path": file_path.to_str().unwrap(),
1022 "content": "Hello, World!\nThis is a test file."
1023 }),
1024 };
1025
1026 let mut context = ExecutionContext::new();
1027 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1028
1029 assert_eq!(result.step_id, "create_file_test");
1030 assert!(matches!(result.status, StepStatus::Success));
1031 assert!(result.error.is_none());
1032
1033 let content = std::fs::read_to_string(&file_path).unwrap();
1035 assert_eq!(content, "Hello, World!\nThis is a test file.");
1036 }
1037
1038 #[tokio::test]
1039 async fn test_execute_create_file_missing_path() {
1040 let step = MissionStep {
1042 id: "invalid_create_file".to_string(),
1043 name: "Invalid Create File".to_string(),
1044 step_type: StepType::CreateFile,
1045 depends_on: None,
1046 timeout_seconds: None,
1047 continue_on_error: None,
1048 parameters: serde_json::json!({
1049 "content": "Some content"
1050 }),
1052 };
1053
1054 let mut context = ExecutionContext::new();
1055 let result = DagExecutor::execute_step(&step, &mut context).await;
1056
1057 assert!(result.is_err());
1058 assert!(result.unwrap_err().to_string().contains("Missing 'path' parameter"));
1059 }
1060
1061 #[tokio::test]
1062 async fn test_execute_command_step_success() {
1063 let step = MissionStep {
1065 id: "command_test".to_string(),
1066 name: "Test Command".to_string(),
1067 step_type: StepType::Command,
1068 depends_on: None,
1069 timeout_seconds: None,
1070 continue_on_error: None,
1071 parameters: serde_json::json!({
1072 "command": "echo",
1073 "args": ["Hello", "World"]
1074 }),
1075 };
1076
1077 let mut context = ExecutionContext::new();
1078 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1079
1080 assert!(matches!(result.status, StepStatus::Success));
1081 assert!(result.output["stdout"].as_str().unwrap().contains("Hello World"));
1082 assert_eq!(result.output["exit_code"].as_i64().unwrap(), 0);
1083 }
1084
1085 #[tokio::test]
1086 async fn test_execute_command_step_failure() {
1087 let step = MissionStep {
1089 id: "failing_command".to_string(),
1090 name: "Failing Command".to_string(),
1091 step_type: StepType::Command,
1092 depends_on: None,
1093 timeout_seconds: None,
1094 continue_on_error: None,
1095 parameters: serde_json::json!({
1096 "command": "false", "args": []
1098 }),
1099 };
1100
1101 let mut context = ExecutionContext::new();
1102 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1103
1104 assert!(matches!(result.status, StepStatus::Failed));
1105 assert!(result.error.is_some());
1106 }
1107
1108 #[tokio::test]
1109 async fn test_execute_edit_file_step() {
1110 let temp_dir = TempDir::new().unwrap();
1112 let file_path = temp_dir.path().join("edit_test.txt");
1113
1114 std::fs::write(&file_path, "Initial content\n").unwrap();
1116
1117 let step = MissionStep {
1118 id: "edit_file_test".to_string(),
1119 name: "Test Edit File".to_string(),
1120 step_type: StepType::EditFile,
1121 depends_on: None,
1122 timeout_seconds: None,
1123 continue_on_error: None,
1124 parameters: serde_json::json!({
1125 "path": file_path.to_str().unwrap(),
1126 "content": "New content",
1127 "append": false
1128 }),
1129 };
1130
1131 let mut context = ExecutionContext::new();
1132 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1133
1134 assert!(matches!(result.status, StepStatus::Success));
1135 assert_eq!(std::fs::read_to_string(&file_path).unwrap(), "New content");
1136 }
1137
1138 #[tokio::test]
1139 async fn test_execute_edit_file_append() {
1140 let temp_dir = TempDir::new().unwrap();
1142 let file_path = temp_dir.path().join("append_test.txt");
1143
1144 std::fs::write(&file_path, "Initial content\n").unwrap();
1146
1147 let step = MissionStep {
1148 id: "append_file_test".to_string(),
1149 name: "Test Append File".to_string(),
1150 step_type: StepType::EditFile,
1151 depends_on: None,
1152 timeout_seconds: None,
1153 continue_on_error: None,
1154 parameters: serde_json::json!({
1155 "path": file_path.to_str().unwrap(),
1156 "content": "Appended content\n",
1157 "append": true
1158 }),
1159 };
1160
1161 let mut context = ExecutionContext::new();
1162 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1163
1164 assert!(matches!(result.status, StepStatus::Success));
1165 let content = std::fs::read_to_string(&file_path).unwrap();
1166 assert!(content.contains("Initial content"));
1167 assert!(content.contains("Appended content"));
1168 }
1169
1170 #[tokio::test]
1171 async fn test_execute_delete_file_step() {
1172 let temp_dir = TempDir::new().unwrap();
1174 let file_path = temp_dir.path().join("delete_test.txt");
1175
1176 std::fs::write(&file_path, "Content to delete").unwrap();
1178 assert!(file_path.exists());
1179
1180 let step = MissionStep {
1181 id: "delete_file_test".to_string(),
1182 name: "Test Delete File".to_string(),
1183 step_type: StepType::DeleteFile,
1184 depends_on: None,
1185 timeout_seconds: None,
1186 continue_on_error: None,
1187 parameters: serde_json::json!({
1188 "path": file_path.to_str().unwrap()
1189 }),
1190 };
1191
1192 let mut context = ExecutionContext::new();
1193 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1194
1195 assert!(matches!(result.status, StepStatus::Success));
1196 assert!(!file_path.exists());
1197 assert_eq!(result.output["existed"].as_bool().unwrap(), true);
1198 assert_eq!(result.output["deleted"].as_bool().unwrap(), true);
1199 }
1200
1201 #[tokio::test]
1202 async fn test_execute_delete_nonexistent_file() {
1203 let temp_dir = TempDir::new().unwrap();
1205 let file_path = temp_dir.path().join("nonexistent.txt");
1206
1207 let step = MissionStep {
1208 id: "delete_nonexistent".to_string(),
1209 name: "Delete Nonexistent File".to_string(),
1210 step_type: StepType::DeleteFile,
1211 depends_on: None,
1212 timeout_seconds: None,
1213 continue_on_error: None,
1214 parameters: serde_json::json!({
1215 "path": file_path.to_str().unwrap()
1216 }),
1217 };
1218
1219 let mut context = ExecutionContext::new();
1220 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1221
1222 assert!(matches!(result.status, StepStatus::Success));
1223 assert_eq!(result.output["existed"].as_bool().unwrap(), false);
1224 assert_eq!(result.output["deleted"].as_bool().unwrap(), false);
1225 }
1226
1227 #[tokio::test]
1228 async fn test_execute_command_with_working_dir() {
1229 let temp_dir = TempDir::new().unwrap();
1231
1232 let step = MissionStep {
1234 id: "echo_command".to_string(),
1235 name: "Echo Command".to_string(),
1236 step_type: StepType::Command,
1237 depends_on: None,
1238 timeout_seconds: None,
1239 continue_on_error: None,
1240 parameters: serde_json::json!({
1241 "command": "echo",
1242 "args": ["working_directory_test"],
1243 "working_dir": temp_dir.path().to_str().unwrap()
1244 }),
1245 };
1246
1247 let mut context = ExecutionContext::new();
1248 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1249
1250 assert!(matches!(result.status, StepStatus::Success));
1251 let stdout = result.output["stdout"].as_str().unwrap();
1252 assert!(stdout.contains("working_directory_test"));
1253 }
1254
1255 #[tokio::test]
1256 async fn test_step_result_duration_tracking() {
1257 let step = MissionStep {
1259 id: "duration_test".to_string(),
1260 name: "Duration Test".to_string(),
1261 step_type: StepType::Noop,
1262 depends_on: None,
1263 timeout_seconds: None,
1264 continue_on_error: None,
1265 parameters: serde_json::json!({}),
1266 };
1267
1268 let mut context = ExecutionContext::new();
1269 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1270
1271 assert!(result.duration_ms >= 0);
1273 }
1274 }
1275
1276 mod feature_gated_tests {
1278 use super::*;
1279
1280 #[cfg(not(feature = "llm"))]
1281 #[tokio::test]
1282 async fn test_http_step_without_llm_feature() {
1283 let step = MissionStep {
1285 id: "http_disabled".to_string(),
1286 name: "HTTP Disabled Test".to_string(),
1287 step_type: StepType::Http,
1288 depends_on: None,
1289 timeout_seconds: None,
1290 continue_on_error: None,
1291 parameters: serde_json::json!({
1292 "url": "https://httpbin.org/get",
1293 "method": "GET"
1294 }),
1295 };
1296
1297 let mut context = ExecutionContext::new();
1298 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1299
1300 assert!(matches!(result.status, StepStatus::Skipped));
1301 assert!(result.output["message"].as_str().unwrap().contains("HTTP support requires 'llm' feature"));
1302 }
1303
1304 #[cfg(not(feature = "llm"))]
1305 #[tokio::test]
1306 async fn test_llm_step_without_llm_feature() {
1307 let step = MissionStep {
1309 id: "llm_disabled".to_string(),
1310 name: "LLM Disabled Test".to_string(),
1311 step_type: StepType::Llm,
1312 depends_on: None,
1313 timeout_seconds: None,
1314 continue_on_error: None,
1315 parameters: serde_json::json!({
1316 "prompt": "Hello, world!"
1317 }),
1318 };
1319
1320 let mut context = ExecutionContext::new();
1321 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1322
1323 assert!(matches!(result.status, StepStatus::Failed));
1324 assert_eq!(result.error.as_ref().unwrap(), "LLM feature not enabled");
1325 }
1326
1327 #[cfg(not(feature = "tools"))]
1328 #[tokio::test]
1329 async fn test_tool_step_without_tools_feature() {
1330 let step = MissionStep {
1332 id: "tool_disabled".to_string(),
1333 name: "Tool Disabled Test".to_string(),
1334 step_type: StepType::Tool,
1335 depends_on: None,
1336 timeout_seconds: None,
1337 continue_on_error: None,
1338 parameters: serde_json::json!({
1339 "tool": "test_tool",
1340 "parameters": {}
1341 }),
1342 };
1343
1344 let mut context = ExecutionContext::new();
1345 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1346
1347 assert!(matches!(result.status, StepStatus::Failed));
1348 assert_eq!(result.error.as_ref().unwrap(), "Tools feature not enabled");
1349 }
1350
1351 #[cfg(not(feature = "rag"))]
1352 #[tokio::test]
1353 async fn test_rag_query_step_without_rag_feature() {
1354 let step = MissionStep {
1356 id: "rag_query_disabled".to_string(),
1357 name: "RAG Query Disabled Test".to_string(),
1358 step_type: StepType::RagQuery,
1359 depends_on: None,
1360 timeout_seconds: None,
1361 continue_on_error: None,
1362 parameters: serde_json::json!({
1363 "query": "test query"
1364 }),
1365 };
1366
1367 let mut context = ExecutionContext::new();
1368 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1369
1370 assert!(matches!(result.status, StepStatus::Failed));
1371 assert_eq!(result.error.as_ref().unwrap(), "RAG feature not enabled");
1372 }
1373
1374 #[cfg(not(feature = "rag"))]
1375 #[tokio::test]
1376 async fn test_rag_add_step_without_rag_feature() {
1377 let step = MissionStep {
1379 id: "rag_add_disabled".to_string(),
1380 name: "RAG Add Disabled Test".to_string(),
1381 step_type: StepType::RagAdd,
1382 depends_on: None,
1383 timeout_seconds: None,
1384 continue_on_error: None,
1385 parameters: serde_json::json!({
1386 "id": "doc1",
1387 "content": "test content"
1388 }),
1389 };
1390
1391 let mut context = ExecutionContext::new();
1392 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1393
1394 assert!(matches!(result.status, StepStatus::Failed));
1395 assert_eq!(result.error.as_ref().unwrap(), "RAG feature not enabled");
1396 }
1397
1398 #[cfg(not(feature = "chain"))]
1399 #[tokio::test]
1400 async fn test_chain_step_without_chain_feature() {
1401 let step = MissionStep {
1403 id: "chain_disabled".to_string(),
1404 name: "Chain Disabled Test".to_string(),
1405 step_type: StepType::Chain,
1406 depends_on: None,
1407 timeout_seconds: None,
1408 continue_on_error: None,
1409 parameters: serde_json::json!({
1410 "type": "sequential",
1411 "prompt": "test prompt"
1412 }),
1413 };
1414
1415 let mut context = ExecutionContext::new();
1416 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1417
1418 assert!(matches!(result.status, StepStatus::Failed));
1419 assert_eq!(result.error.as_ref().unwrap(), "Chain feature not enabled");
1420 }
1421
1422 #[cfg(not(feature = "agent"))]
1423 #[tokio::test]
1424 async fn test_agent_step_without_agent_feature() {
1425 let step = MissionStep {
1427 id: "agent_disabled".to_string(),
1428 name: "Agent Disabled Test".to_string(),
1429 step_type: StepType::Agent,
1430 depends_on: None,
1431 timeout_seconds: None,
1432 continue_on_error: None,
1433 parameters: serde_json::json!({
1434 "objective": "test objective",
1435 "name": "test_agent"
1436 }),
1437 };
1438
1439 let mut context = ExecutionContext::new();
1440 let result = DagExecutor::execute_step(&step, &mut context).await.unwrap();
1441
1442 assert!(matches!(result.status, StepStatus::Failed));
1443 assert_eq!(result.error.as_ref().unwrap(), "Agent feature not enabled");
1444 }
1445 }
1446}
1447
/// Optional mission-wide execution settings; every field may be left unset.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct MissionConfig {
    /// Presumably the maximum number of steps to run concurrently.
    /// NOTE(review): `DagExecutor::execute_mission` in this file runs steps one at a time
    /// and never reads this field — confirm whether another code path uses it.
    pub max_parallel_steps: Option<usize>,
    /// Default per-step timeout in seconds, used by `DagExecutor::execute_mission` for steps
    /// that set no `timeout_seconds` of their own; when both are unset it uses 300.
    pub timeout_seconds: Option<u64>,
    /// Enables aborting the mission when a step errors or times out, and skipping steps
    /// whose dependency failed. `DagExecutor::execute_mission` treats an unset value as `true`.
    pub fail_fast: Option<bool>,
}
1454
1455pub struct MissionLoader;
1456
1457impl MissionLoader {
1458 pub fn load_from_file(path: &str) -> anyhow::Result<Mission> {
1459 if path.is_empty() {
1460 return Err(anyhow::anyhow!("Mission path must not be empty"));
1461 }
1462
1463 let content = std::fs::read_to_string(path)?;
1464
1465 let mission: Mission = if path.ends_with(".json") {
1467 serde_json::from_str(&content)?
1468 } else {
1469 serde_yaml::from_str(&content)?
1470 };
1471
1472 Self::validate_mission(&mission)?;
1474
1475 Ok(mission)
1476 }
1477
1478 pub fn validate_mission(mission: &Mission) -> anyhow::Result<()> {
1479 if mission.steps.is_empty() {
1480 return Err(anyhow::anyhow!("Mission must have at least one step"));
1481 }
1482
1483 let mut seen_ids = std::collections::HashSet::new();
1485 for step in &mission.steps {
1486 if !seen_ids.insert(&step.id) {
1487 return Err(anyhow::anyhow!("Duplicate step ID: {}", step.id));
1488 }
1489 }
1490
1491 for step in &mission.steps {
1493 if let Some(deps) = &step.depends_on {
1494 for dep in deps {
1495 if !mission.steps.iter().any(|s| s.id == *dep) {
1496 return Err(anyhow::anyhow!(
1497 "Step {} depends on non-existent step: {}",
1498 step.id,
1499 dep
1500 ));
1501 }
1502 }
1503 }
1504 }
1505
1506 Ok(())
1507 }
1508}
1509
/// Field-less namespace type for mission execution: the functions visible in its `impl`
/// (`execute_mission`, `topological_sort`, `execute_step`) are associated functions that
/// take no `self`.
pub struct DagExecutor;
1511
1512impl DagExecutor {
1513 pub async fn execute_mission(mission: Mission) -> anyhow::Result<MissionResult> {
1514 if mission.steps.is_empty() {
1515 return Err(anyhow::anyhow!("Cannot execute empty mission"));
1516 }
1517
1518 let start_time = Instant::now();
1519
1520 let execution_order = Self::topological_sort(&mission.steps)?;
1522
1523 let mut results: HashMap<String, StepResult> = HashMap::new();
1524 let mut completed = std::collections::HashSet::new();
1525 let mut context = ExecutionContext::new();
1526
1527 let fail_fast = mission
1529 .config
1530 .as_ref()
1531 .and_then(|c| c.fail_fast)
1532 .unwrap_or(true);
1533
1534 info!(
1535 "Executing mission '{}' with {} steps",
1536 mission.name,
1537 execution_order.len()
1538 );
1539
1540 for step_id in execution_order {
1541 let step = mission.steps.iter().find(|s| s.id == step_id).unwrap();
1542
1543 debug!("Executing step: {} ({})", step.id, step.name);
1544
1545 if let Some(deps) = &step.depends_on {
1547 for dep in deps {
1548 if !completed.contains(dep) {
1549 return Err(anyhow::anyhow!(
1550 "Dependency {} not completed for step {}",
1551 dep,
1552 step.id
1553 ));
1554 }
1555
1556 if let Some(dep_result) = results.get(dep) {
1558 if matches!(dep_result.status, StepStatus::Failed) {
1559 let should_continue = step.continue_on_error.unwrap_or(false);
1561 if !should_continue && fail_fast {
1562 warn!("Skipping step {} due to failed dependency {}", step.id, dep);
1563 results.insert(
1564 step_id.clone(),
1565 StepResult {
1566 step_id: step.id.clone(),
1567 status: StepStatus::Skipped,
1568 output: serde_json::json!({"reason": "dependency failed"}),
1569 error: Some(format!("Dependency {} failed", dep)),
1570 duration_ms: 0,
1571 },
1572 );
1573 continue;
1574 }
1575 }
1576 }
1577 }
1578 }
1579
1580 let timeout = step
1582 .timeout_seconds
1583 .or(mission.config.as_ref().and_then(|c| c.timeout_seconds))
1584 .unwrap_or(300);
1585
1586 let step_start = Instant::now();
1587
1588 let result = match tokio::time::timeout(
1589 std::time::Duration::from_secs(timeout),
1590 Self::execute_step(step, &mut context),
1591 )
1592 .await
1593 {
1594 Ok(Ok(result)) => result,
1595 Ok(Err(e)) => {
1596 error!("Step {} failed: {}", step.id, e);
1597
1598 let should_continue = step.continue_on_error.unwrap_or(false);
1600 if !should_continue && fail_fast {
1601 return Err(e);
1602 }
1603 StepResult {
1604 step_id: step.id.clone(),
1605 status: StepStatus::Failed,
1606 output: serde_json::json!({"error": e.to_string()}),
1607 error: Some(e.to_string()),
1608 duration_ms: step_start.elapsed().as_millis() as u64,
1609 }
1610 }
1611 Err(_) => {
1612 error!("Step {} timed out after {} seconds", step.id, timeout);
1613 if fail_fast {
1614 return Err(anyhow::anyhow!("Step {} timed out", step.id));
1615 }
1616 StepResult {
1617 step_id: step.id.clone(),
1618 status: StepStatus::Failed,
1619 output: serde_json::json!({"error": "timeout"}),
1620 error: Some(format!("Timed out after {} seconds", timeout)),
1621 duration_ms: timeout * 1000,
1622 }
1623 }
1624 };
1625
1626 info!(
1627 "Step {} completed with status: {:?}",
1628 step.id, result.status
1629 );
1630
1631 results.insert(step_id.clone(), result);
1632 completed.insert(step_id);
1633 }
1634
1635 let has_failures = results
1637 .values()
1638 .any(|r| matches!(r.status, StepStatus::Failed));
1639 let all_skipped = results
1640 .values()
1641 .all(|r| matches!(r.status, StepStatus::Skipped));
1642
1643 let status = if has_failures {
1644 MissionStatus::Failed
1645 } else if all_skipped {
1646 MissionStatus::Cancelled
1647 } else {
1648 MissionStatus::Completed
1649 };
1650
1651 Ok(MissionResult {
1652 mission_id: Uuid::new_v4(),
1653 status,
1654 step_results: results,
1655 total_duration_ms: start_time.elapsed().as_millis() as u64,
1656 })
1657 }
1658
1659 fn topological_sort(steps: &[MissionStep]) -> anyhow::Result<Vec<String>> {
1660 let mut in_degree = HashMap::new();
1661 let mut graph = HashMap::new();
1662
1663 for step in steps {
1665 in_degree.insert(step.id.clone(), 0);
1666 graph.insert(step.id.clone(), Vec::new());
1667 }
1668
1669 for step in steps {
1671 if let Some(deps) = &step.depends_on {
1672 for dep in deps {
1673 if let Some(dep_list) = graph.get_mut(dep) {
1674 dep_list.push(step.id.clone());
1675 } else {
1676 return Err(anyhow::anyhow!("Dependency '{}' not found for step '{}'", dep, step.id));
1677 }
1678 if let Some(degree) = in_degree.get_mut(&step.id) {
1679 *degree += 1;
1680 } else {
1681 return Err(anyhow::anyhow!("Step '{}' not found in dependency graph", step.id));
1682 }
1683 }
1684 }
1685 }
1686
1687 let mut queue = std::collections::VecDeque::new();
1689 let mut result = Vec::new();
1690
1691 for (node, °ree) in &in_degree {
1692 if degree == 0 {
1693 queue.push_back(node.clone());
1694 }
1695 }
1696
1697 while let Some(node) = queue.pop_front() {
1698 result.push(node.clone());
1699
1700 for neighbor in &graph[&node] {
1701 let degree = in_degree.get_mut(neighbor).unwrap();
1702 *degree -= 1;
1703 if *degree == 0 {
1704 queue.push_back(neighbor.clone());
1705 }
1706 }
1707 }
1708
1709 if result.len() != steps.len() {
1710 return Err(anyhow::anyhow!("Circular dependency detected"));
1711 }
1712
1713 Ok(result)
1714 }
1715
1716 pub async fn execute_step(
1717 step: &MissionStep,
1718 context: &mut ExecutionContext,
1719 ) -> anyhow::Result<StepResult> {
1720 let start = Instant::now();
1721
1722 let result = match step.step_type {
1723 StepType::Noop => {
1724 debug!("Executing NOOP step");
1725 Ok(StepResult {
1726 step_id: step.id.clone(),
1727 status: StepStatus::Success,
1728 output: serde_json::json!({"message": "No operation performed"}),
1729 error: None,
1730 duration_ms: 0,
1731 })
1732 }
1733
1734 StepType::CreateFile => {
1735 let path = step
1736 .parameters
1737 .get("path")
1738 .and_then(|v| v.as_str())
1739 .ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
1740
1741 let sanitized_path = sanitize_file_path(path)?;
1743
1744 let content = step
1745 .parameters
1746 .get("content")
1747 .and_then(|v| v.as_str())
1748 .unwrap_or("");
1749
1750 let processed_content = context.substitute_variables(content);
1752
1753 debug!("Creating file: {} (content size: {} -> {})", sanitized_path, content.len(), processed_content.len());
1754
1755 if let Some(parent) = std::path::Path::new(&sanitized_path).parent() {
1757 tokio::fs::create_dir_all(parent).await?;
1758 }
1759
1760 tokio::fs::write(&sanitized_path, &processed_content).await?;
1761
1762 Ok(StepResult {
1763 step_id: step.id.clone(),
1764 status: StepStatus::Success,
1765 output: serde_json::json!({
1766 "path": path,
1767 "size": processed_content.len(),
1768 "created": true,
1769 "variables_substituted": processed_content != content
1770 }),
1771 error: None,
1772 duration_ms: start.elapsed().as_millis() as u64,
1773 })
1774 }
1775
1776 StepType::EditFile => {
1777 let path = step
1778 .parameters
1779 .get("path")
1780 .and_then(|v| v.as_str())
1781 .ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
1782
1783 let content = step.parameters.get("content").and_then(|v| v.as_str());
1784
1785 let append = step
1786 .parameters
1787 .get("append")
1788 .and_then(|v| v.as_bool())
1789 .unwrap_or(false);
1790
1791 debug!("Editing file: {} (append: {})", path, append);
1792
1793 if append && content.is_some() {
1794 let mut existing = tokio::fs::read_to_string(path).await.unwrap_or_default();
1796 existing.push_str(content.unwrap());
1797 tokio::fs::write(path, existing).await?;
1798 } else if let Some(content) = content {
1799 tokio::fs::write(path, content).await?;
1801 }
1802
1803 let metadata = tokio::fs::metadata(path).await?;
1804
1805 Ok(StepResult {
1806 step_id: step.id.clone(),
1807 status: StepStatus::Success,
1808 output: serde_json::json!({
1809 "path": path,
1810 "size": metadata.len(),
1811 "modified": true
1812 }),
1813 error: None,
1814 duration_ms: start.elapsed().as_millis() as u64,
1815 })
1816 }
1817
1818 StepType::DeleteFile => {
1819 let path = step
1820 .parameters
1821 .get("path")
1822 .and_then(|v| v.as_str())
1823 .ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
1824
1825 debug!("Deleting file: {}", path);
1826
1827 let existed = std::path::Path::new(path).exists();
1828 if existed {
1829 tokio::fs::remove_file(path).await?;
1830 }
1831
1832 Ok(StepResult {
1833 step_id: step.id.clone(),
1834 status: StepStatus::Success,
1835 output: serde_json::json!({
1836 "path": path,
1837 "existed": existed,
1838 "deleted": existed
1839 }),
1840 error: None,
1841 duration_ms: start.elapsed().as_millis() as u64,
1842 })
1843 }
1844
1845 StepType::Command => {
1846 let command = step
1847 .parameters
1848 .get("command")
1849 .and_then(|v| v.as_str())
1850 .ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
1851
1852 let args = step
1853 .parameters
1854 .get("args")
1855 .and_then(|v| v.as_array())
1856 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect::<Vec<_>>())
1857 .unwrap_or_default();
1858
1859 sanitize_command(command, &args)?;
1861
1862 let working_dir = step.parameters.get("working_dir").and_then(|v| v.as_str());
1863
1864 debug!("Executing command: {} {:?}", command, args);
1865
1866 let mut cmd = Command::new(command);
1867 cmd.args(&args);
1868
1869 if let Some(dir) = working_dir {
1870 cmd.current_dir(dir);
1871 }
1872
1873 for (key, value) in &context.environment {
1875 cmd.env(key, value);
1876 }
1877
1878 let output = cmd.output().await?;
1879
1880 let stdout = String::from_utf8_lossy(&output.stdout);
1881 let stderr = String::from_utf8_lossy(&output.stderr);
1882
1883 Ok(StepResult {
1884 step_id: step.id.clone(),
1885 status: if output.status.success() {
1886 StepStatus::Success
1887 } else {
1888 StepStatus::Failed
1889 },
1890 output: serde_json::json!({
1891 "command": command,
1892 "args": args,
1893 "exit_code": output.status.code(),
1894 "stdout": stdout,
1895 "stderr": stderr
1896 }),
1897 error: if !output.status.success() {
1898 Some(format!(
1899 "Command failed with exit code {:?}",
1900 output.status.code()
1901 ))
1902 } else {
1903 None
1904 },
1905 duration_ms: start.elapsed().as_millis() as u64,
1906 })
1907 }
1908
1909 StepType::Http => {
1910 #[cfg(feature = "llm")]
1911 {
1912 let url = step
1913 .parameters
1914 .get("url")
1915 .and_then(|v| v.as_str())
1916 .ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter"))?;
1917
1918 let method = step
1919 .parameters
1920 .get("method")
1921 .and_then(|v| v.as_str())
1922 .unwrap_or("GET");
1923
1924 let headers = step
1925 .parameters
1926 .get("headers")
1927 .and_then(|v| v.as_object())
1928 .map(|obj| {
1929 obj.iter()
1930 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1931 .collect::<HashMap<_, _>>()
1932 });
1933
1934 let body = step.parameters.get("body");
1935
1936 debug!("HTTP {} to {}", method, url);
1937
1938 let client = reqwest::Client::new();
1939 let mut request = match method.to_uppercase().as_str() {
1940 "GET" => client.get(url),
1941 "POST" => client.post(url),
1942 "PUT" => client.put(url),
1943 "DELETE" => client.delete(url),
1944 "PATCH" => client.patch(url),
1945 _ => return Err(anyhow::anyhow!("Unsupported HTTP method: {}", method)),
1946 };
1947
1948 if let Some(headers) = headers {
1950 for (key, value) in headers {
1951 request = request.header(key, value);
1952 }
1953 }
1954
1955 if let Some(body) = body {
1957 request = request.json(body);
1958 }
1959
1960 let response = request.send().await?;
1961 let status = response.status();
1962 let status_code = status.as_u16();
1963 let response_text = response.text().await.unwrap_or_default();
1964
1965 let response_body = serde_json::from_str::<serde_json::Value>(&response_text)
1967 .unwrap_or_else(|_| serde_json::json!({"text": response_text}));
1968
1969 Ok(StepResult {
1970 step_id: step.id.clone(),
1971 status: if status.is_success() {
1972 StepStatus::Success
1973 } else {
1974 StepStatus::Failed
1975 },
1976 output: serde_json::json!({
1977 "url": url,
1978 "method": method,
1979 "status": status_code,
1980 "response": response_body
1981 }),
1982 error: if !status.is_success() {
1983 Some(format!("HTTP {} returned {}", method, status))
1984 } else {
1985 None
1986 },
1987 duration_ms: start.elapsed().as_millis() as u64,
1988 })
1989 }
1990
1991 #[cfg(not(feature = "llm"))]
1992 {
1993 Ok(StepResult {
1994 step_id: step.id.clone(),
1995 status: StepStatus::Skipped,
1996 output: serde_json::json!({"message": "HTTP support requires 'llm' feature"}),
1997 error: None,
1998 duration_ms: 0,
1999 })
2000 }
2001 }
2002
2003 StepType::Llm => {
2004 #[cfg(feature = "llm")]
2005 {
2006 use crate::llm::{
2007 create_default_llm_manager, ChatMessage, LLMRequest, MessageRole,
2008 };
2009
2010 let prompt = step
2011 .parameters
2012 .get("prompt")
2013 .and_then(|v| v.as_str())
2014 .ok_or_else(|| anyhow::anyhow!("Missing 'prompt' parameter"))?;
2015
2016 let model = step.parameters.get("model").and_then(|v| v.as_str());
2017
2018 let provider = step.parameters.get("provider").and_then(|v| v.as_str());
2019
2020 let temperature = step
2021 .parameters
2022 .get("temperature")
2023 .and_then(|v| v.as_f64())
2024 .map(|t| t as f32);
2025
2026 let max_tokens = step
2027 .parameters
2028 .get("max_tokens")
2029 .and_then(|v| v.as_u64())
2030 .map(|t| t as u32);
2031
2032 debug!("Calling LLM with prompt: {}", prompt);
2033
2034 let manager = create_default_llm_manager()?;
2035
2036 let request = LLMRequest {
2037 messages: vec![ChatMessage {
2038 role: MessageRole::User,
2039 content: prompt.to_string(),
2040 name: None,
2041 tool_calls: None,
2042 tool_call_id: None,
2043 }],
2044 model: model.map(String::from),
2045 temperature,
2046 max_tokens,
2047 stream: false,
2048 tools: None,
2049 metadata: HashMap::new(),
2050 };
2051
2052 let response = manager.complete(request, provider).await?;
2053
2054 context.set_variable(&format!("{}_response", step.id), &response.content);
2056
2057 Ok(StepResult {
2058 step_id: step.id.clone(),
2059 status: StepStatus::Success,
2060 output: serde_json::json!({
2061 "model": response.model,
2062 "content": response.content,
2063 "usage": {
2064 "prompt_tokens": response.usage.prompt_tokens,
2065 "completion_tokens": response.usage.completion_tokens,
2066 "total_tokens": response.usage.total_tokens
2067 }
2068 }),
2069 error: None,
2070 duration_ms: start.elapsed().as_millis() as u64,
2071 })
2072 }
2073
2074 #[cfg(not(feature = "llm"))]
2075 {
2076 Ok(StepResult {
2077 step_id: step.id.clone(),
2078 status: StepStatus::Failed,
2079 output: serde_json::json!({"error": "LLM feature not enabled"}),
2080 error: Some("LLM feature not enabled".to_string()),
2081 duration_ms: 0,
2082 })
2083 }
2084 }
2085
2086 StepType::Tool => {
2087 #[cfg(feature = "tools")]
2088 {
2089 use crate::core::RuntimeContext;
2090 use crate::tools::{create_default_tool_manager, ToolCall};
2091
2092 let tool_name = step
2093 .parameters
2094 .get("tool")
2095 .and_then(|v| v.as_str())
2096 .ok_or_else(|| anyhow::anyhow!("Missing 'tool' parameter"))?;
2097
2098 let tool_params = step
2099 .parameters
2100 .get("parameters")
2101 .cloned()
2102 .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
2103
2104 debug!("Executing tool: {}", tool_name);
2105
2106 let tool_manager = create_default_tool_manager();
2107 let context = RuntimeContext::new();
2108
2109 let call = ToolCall::new(
2110 tool_name.to_string(),
2111 tool_params,
2112 );
2113
2114 let result = tool_manager.execute_tool(call, &context).await?;
2115
2116 Ok(StepResult {
2117 step_id: step.id.clone(),
2118 status: if result.success {
2119 StepStatus::Success
2120 } else {
2121 StepStatus::Failed
2122 },
2123 output: result.output,
2124 error: result.error,
2125 duration_ms: result.execution_time_ms,
2126 })
2127 }
2128
2129 #[cfg(not(feature = "tools"))]
2130 {
2131 Ok(StepResult {
2132 step_id: step.id.clone(),
2133 status: StepStatus::Failed,
2134 output: serde_json::json!({"error": "Tools feature not enabled"}),
2135 error: Some("Tools feature not enabled".to_string()),
2136 duration_ms: 0,
2137 })
2138 }
2139 }
2140
2141 StepType::RagQuery => {
2142 #[cfg(feature = "rag")]
2143 {
2144 use crate::rag::create_default_rag_system;
2145
2146 let query = step
2147 .parameters
2148 .get("query")
2149 .and_then(|v| v.as_str())
2150 .ok_or_else(|| anyhow::anyhow!("Missing 'query' parameter"))?;
2151
2152 let limit = step
2153 .parameters
2154 .get("limit")
2155 .and_then(|v| v.as_u64())
2156 .map(|l| l as usize);
2157
2158 let threshold = step
2159 .parameters
2160 .get("threshold")
2161 .and_then(|v| v.as_f64())
2162 .map(|t| t as f32);
2163
2164 debug!("Querying RAG system: {}", query);
2165
2166 let rag_system = create_default_rag_system()?;
2167 let results = rag_system.search(query, limit, threshold).await?;
2168
2169 if !results.results.is_empty() {
2171 let context_text = results
2172 .results
2173 .iter()
2174 .map(|r| r.chunk.content.clone())
2175 .collect::<Vec<_>>()
2176 .join("\n\n");
2177 context.set_variable(&format!("{}_context", step.id), &context_text);
2178 }
2179
2180 Ok(StepResult {
2181 step_id: step.id.clone(),
2182 status: StepStatus::Success,
2183 output: serde_json::json!({
2184 "query": query,
2185 "results_count": results.results.len(),
2186 "results": results.results.iter().map(|r| serde_json::json!({
2187 "document_id": r.document_id,
2188 "chunk_id": r.chunk.id,
2189 "score": r.similarity_score,
2190 "content_preview": &r.chunk.content[..r.chunk.content.len().min(200)]
2191 })).collect::<Vec<_>>(),
2192 "processing_time_ms": results.processing_time_ms
2193 }),
2194 error: None,
2195 duration_ms: start.elapsed().as_millis() as u64,
2196 })
2197 }
2198
2199 #[cfg(not(feature = "rag"))]
2200 {
2201 Ok(StepResult {
2202 step_id: step.id.clone(),
2203 status: StepStatus::Failed,
2204 output: serde_json::json!({"error": "RAG feature not enabled"}),
2205 error: Some("RAG feature not enabled".to_string()),
2206 duration_ms: 0,
2207 })
2208 }
2209 }
2210
2211 StepType::RagAdd => {
2212 #[cfg(feature = "rag")]
2213 {
2214 use crate::rag::create_default_rag_system;
2215
2216 let document_id = step
2217 .parameters
2218 .get("id")
2219 .and_then(|v| v.as_str())
2220 .ok_or_else(|| anyhow::anyhow!("Missing 'id' parameter"))?;
2221
2222 let content = step
2223 .parameters
2224 .get("content")
2225 .and_then(|v| v.as_str())
2226 .ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2227
2228 let metadata = step
2229 .parameters
2230 .get("metadata")
2231 .and_then(|v| v.as_object())
2232 .map(|obj| obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
2233 .unwrap_or_default();
2234
2235 debug!("Adding document to RAG: {}", document_id);
2236
2237 let mut rag_system = create_default_rag_system()?;
2238 let doc_id = rag_system
2239 .add_document(document_id.to_string(), content.to_string(), metadata)
2240 .await?;
2241
2242 Ok(StepResult {
2243 step_id: step.id.clone(),
2244 status: StepStatus::Success,
2245 output: serde_json::json!({
2246 "document_id": doc_id,
2247 "content_length": content.len(),
2248 "added": true
2249 }),
2250 error: None,
2251 duration_ms: start.elapsed().as_millis() as u64,
2252 })
2253 }
2254
2255 #[cfg(not(feature = "rag"))]
2256 {
2257 Ok(StepResult {
2258 step_id: step.id.clone(),
2259 status: StepStatus::Failed,
2260 output: serde_json::json!({"error": "RAG feature not enabled"}),
2261 error: Some("RAG feature not enabled".to_string()),
2262 duration_ms: 0,
2263 })
2264 }
2265 }
2266
2267 StepType::Chain => {
2268 #[cfg(feature = "chain")]
2269 {
2270 use crate::core::chain::{ChainContext, SequentialChain};
2271 #[cfg(feature = "llm")]
2272 use crate::llm::create_default_llm_manager;
2273
2274 let chain_type = step
2275 .parameters
2276 .get("type")
2277 .and_then(|v| v.as_str())
2278 .unwrap_or("sequential");
2279
2280 if let Some(steps_value) = step.parameters.get("steps") {
2282 use crate::engine::chain_executor::{ChainExecutor, ChainSubStep};
2284
2285 let sub_steps: Vec<ChainSubStep> = serde_json::from_value(steps_value.clone())
2286 .map_err(|e| anyhow::anyhow!("Invalid chain steps format: {}", e))?;
2287
2288 let executor = ChainExecutor::new(format!("chain_{}", step.id));
2289 match executor.execute_chain_steps(&sub_steps, context).await {
2290 Ok(result) => {
2291 context.set_variable(&format!("{}_result", step.id), &result);
2292
2293 return Ok(StepResult {
2294 step_id: step.id.clone(),
2295 status: StepStatus::Success,
2296 output: serde_json::json!({"type": "chain", "result": result}),
2297 error: None,
2298 duration_ms: start.elapsed().as_millis() as u64,
2299 });
2300 }
2301 Err(e) => {
2302 return Ok(StepResult {
2303 step_id: step.id.clone(),
2304 status: StepStatus::Failed,
2305 output: serde_json::json!({"error": e.to_string()}),
2306 error: Some(e.to_string()),
2307 duration_ms: start.elapsed().as_millis() as u64,
2308 });
2309 }
2310 }
2311 }
2312
2313 let prompt = step
2315 .parameters
2316 .get("prompt")
2317 .and_then(|v| v.as_str())
2318 .ok_or_else(|| anyhow::anyhow!("Chain step requires either 'steps' array or 'prompt' parameter"))?;
2319
2320 debug!("Executing chain: {}", chain_type);
2321
2322 let mut chain = SequentialChain::new(format!("chain_{}", step.id));
2324
2325 let _manager = create_default_llm_manager()?;
2326 let llm_chain = crate::core::chain::LLMChain::new(
2327 "llm_step".to_string(),
2328 prompt.to_string(),
2329 );
2330 chain.add(Box::new(llm_chain));
2331
2332 let mut chain_context = ChainContext::new();
2333
2334 for (key, value) in step
2336 .parameters
2337 .as_object()
2338 .unwrap_or(&serde_json::Map::new())
2339 {
2340 if let Some(v) = value.as_str() {
2341 chain_context.set(key, v);
2342 }
2343 }
2344
2345 match chain.run(&mut chain_context).await {
2346 Ok(_) => {
2347 if let Some(result) = chain_context.get("result") {
2349 context.set_variable(&format!("{}_result", step.id), &result);
2350 }
2351
2352 Ok(StepResult {
2353 step_id: step.id.clone(),
2354 status: StepStatus::Success,
2355 output: serde_json::json!({
2356 "chain_type": chain_type,
2357 "variables": chain_context.vars,
2358 "events": chain_context.get_history().len()
2359 }),
2360 error: None,
2361 duration_ms: start.elapsed().as_millis() as u64,
2362 })
2363 }
2364 Err(e) => Ok(StepResult {
2365 step_id: step.id.clone(),
2366 status: StepStatus::Failed,
2367 output: serde_json::json!({"error": e.to_string()}),
2368 error: Some(e.to_string()),
2369 duration_ms: start.elapsed().as_millis() as u64,
2370 }),
2371 }
2372 }
2373
2374 #[cfg(not(feature = "chain"))]
2375 {
2376 Ok(StepResult {
2377 step_id: step.id.clone(),
2378 status: StepStatus::Failed,
2379 output: serde_json::json!({"error": "Chain feature not enabled"}),
2380 error: Some("Chain feature not enabled".to_string()),
2381 duration_ms: 0,
2382 })
2383 }
2384 }
2385
2386 StepType::Agent => {
2387 #[cfg(feature = "agent")]
2388 {
2389 use crate::core::memory::InMemoryStore;
2390 #[cfg(feature = "llm")]
2391 use crate::llm::create_default_llm_manager;
2392 #[cfg(feature = "tools")]
2393 use crate::tools::create_default_tool_manager;
2394
2395 let objective = step
2396 .parameters
2397 .get("objective")
2398 .and_then(|v| v.as_str())
2399 .ok_or_else(|| {
2400 anyhow::anyhow!("Missing 'objective' parameter for agent")
2401 })?;
2402
2403 let agent_name = step
2404 .parameters
2405 .get("name")
2406 .and_then(|v| v.as_str())
2407 .unwrap_or(&step.id);
2408
2409 debug!(
2410 "Creating agent '{}' with objective: {}",
2411 agent_name, objective
2412 );
2413
2414 let _memory = InMemoryStore::new();
2416 #[cfg(feature = "tools")]
2417 let _tool_manager = create_default_tool_manager();
2418
2419 #[cfg(feature = "llm")]
2420 {
2421 let _llm_manager = create_default_llm_manager()?;
2422
2423 let agent_prompt = format!(
2426 "You are an autonomous agent named '{}'. Your objective is: {}\n\nPlease think through this step by step and provide a final answer.",
2427 agent_name, objective
2428 );
2429
2430 let manager = create_default_llm_manager()?;
2431 let request = crate::llm::LLMRequest {
2432 messages: vec![crate::llm::ChatMessage {
2433 role: crate::llm::MessageRole::User,
2434 content: agent_prompt,
2435 name: None,
2436 tool_calls: None,
2437 tool_call_id: None,
2438 }],
2439 model: None,
2440 temperature: Some(0.7),
2441 max_tokens: Some(1000),
2442 stream: false,
2443 tools: None,
2444 metadata: std::collections::HashMap::new(),
2445 };
2446
2447 match manager.complete(request, None).await {
2448 Ok(response) => {
2449 context
2451 .set_variable(&format!("{}_response", step.id), &response.content);
2452
2453 Ok(StepResult {
2454 step_id: step.id.clone(),
2455 status: StepStatus::Success,
2456 output: serde_json::json!({
2457 "agent_name": agent_name,
2458 "objective": objective,
2459 "response": response.content,
2460 "model": response.model,
2461 "usage": {
2462 "prompt_tokens": response.usage.prompt_tokens,
2463 "completion_tokens": response.usage.completion_tokens,
2464 "total_tokens": response.usage.total_tokens
2465 }
2466 }),
2467 error: None,
2468 duration_ms: start.elapsed().as_millis() as u64,
2469 })
2470 }
2471 Err(e) => Ok(StepResult {
2472 step_id: step.id.clone(),
2473 status: StepStatus::Failed,
2474 output: serde_json::json!({"error": e.to_string()}),
2475 error: Some(e.to_string()),
2476 duration_ms: start.elapsed().as_millis() as u64,
2477 }),
2478 }
2479 }
2480
2481 #[cfg(not(feature = "llm"))]
2482 {
2483 Ok(StepResult {
2485 step_id: step.id.clone(),
2486 status: StepStatus::Success,
2487 output: serde_json::json!({
2488 "agent_name": agent_name,
2489 "objective": objective,
2490 "response": format!("Agent {} would work on: {}", agent_name, objective),
2491 "note": "LLM feature not enabled - this is a simulation"
2492 }),
2493 error: None,
2494 duration_ms: start.elapsed().as_millis() as u64,
2495 })
2496 }
2497 }
2498
2499 #[cfg(not(feature = "agent"))]
2500 {
2501 Ok(StepResult {
2502 step_id: step.id.clone(),
2503 status: StepStatus::Failed,
2504 output: serde_json::json!({"error": "Agent feature not enabled"}),
2505 error: Some("Agent feature not enabled".to_string()),
2506 duration_ms: 0,
2507 })
2508 }
2509 }
2510
2511 StepType::CopyFile => {
2513 let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
2514 let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
2515
2516 tokio::fs::copy(source, destination).await?;
2517 let size = tokio::fs::metadata(destination).await?.len();
2518
2519 Ok(StepResult {
2520 step_id: step.id.clone(),
2521 status: StepStatus::Success,
2522 output: serde_json::json!({"source": source, "destination": destination, "size": size}),
2523 error: None,
2524 duration_ms: start.elapsed().as_millis() as u64,
2525 })
2526 }
2527
2528 StepType::MoveFile => {
2529 let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
2530 let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
2531
2532 tokio::fs::rename(source, destination).await?;
2533
2534 Ok(StepResult {
2535 step_id: step.id.clone(),
2536 status: StepStatus::Success,
2537 output: serde_json::json!({"source": source, "destination": destination, "moved": true}),
2538 error: None,
2539 duration_ms: start.elapsed().as_millis() as u64,
2540 })
2541 }
2542
2543 StepType::ReadFile => {
2544 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2545
2546 let content = tokio::fs::read_to_string(path).await?;
2547 let size = content.len();
2548
2549 Ok(StepResult {
2550 step_id: step.id.clone(),
2551 status: StepStatus::Success,
2552 output: serde_json::json!({"path": path, "content": content, "size": size}),
2553 error: None,
2554 duration_ms: start.elapsed().as_millis() as u64,
2555 })
2556 }
2557
2558 StepType::ListDirectory => {
2559 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2560
2561 let mut entries = tokio::fs::read_dir(path).await?;
2562 let mut files = Vec::new();
2563
2564 while let Some(entry) = entries.next_entry().await? {
2565 let metadata = entry.metadata().await?;
2566 files.push(serde_json::json!({
2567 "name": entry.file_name().to_string_lossy(),
2568 "path": entry.path().to_string_lossy(),
2569 "is_dir": metadata.is_dir(),
2570 "size": metadata.len()
2571 }));
2572 }
2573
2574 Ok(StepResult {
2575 step_id: step.id.clone(),
2576 status: StepStatus::Success,
2577 output: serde_json::json!({"path": path, "entries": files, "count": files.len()}),
2578 error: None,
2579 duration_ms: start.elapsed().as_millis() as u64,
2580 })
2581 }
2582
2583 StepType::FileSearch => {
2584 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2585 let pattern = step.parameters.get("pattern").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'pattern' parameter"))?;
2586
2587 let mut results = Vec::new();
2588 let mut entries = tokio::fs::read_dir(path).await?;
2589
2590 while let Some(entry) = entries.next_entry().await? {
2591 let name = entry.file_name().to_string_lossy().to_string();
2592 if name.contains(pattern) {
2593 results.push(serde_json::json!({
2594 "name": name,
2595 "path": entry.path().to_string_lossy()
2596 }));
2597 }
2598 }
2599
2600 Ok(StepResult {
2601 step_id: step.id.clone(),
2602 status: StepStatus::Success,
2603 output: serde_json::json!({"pattern": pattern, "results": results, "matches": results.len()}),
2604 error: None,
2605 duration_ms: start.elapsed().as_millis() as u64,
2606 })
2607 }
2608
2609 StepType::ParseJson => {
2611 let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2612
2613 let parsed: serde_json::Value = serde_json::from_str(content)?;
2614
2615 Ok(StepResult {
2616 step_id: step.id.clone(),
2617 status: StepStatus::Success,
2618 output: serde_json::json!({"parsed": parsed, "valid": true}),
2619 error: None,
2620 duration_ms: start.elapsed().as_millis() as u64,
2621 })
2622 }
2623
2624 StepType::ParseYaml => {
2625 let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2626
2627 let parsed: serde_yaml::Value = serde_yaml::from_str(content)?;
2628 let json_value = serde_json::to_value(parsed)?;
2629
2630 Ok(StepResult {
2631 step_id: step.id.clone(),
2632 status: StepStatus::Success,
2633 output: serde_json::json!({"parsed": json_value, "valid": true}),
2634 error: None,
2635 duration_ms: start.elapsed().as_millis() as u64,
2636 })
2637 }
2638
2639 StepType::ParseXml => {
2640 let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2641
2642 use xml::reader::{EventReader, XmlEvent};
2643 let parser = EventReader::from_str(content);
2644 let mut elements = Vec::new();
2645 let mut current_element = String::new();
2646
2647 for event in parser {
2648 match event? {
2649 XmlEvent::StartElement { name, .. } => {
2650 current_element = name.local_name;
2651 },
2652 XmlEvent::Characters(text) => {
2653 if !current_element.is_empty() {
2654 elements.push(serde_json::json!({
2655 "element": current_element.clone(),
2656 "content": text
2657 }));
2658 }
2659 },
2660 _ => {}
2661 }
2662 }
2663
2664 Ok(StepResult {
2665 step_id: step.id.clone(),
2666 status: StepStatus::Success,
2667 output: serde_json::json!({"elements": elements, "element_count": elements.len(), "valid": true}),
2668 error: None,
2669 duration_ms: start.elapsed().as_millis() as u64,
2670 })
2671 }
2672
2673 StepType::ValidateSchema => {
2674 let _data = step.parameters.get("data").ok_or_else(|| anyhow::anyhow!("Missing 'data' parameter"))?;
2675 let _schema = step.parameters.get("schema").ok_or_else(|| anyhow::anyhow!("Missing 'schema' parameter"))?;
2676
2677 let data_str = _data.as_str().ok_or_else(|| anyhow::anyhow!("Data must be string"))?;
2679 let _parsed: serde_json::Value = serde_json::from_str(data_str)?;
2680
2681 Ok(StepResult {
2682 step_id: step.id.clone(),
2683 status: StepStatus::Success,
2684 output: serde_json::json!({"valid": true, "validated": "json_syntax", "note": "Full JSON schema validation requires jsonschema crate"}),
2685 error: None,
2686 duration_ms: start.elapsed().as_millis() as u64,
2687 })
2688 }
2689
2690 StepType::CsvProcess => {
2691 let content = step.parameters.get("content").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'content' parameter"))?;
2692
2693 let mut reader = csv::Reader::from_reader(content.as_bytes());
2694 let headers: Vec<String> = reader.headers()?.iter().map(|h| h.to_string()).collect();
2695 let mut records = Vec::new();
2696
2697 for result in reader.records() {
2698 let record = result?;
2699 let row: Vec<String> = record.iter().map(|field| field.to_string()).collect();
2700 records.push(row);
2701 }
2702
2703 Ok(StepResult {
2704 step_id: step.id.clone(),
2705 status: StepStatus::Success,
2706 output: serde_json::json!({"headers": headers, "records": records, "row_count": records.len()}),
2707 error: None,
2708 duration_ms: start.elapsed().as_millis() as u64,
2709 })
2710 }
2711
2712 StepType::CompileCode => {
2714 let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2715 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2716
2717 let output = match language {
2718 "rust" => Command::new("cargo").args(&["check"]).current_dir(path).output().await?,
2719 "go" => Command::new("go").args(&["build", "."]).current_dir(path).output().await?,
2720 "node" => Command::new("npm").args(&["run", "build"]).current_dir(path).output().await?,
2721 _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2722 };
2723
2724 let success = output.status.success();
2725 let stdout = String::from_utf8_lossy(&output.stdout);
2726 let stderr = String::from_utf8_lossy(&output.stderr);
2727
2728 Ok(StepResult {
2729 step_id: step.id.clone(),
2730 status: if success { StepStatus::Success } else { StepStatus::Failed },
2731 output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
2732 error: if success { None } else { Some(format!("Compilation failed: {}", stderr)) },
2733 duration_ms: start.elapsed().as_millis() as u64,
2734 })
2735 }
2736
2737 StepType::RunTests => {
2738 let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2739 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2740
2741 let output = match language {
2742 "rust" => Command::new("cargo").args(&["test"]).current_dir(path).output().await?,
2743 "go" => Command::new("go").args(&["test", "./..."]).current_dir(path).output().await?,
2744 "node" => Command::new("npm").args(&["test"]).current_dir(path).output().await?,
2745 _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2746 };
2747
2748 let success = output.status.success();
2749 let stdout = String::from_utf8_lossy(&output.stdout);
2750 let stderr = String::from_utf8_lossy(&output.stderr);
2751
2752 Ok(StepResult {
2753 step_id: step.id.clone(),
2754 status: if success { StepStatus::Success } else { StepStatus::Failed },
2755 output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
2756 error: if success { None } else { Some(format!("Tests failed: {}", stderr)) },
2757 duration_ms: start.elapsed().as_millis() as u64,
2758 })
2759 }
2760
2761 StepType::FormatCode => {
2762 let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2763 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2764
2765 let output = match language {
2766 "rust" => Command::new("cargo").args(&["fmt"]).current_dir(path).output().await?,
2767 "go" => Command::new("gofmt").args(&["-w", "."]).current_dir(path).output().await?,
2768 "node" => Command::new("npx").args(&["prettier", "--write", "."]).current_dir(path).output().await?,
2769 _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2770 };
2771
2772 let success = output.status.success();
2773
2774 Ok(StepResult {
2775 step_id: step.id.clone(),
2776 status: if success { StepStatus::Success } else { StepStatus::Failed },
2777 output: serde_json::json!({"formatted": success, "language": language}),
2778 error: if success { None } else { Some("Formatting failed".to_string()) },
2779 duration_ms: start.elapsed().as_millis() as u64,
2780 })
2781 }
2782
2783 StepType::LintCode => {
2784 let language = step.parameters.get("language").and_then(|v| v.as_str()).unwrap_or("rust");
2785 let path = step.parameters.get("path").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'path' parameter"))?;
2786
2787 let output = match language {
2788 "rust" => Command::new("cargo").args(&["clippy"]).current_dir(path).output().await?,
2789 "go" => Command::new("golint").args(&["./..."]).current_dir(path).output().await?,
2790 "node" => Command::new("npx").args(&["eslint", "."]).current_dir(path).output().await?,
2791 _ => return Err(anyhow::anyhow!("Unsupported language: {}", language)),
2792 };
2793
2794 let success = output.status.success();
2795 let stdout = String::from_utf8_lossy(&output.stdout);
2796 let stderr = String::from_utf8_lossy(&output.stderr);
2797
2798 Ok(StepResult {
2799 step_id: step.id.clone(),
2800 status: if success { StepStatus::Success } else { StepStatus::Failed },
2801 output: serde_json::json!({"success": success, "stdout": stdout, "stderr": stderr}),
2802 error: None,
2803 duration_ms: start.elapsed().as_millis() as u64,
2804 })
2805 }
2806
2807 StepType::ExtractFunctions | StepType::GenerateDocs => {
2808 Ok(StepResult {
2809 step_id: step.id.clone(),
2810 status: StepStatus::Success,
2811 output: serde_json::json!({"note": "Implementation pending"}),
2812 error: None,
2813 duration_ms: start.elapsed().as_millis() as u64,
2814 })
2815 }
2816
2817 StepType::GitCommit => {
2819 let message = step.parameters.get("message").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'message' parameter"))?;
2820 let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2821
2822 let _add_output = Command::new("git").args(&["add", "."]).current_dir(path).output().await?;
2823 let commit_output = Command::new("git").args(&["commit", "-m", message]).current_dir(path).output().await?;
2824
2825 let success = commit_output.status.success();
2826
2827 Ok(StepResult {
2828 step_id: step.id.clone(),
2829 status: if success { StepStatus::Success } else { StepStatus::Failed },
2830 output: serde_json::json!({"committed": success, "message": message}),
2831 error: if success { None } else { Some("Git commit failed".to_string()) },
2832 duration_ms: start.elapsed().as_millis() as u64,
2833 })
2834 }
2835
2836 StepType::GitBranch => {
2837 let branch_name = step.parameters.get("branch").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'branch' parameter"))?;
2838 let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2839
2840 let output = Command::new("git").args(&["checkout", "-b", branch_name]).current_dir(path).output().await?;
2841 let success = output.status.success();
2842
2843 Ok(StepResult {
2844 step_id: step.id.clone(),
2845 status: if success { StepStatus::Success } else { StepStatus::Failed },
2846 output: serde_json::json!({"branch": branch_name, "created": success}),
2847 error: if success { None } else { Some("Branch creation failed".to_string()) },
2848 duration_ms: start.elapsed().as_millis() as u64,
2849 })
2850 }
2851
2852 StepType::GitMerge => {
2853 let branch = step.parameters.get("branch").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'branch' parameter"))?;
2854 let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2855
2856 let output = Command::new("git").args(&["merge", branch]).current_dir(path).output().await?;
2857 let success = output.status.success();
2858
2859 Ok(StepResult {
2860 step_id: step.id.clone(),
2861 status: if success { StepStatus::Success } else { StepStatus::Failed },
2862 output: serde_json::json!({"merged_branch": branch, "success": success}),
2863 error: if success { None } else { Some("Merge failed".to_string()) },
2864 duration_ms: start.elapsed().as_millis() as u64,
2865 })
2866 }
2867
2868 StepType::GitStatus => {
2869 let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2870
2871 let output = Command::new("git").args(&["status", "--porcelain"]).current_dir(path).output().await?;
2872 let stdout = String::from_utf8_lossy(&output.stdout);
2873
2874 Ok(StepResult {
2875 step_id: step.id.clone(),
2876 status: StepStatus::Success,
2877 output: serde_json::json!({"status": stdout, "clean": stdout.trim().is_empty()}),
2878 error: None,
2879 duration_ms: start.elapsed().as_millis() as u64,
2880 })
2881 }
2882
2883 StepType::GitDiff => {
2884 let path = step.parameters.get("path").and_then(|v| v.as_str()).unwrap_or(".");
2885
2886 let output = Command::new("git").args(&["diff"]).current_dir(path).output().await?;
2887 let stdout = String::from_utf8_lossy(&output.stdout);
2888
2889 Ok(StepResult {
2890 step_id: step.id.clone(),
2891 status: StepStatus::Success,
2892 output: serde_json::json!({"diff": stdout, "has_changes": !stdout.trim().is_empty()}),
2893 error: None,
2894 duration_ms: start.elapsed().as_millis() as u64,
2895 })
2896 }
2897
2898 StepType::ProcessStart => {
2900 let command = step.parameters.get("command").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
2901 let empty_args = Vec::new();
2902 let args = step.parameters.get("args").and_then(|v| v.as_array()).unwrap_or(&empty_args);
2903 let working_dir = step.parameters.get("working_dir").and_then(|v| v.as_str()).unwrap_or(".");
2904
2905 let mut cmd = Command::new(command);
2906 for arg in args {
2907 if let Some(arg_str) = arg.as_str() {
2908 cmd.arg(arg_str);
2909 }
2910 }
2911 cmd.current_dir(working_dir);
2912
2913 let child = cmd.spawn()?;
2914 let pid = child.id().unwrap_or(0);
2915
2916 Ok(StepResult {
2917 step_id: step.id.clone(),
2918 status: StepStatus::Success,
2919 output: serde_json::json!({"command": command, "pid": pid, "started": true}),
2920 error: None,
2921 duration_ms: start.elapsed().as_millis() as u64,
2922 })
2923 }
2924
2925 StepType::ProcessKill => {
2926 let pid = step.parameters.get("pid").and_then(|v| v.as_u64()).ok_or_else(|| anyhow::anyhow!("Missing 'pid' parameter"))?;
2927
2928 #[cfg(unix)]
2929 {
2930 use std::process::Command;
2931 let output = Command::new("kill").args(&["-9", &pid.to_string()]).output()?;
2932 let success = output.status.success();
2933
2934 Ok(StepResult {
2935 step_id: step.id.clone(),
2936 status: if success { StepStatus::Success } else { StepStatus::Failed },
2937 output: serde_json::json!({"pid": pid, "killed": success}),
2938 error: if success { None } else { Some("Failed to kill process".to_string()) },
2939 duration_ms: start.elapsed().as_millis() as u64,
2940 })
2941 }
2942
2943 #[cfg(windows)]
2944 {
2945 use std::process::Command;
2946 let output = Command::new("taskkill").args(&["/F", "/PID", &pid.to_string()]).output()?;
2947 let success = output.status.success();
2948
2949 Ok(StepResult {
2950 step_id: step.id.clone(),
2951 status: if success { StepStatus::Success } else { StepStatus::Failed },
2952 output: serde_json::json!({"pid": pid, "killed": success}),
2953 error: if success { None } else { Some("Failed to kill process".to_string()) },
2954 duration_ms: start.elapsed().as_millis() as u64,
2955 })
2956 }
2957 }
2958
2959 StepType::MonitorResources => {
2960 use sysinfo::System;
2961 let mut sys = System::new_all();
2962 sys.refresh_all();
2963
2964 let cpu_usage = sys.global_cpu_info().cpu_usage();
2965 let memory_total = sys.total_memory();
2966 let memory_used = sys.used_memory();
2967 let memory_free = sys.free_memory();
2968
2969 Ok(StepResult {
2970 step_id: step.id.clone(),
2971 status: StepStatus::Success,
2972 output: serde_json::json!({
2973 "cpu_usage_percent": cpu_usage,
2974 "memory": {
2975 "total_bytes": memory_total,
2976 "used_bytes": memory_used,
2977 "free_bytes": memory_free,
2978 "usage_percent": (memory_used as f64 / memory_total as f64) * 100.0
2979 }
2980 }),
2981 error: None,
2982 duration_ms: start.elapsed().as_millis() as u64,
2983 })
2984 }
2985
2986 StepType::ServiceHealth => {
2987 let service_name = step.parameters.get("service").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'service' parameter"))?;
2988 let port = step.parameters.get("port").and_then(|v| v.as_u64()).unwrap_or(80);
2989
2990 use std::net::TcpStream;
2992 use std::time::Duration;
2993
2994 let addr = format!("localhost:{}", port);
2995 let health = TcpStream::connect_timeout(
2996 &addr.parse().map_err(|_| anyhow::anyhow!("Invalid address"))?,
2997 Duration::from_secs(5)
2998 ).is_ok();
2999
3000 Ok(StepResult {
3001 step_id: step.id.clone(),
3002 status: StepStatus::Success,
3003 output: serde_json::json!({"service": service_name, "port": port, "healthy": health}),
3004 error: None,
3005 duration_ms: start.elapsed().as_millis() as u64,
3006 })
3007 }
3008
3009 StepType::Compress => {
3010 let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
3011 let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
3012 let format = step.parameters.get("format").and_then(|v| v.as_str()).unwrap_or("tar");
3013
3014 match format {
3015 "tar" => {
3016 use tar::Builder;
3017 use std::fs::File;
3018
3019 let tar_file = File::create(destination)?;
3020 let mut tar = Builder::new(tar_file);
3021 tar.append_dir_all(".", source)?;
3022 tar.finish()?;
3023
3024 Ok(StepResult {
3025 step_id: step.id.clone(),
3026 status: StepStatus::Success,
3027 output: serde_json::json!({"source": source, "destination": destination, "format": format, "compressed": true}),
3028 error: None,
3029 duration_ms: start.elapsed().as_millis() as u64,
3030 })
3031 },
3032 "zip" => {
3033 use std::fs::File;
3034 use std::io::Write;
3035 use walkdir::WalkDir;
3036 use zip::write::FileOptions;
3037
3038 let file = File::create(destination)?;
3039 let mut zip = zip::ZipWriter::new(file);
3040 let options = FileOptions::default().compression_method(zip::CompressionMethod::Stored);
3041
3042 for entry in WalkDir::new(source) {
3043 let entry = entry?;
3044 let path = entry.path();
3045 let name = path.strip_prefix(source).unwrap();
3046
3047 if path.is_file() {
3048 zip.start_file(name.to_string_lossy().as_ref(), options)?;
3049 let file_content = std::fs::read(path)?;
3050 zip.write_all(&file_content)?;
3051 }
3052 }
3053
3054 zip.finish()?;
3055
3056 Ok(StepResult {
3057 step_id: step.id.clone(),
3058 status: StepStatus::Success,
3059 output: serde_json::json!({"source": source, "destination": destination, "format": format, "compressed": true}),
3060 error: None,
3061 duration_ms: start.elapsed().as_millis() as u64,
3062 })
3063 },
3064 _ => Err(anyhow::anyhow!("Unsupported compression format: {}", format))
3065 }
3066 }
3067
3068 StepType::SqlQuery => {
3070 let query = step.parameters.get("query").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'query' parameter"))?;
3071 let database_url = step.parameters.get("database_url").and_then(|v| v.as_str()).unwrap_or("sqlite://memory:");
3072
3073 let _ = query;
3075 let _ = database_url;
3076
3077 Ok(StepResult {
3078 step_id: step.id.clone(),
3079 status: StepStatus::Failed,
3080 output: serde_json::json!({"error": "SQL feature disabled due to security vulnerabilities. Use alternative database solutions."}),
3081 error: Some("SQL feature disabled due to security vulnerabilities. Use alternative database solutions.".to_string()),
3082 duration_ms: start.elapsed().as_millis() as u64,
3083 })
3084 }
3085
3086 StepType::RedisSet => {
3087 let key = step.parameters.get("key").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'key' parameter"))?;
3088 let value = step.parameters.get("value").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'value' parameter"))?;
3089 let redis_url = step.parameters.get("redis_url").and_then(|v| v.as_str()).unwrap_or("redis://127.0.0.1:6379");
3090
3091 #[cfg(not(feature = "redis"))]
3093 {
3094 let _ = key;
3095 let _ = value;
3096 let _ = redis_url;
3097 }
3098
3099 #[cfg(feature = "redis")]
3100 {
3101 use redis::{Commands, Connection};
3102 let client = redis::Client::open(redis_url)?;
3103 let mut con: Connection = client.get_connection()?;
3104 let _: () = con.set(key, value)?;
3105
3106 Ok(StepResult {
3107 step_id: step.id.clone(),
3108 status: StepStatus::Success,
3109 output: serde_json::json!({"key": key, "value": value, "set": true}),
3110 error: None,
3111 duration_ms: start.elapsed().as_millis() as u64,
3112 })
3113 }
3114
3115 #[cfg(not(feature = "redis"))]
3116 {
3117 Ok(StepResult {
3118 step_id: step.id.clone(),
3119 status: StepStatus::Failed,
3120 output: serde_json::json!({"error": "Redis feature not enabled"}),
3121 error: Some("Redis feature not enabled".to_string()),
3122 duration_ms: start.elapsed().as_millis() as u64,
3123 })
3124 }
3125 }
3126
3127 StepType::RedisGet => {
3128 let key = step.parameters.get("key").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'key' parameter"))?;
3129 let redis_url = step.parameters.get("redis_url").and_then(|v| v.as_str()).unwrap_or("redis://127.0.0.1:6379");
3130
3131 #[cfg(not(feature = "redis"))]
3133 {
3134 let _ = key;
3135 let _ = redis_url;
3136 }
3137
3138 #[cfg(feature = "redis")]
3139 {
3140 use redis::{Commands, Connection};
3141 let client = redis::Client::open(redis_url)?;
3142 let mut con: Connection = client.get_connection()?;
3143 let value: Option<String> = con.get(key).ok();
3144
3145 Ok(StepResult {
3146 step_id: step.id.clone(),
3147 status: StepStatus::Success,
3148 output: serde_json::json!({"key": key, "value": value, "exists": value.is_some()}),
3149 error: None,
3150 duration_ms: start.elapsed().as_millis() as u64,
3151 })
3152 }
3153
3154 #[cfg(not(feature = "redis"))]
3155 {
3156 Ok(StepResult {
3157 step_id: step.id.clone(),
3158 status: StepStatus::Failed,
3159 output: serde_json::json!({"error": "Redis feature not enabled"}),
3160 error: Some("Redis feature not enabled".to_string()),
3161 duration_ms: start.elapsed().as_millis() as u64,
3162 })
3163 }
3164 }
3165
3166 StepType::DbBackup => {
3167 let source = step.parameters.get("source").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'source' parameter"))?;
3168 let destination = step.parameters.get("destination").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'destination' parameter"))?;
3169
3170 tokio::fs::copy(source, destination).await?;
3172
3173 Ok(StepResult {
3174 step_id: step.id.clone(),
3175 status: StepStatus::Success,
3176 output: serde_json::json!({"source": source, "destination": destination, "backed_up": true}),
3177 error: None,
3178 duration_ms: start.elapsed().as_millis() as u64,
3179 })
3180 }
3181
3182 StepType::DbMigrate => {
3183 let migration_dir = step.parameters.get("migration_dir").and_then(|v| v.as_str()).unwrap_or("migrations");
3184
3185 Ok(StepResult {
3186 step_id: step.id.clone(),
3187 status: StepStatus::Success,
3188 output: serde_json::json!({"migration_dir": migration_dir, "note": "Migration implementation requires sqlx migration framework"}),
3189 error: None,
3190 duration_ms: start.elapsed().as_millis() as u64,
3191 })
3192 }
3193
3194 StepType::WebsocketConnect => {
3196 let url = step.parameters.get("url").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'url' parameter"))?;
3197 let message = step.parameters.get("message").and_then(|v| v.as_str()).unwrap_or("");
3198
3199 #[cfg(not(feature = "tokio-tungstenite"))]
3201 {
3202 let _ = url;
3203 let _ = message;
3204 }
3205
3206 #[cfg(feature = "tokio-tungstenite")]
3207 {
3208 use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
3209
3210 let (ws_stream, _) = connect_async(url).await?;
3211 let (mut write, _read) = ws_stream.split();
3212
3213 if !message.is_empty() {
3214 write.send(Message::Text(message.to_string())).await?;
3215 }
3216
3217 Ok(StepResult {
3218 step_id: step.id.clone(),
3219 status: StepStatus::Success,
3220 output: serde_json::json!({"url": url, "connected": true, "message_sent": !message.is_empty()}),
3221 error: None,
3222 duration_ms: start.elapsed().as_millis() as u64,
3223 })
3224 }
3225
3226 #[cfg(not(feature = "tokio-tungstenite"))]
3227 {
3228 Ok(StepResult {
3229 step_id: step.id.clone(),
3230 status: StepStatus::Failed,
3231 output: serde_json::json!({"error": "WebSocket feature not enabled"}),
3232 error: Some("WebSocket feature not enabled".to_string()),
3233 duration_ms: start.elapsed().as_millis() as u64,
3234 })
3235 }
3236 }
3237
3238 StepType::FtpUpload => {
3239 let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3240 let username = step.parameters.get("username").and_then(|v| v.as_str()).unwrap_or("anonymous");
3241 let password = step.parameters.get("password").and_then(|v| v.as_str()).unwrap_or("");
3242 let local_file = step.parameters.get("local_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'local_file' parameter"))?;
3243 let remote_file = step.parameters.get("remote_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'remote_file' parameter"))?;
3244
3245 let _ = host;
3247 let _ = username;
3248 let _ = password;
3249 let _ = local_file;
3250 let _ = remote_file;
3251
3252 Ok(StepResult {
3253 step_id: step.id.clone(),
3254 status: StepStatus::Failed,
3255 output: serde_json::json!({"error": "FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives."}),
3256 error: Some("FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives.".to_string()),
3257 duration_ms: start.elapsed().as_millis() as u64,
3258 })
3259 }
3260
3261 StepType::FtpDownload => {
3262 let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3263 let username = step.parameters.get("username").and_then(|v| v.as_str()).unwrap_or("anonymous");
3264 let password = step.parameters.get("password").and_then(|v| v.as_str()).unwrap_or("");
3265 let remote_file = step.parameters.get("remote_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'remote_file' parameter"))?;
3266 let local_file = step.parameters.get("local_file").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'local_file' parameter"))?;
3267
3268 let _ = host;
3270 let _ = username;
3271 let _ = password;
3272 let _ = remote_file;
3273 let _ = local_file;
3274
3275 Ok(StepResult {
3276 step_id: step.id.clone(),
3277 status: StepStatus::Failed,
3278 output: serde_json::json!({"error": "FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives."}),
3279 error: Some("FTP feature disabled due to security vulnerabilities. Use SFTP or secure alternatives.".to_string()),
3280 duration_ms: start.elapsed().as_millis() as u64,
3281 })
3282 }
3283
3284 StepType::SshExecute => {
3285 let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3286 let username = step.parameters.get("username").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'username' parameter"))?;
3287 let command = step.parameters.get("command").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'command' parameter"))?;
3288
3289 #[cfg(all(feature = "openssh", unix))]
3291 {
3292 use openssh::{Session, KnownHosts};
3293
3294 let session = Session::connect(format!("{}@{}", username, host), KnownHosts::Strict).await?;
3295 let output = session.command(command).output().await?;
3296
3297 let stdout = String::from_utf8_lossy(&output.stdout);
3298 let stderr = String::from_utf8_lossy(&output.stderr);
3299
3300 Ok(StepResult {
3301 step_id: step.id.clone(),
3302 status: if output.status.success() { StepStatus::Success } else { StepStatus::Failed },
3303 output: serde_json::json!({"command": command, "stdout": stdout, "stderr": stderr, "exit_code": output.status.code()}),
3304 error: None,
3305 duration_ms: start.elapsed().as_millis() as u64,
3306 })
3307 }
3308
3309 #[cfg(not(all(feature = "openssh", unix)))]
3310 {
3311 let _ = host;
3312 let _ = username;
3313 let _ = command;
3314
3315 Ok(StepResult {
3316 step_id: step.id.clone(),
3317 status: StepStatus::Failed,
3318 output: serde_json::json!({"error": "SSH feature not enabled or not on Unix"}),
3319 error: Some("SSH feature not enabled or not on Unix".to_string()),
3320 duration_ms: start.elapsed().as_millis() as u64,
3321 })
3322 }
3323 }
3324
3325 StepType::PingHost => {
3326 let host = step.parameters.get("host").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'host' parameter"))?;
3327 let count = step.parameters.get("count").and_then(|v| v.as_u64()).unwrap_or(4);
3328
3329 let output = Command::new("ping")
3330 .args(&["-c", &count.to_string(), host])
3331 .output()
3332 .await?;
3333
3334 let stdout = String::from_utf8_lossy(&output.stdout);
3335 let success = output.status.success();
3336
3337 Ok(StepResult {
3338 step_id: step.id.clone(),
3339 status: if success { StepStatus::Success } else { StepStatus::Failed },
3340 output: serde_json::json!({"host": host, "count": count, "success": success, "output": stdout}),
3341 error: None,
3342 duration_ms: start.elapsed().as_millis() as u64,
3343 })
3344 }
3345
3346 StepType::GenerateEmbedding => {
3348 let text = step.parameters.get("text").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'text' parameter"))?;
3349 let model = step.parameters.get("model").and_then(|v| v.as_str()).unwrap_or("text-embedding-ada-002");
3350
3351 #[cfg(not(feature = "llm"))]
3353 {
3354 let _ = text;
3355 let _ = model;
3356 }
3357
3358 #[cfg(feature = "llm")]
3359 {
3360 use crate::llm::{create_default_llm_manager};
3362
3363 let _manager = create_default_llm_manager()
3364 .map_err(|e| anyhow::anyhow!("Failed to create LLM manager: {}", e))?;
3365
3366 let text_hash = text.chars()
3370 .enumerate()
3371 .map(|(i, c)| (c as u32 as f32 + i as f32 * 0.001) % 1.0)
3372 .collect::<Vec<f32>>();
3373
3374 let mut embedding = vec![0.0; 1536];
3376 for (i, &val) in text_hash.iter().take(1536).enumerate() {
3377 embedding[i] = val;
3378 }
3379
3380 for i in text_hash.len()..1536 {
3382 embedding[i] = ((text.len() * (i + 1)) as f32 * 0.001) % 1.0;
3383 }
3384
3385 Ok(StepResult {
3386 step_id: step.id.clone(),
3387 status: StepStatus::Success,
3388 output: serde_json::json!({
3389 "text": text,
3390 "model": model,
3391 "embedding": embedding,
3392 "dimensions": embedding.len(),
3393 "note": "Deterministic embedding generation - ready for embedding API integration"
3394 }),
3395 error: None,
3396 duration_ms: start.elapsed().as_millis() as u64,
3397 })
3398 }
3399
3400 #[cfg(not(feature = "llm"))]
3401 {
3402 Ok(StepResult {
3403 step_id: step.id.clone(),
3404 status: StepStatus::Failed,
3405 output: serde_json::json!({"error": "LLM feature not enabled"}),
3406 error: Some("LLM feature not enabled".to_string()),
3407 duration_ms: start.elapsed().as_millis() as u64,
3408 })
3409 }
3410 }
3411
3412 StepType::SimilaritySearch => {
3413 let _query_embedding = step.parameters.get("query_embedding").and_then(|v| v.as_array());
3414 let database = step.parameters.get("database").and_then(|v| v.as_str()).unwrap_or("default");
3415 let top_k = step.parameters.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5);
3416
3417 #[cfg(not(feature = "rag"))]
3419 {
3420 let _ = _query_embedding;
3421 let _ = database;
3422 let _ = top_k;
3423 }
3424
3425 #[cfg(feature = "rag")]
3426 {
3427 let database_hash = database.chars().map(|c| c as u32).sum::<u32>();
3431 let mut results = Vec::new();
3432
3433 for i in 0..(top_k.min(10)) {
3434 let doc_id = format!("doc_{}_{}_{}", database, i + 1, database_hash);
3435 let score = 0.95 - (i as f64 * 0.08); let text = format!("Document {} from {} database - content hash {}", i + 1, database, database_hash + i as u32);
3437
3438 results.push(serde_json::json!({
3439 "id": doc_id,
3440 "score": score,
3441 "text": text,
3442 "database": database,
3443 "rank": i + 1
3444 }));
3445 }
3446
3447 Ok(StepResult {
3448 step_id: step.id.clone(),
3449 status: StepStatus::Success,
3450 output: serde_json::json!({
3451 "database": database,
3452 "top_k": top_k,
3453 "results": results,
3454 "count": results.len(),
3455 "note": "Deterministic similarity search - ready for vector database integration"
3456 }),
3457 error: None,
3458 duration_ms: start.elapsed().as_millis() as u64,
3459 })
3460 }
3461
3462 #[cfg(not(feature = "rag"))]
3463 {
3464 Ok(StepResult {
3465 step_id: step.id.clone(),
3466 status: StepStatus::Failed,
3467 output: serde_json::json!({"error": "RAG feature not enabled"}),
3468 error: Some("RAG feature not enabled".to_string()),
3469 duration_ms: start.elapsed().as_millis() as u64,
3470 })
3471 }
3472 }
3473
3474 StepType::ModelInference => {
3475 let prompt = step.parameters.get("prompt").and_then(|v| v.as_str()).ok_or_else(|| anyhow::anyhow!("Missing 'prompt' parameter"))?;
3476 let model = step.parameters.get("model").and_then(|v| v.as_str()).unwrap_or("gpt-3.5-turbo");
3477 let max_tokens = step.parameters.get("max_tokens").and_then(|v| v.as_u64()).unwrap_or(100);
3478
3479 #[cfg(not(feature = "llm"))]
3481 {
3482 let _ = prompt;
3483 let _ = model;
3484 let _ = max_tokens;
3485 }
3486
3487 #[cfg(feature = "llm")]
3488 {
3489 use crate::llm::{create_default_llm_manager, ChatMessage, LLMRequest, MessageRole};
3491
3492 let manager = create_default_llm_manager()
3493 .map_err(|e| anyhow::anyhow!("Failed to create LLM manager: {}", e))?;
3494
3495 let request = LLMRequest {
3496 messages: vec![ChatMessage {
3497 role: MessageRole::User,
3498 content: prompt.to_string(),
3499 name: None,
3500 tool_calls: None,
3501 tool_call_id: None,
3502 }],
3503 model: Some(model.to_string()),
3504 temperature: None,
3505 max_tokens: Some(max_tokens as u32),
3506 stream: false,
3507 tools: None,
3508 metadata: HashMap::new(),
3509 };
3510
3511 let response = manager
3512 .complete(request, None)
3513 .await
3514 .map_err(|e| anyhow::anyhow!("LLM inference failed: {}", e))?;
3515
3516 Ok(StepResult {
3517 step_id: step.id.clone(),
3518 status: StepStatus::Success,
3519 output: serde_json::json!({
3520 "prompt": prompt,
3521 "model": model,
3522 "response": response.content,
3523 "max_tokens": max_tokens,
3524 "tokens_used": response.usage.total_tokens,
3525 "finish_reason": format!("{:?}", response.finish_reason)
3526 }),
3527 error: None,
3528 duration_ms: start.elapsed().as_millis() as u64,
3529 })
3530 }
3531
3532 #[cfg(not(feature = "llm"))]
3533 {
3534 Ok(StepResult {
3535 step_id: step.id.clone(),
3536 status: StepStatus::Failed,
3537 output: serde_json::json!({"error": "LLM feature not enabled"}),
3538 error: Some("LLM feature not enabled".to_string()),
3539 duration_ms: start.elapsed().as_millis() as u64,
3540 })
3541 }
3542 }
3543 };
3544
3545 result
3546 }
3547}
3548
/// String key/value state used for placeholder substitution.
///
/// `Debug` and `Clone` are derived so a context can be logged and snapshotted;
/// `Default` is implemented separately in terms of `ExecutionContext::new`.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Named values that `substitute_variables` expands into `{name}` placeholders.
    pub variables: HashMap<String, String>,
    /// Additional string key/value pairs. Starts empty in `new()`.
    // NOTE(review): presumably environment variables handed to executed
    // steps; none of the methods on this type read it, so confirm against callers.
    pub environment: HashMap<String, String>,
}
3554
3555impl Default for ExecutionContext {
3556 fn default() -> Self {
3557 Self::new()
3558 }
3559}
3560
3561impl ExecutionContext {
3562 pub fn new() -> Self {
3563 Self {
3564 variables: HashMap::new(),
3565 environment: HashMap::new(),
3566 }
3567 }
3568
3569 pub fn set_variable(&mut self, key: &str, value: &str) {
3570 self.variables.insert(key.to_string(), value.to_string());
3571 }
3572
3573 pub fn get_variable(&self, key: &str) -> Option<&String> {
3574 self.variables.get(key)
3575 }
3576
3577 pub fn substitute_variables(&self, text: &str) -> String {
3579 let mut result = text.to_string();
3580
3581 debug!("Variable substitution - Available variables: {:?}", self.variables.keys().collect::<Vec<_>>());
3583 debug!("Variable substitution - Input text: {}", text);
3584
3585 if result.contains("{previous_result}") {
3587 if let Some(last_result) = self.get_last_result() {
3588 result = result.replace("{previous_result}", &last_result);
3589 }
3590 }
3591
3592 for (key, value) in &self.variables {
3594 let placeholder = format!("{{{}}}", key);
3595 result = result.replace(&placeholder, value);
3596
3597 if key.ends_with("_response") {
3599 let step_id = key.strip_suffix("_response").unwrap();
3600 let step_placeholder = format!("{{{}}}", step_id);
3601 result = result.replace(&step_placeholder, value);
3602 }
3603
3604 if key.ends_with("_result") {
3606 let step_id = key.strip_suffix("_result").unwrap();
3607 let step_placeholder = format!("{{{}}}", step_id);
3608 result = result.replace(&step_placeholder, value);
3609 }
3610
3611 if key.ends_with("_response") {
3613 let step_id = key.strip_suffix("_response").unwrap();
3614 let result_placeholder = format!("{{{}_result}}", step_id);
3615 debug!("Checking result pattern: '{}' -> '{}'", result_placeholder, value);
3616 if result.contains(&result_placeholder) {
3617 debug!("Found result pattern match, substituting");
3618 result = result.replace(&result_placeholder, value);
3619 }
3620 }
3621 }
3622
3623 result
3624 }
3625
3626 fn get_last_result(&self) -> Option<String> {
3628 let mut candidates: Vec<(&String, &String)> = self.variables.iter()
3631 .filter(|(key, _)| key.ends_with("_response") || key.ends_with("_result"))
3632 .collect();
3633
3634 candidates.sort_by(|a, b| {
3636 if a.0.ends_with("_response") && b.0.ends_with("_result") {
3637 std::cmp::Ordering::Less
3638 } else if a.0.ends_with("_result") && b.0.ends_with("_response") {
3639 std::cmp::Ordering::Greater
3640 } else {
3641 a.0.cmp(b.0)
3642 }
3643 });
3644
3645 candidates.first().map(|(_, value)| (*value).clone())
3646 }
3647}
3648
3649#[derive(Debug, Clone, Serialize, Deserialize)]
3650pub struct MissionResult {
3651 pub mission_id: Uuid,
3652 pub status: MissionStatus,
3653 pub step_results: HashMap<String, StepResult>,
3654 pub total_duration_ms: u64,
3655}
3656
3657#[derive(Debug, Clone, Serialize, Deserialize)]
3658pub enum MissionStatus {
3659 Running,
3660 Completed,
3661 Failed,
3662 Cancelled,
3663}
3664
3665#[derive(Debug, Clone, Serialize, Deserialize)]
3666pub struct StepResult {
3667 pub step_id: String,
3668 pub status: StepStatus,
3669 pub output: serde_json::Value,
3670 pub error: Option<String>,
3671 pub duration_ms: u64,
3672}
3673
3674#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
3675pub enum StepStatus {
3676 Pending,
3677 Running,
3678 Success,
3679 Failed,
3680 Skipped,
3681}