1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
8use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
9use crate::config::BackendConfig;
10use crate::environment::Environment;
11use crate::manifest::WorkspaceConfig;
12use crate::{Error, Result};
13use async_recursion::async_recursion;
14use cuenv_workspaces::PackageManager;
15use std::collections::HashMap;
16use std::path::{Path, PathBuf};
17use std::process::Stdio;
18use std::sync::Arc;
19use tokio::process::Command;
20use tokio::task::JoinSet;
21
/// Outcome of running a single task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Name the task was executed under.
    pub name: String,
    /// Exit code reported for the task, when one is available.
    pub exit_code: Option<i32>,
    /// Captured standard output; empty when output was not captured.
    pub stdout: String,
    /// Captured standard error; empty when output was not captured.
    pub stderr: String,
    /// Whether the task finished successfully.
    pub success: bool,
}
31
/// Number of trailing output lines per stream quoted in task failure summaries.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
34
35#[derive(Debug, Clone)]
37pub struct ExecutorConfig {
38 pub capture_output: bool,
40 pub max_parallel: usize,
42 pub environment: Environment,
44 pub working_dir: Option<PathBuf>,
46 pub project_root: PathBuf,
48 pub cue_module_root: Option<PathBuf>,
50 pub materialize_outputs: Option<PathBuf>,
52 pub cache_dir: Option<PathBuf>,
54 pub show_cache_path: bool,
56 pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
58 pub backend_config: Option<BackendConfig>,
60 pub cli_backend: Option<String>,
62}
63
64impl Default for ExecutorConfig {
65 fn default() -> Self {
66 Self {
67 capture_output: false,
68 max_parallel: 0,
69 environment: Environment::new(),
70 working_dir: None,
71 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
72 cue_module_root: None,
73 materialize_outputs: None,
74 cache_dir: None,
75 show_cache_path: false,
76 workspaces: None,
77 backend_config: None,
78 cli_backend: None,
79 }
80 }
81}
82
83pub struct TaskExecutor {
85 config: ExecutorConfig,
86 backend: Arc<dyn TaskBackend>,
87}
88impl TaskExecutor {
89 pub fn new(config: ExecutorConfig) -> Self {
91 Self::with_dagger_factory(config, None)
92 }
93
94 pub fn with_dagger_factory(
98 config: ExecutorConfig,
99 dagger_factory: Option<BackendFactory>,
100 ) -> Self {
101 let backend = create_backend_with_factory(
102 config.backend_config.as_ref(),
103 config.project_root.clone(),
104 config.cli_backend.as_deref(),
105 dagger_factory,
106 );
107 Self { config, backend }
108 }
109
110 fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
112 Self { config, backend }
113 }
114
115 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
117 if self.backend.name() == "dagger" {
122 return self
123 .backend
124 .execute(
125 name,
126 task,
127 &self.config.environment,
128 &self.config.project_root,
129 self.config.capture_output,
130 )
131 .await;
132 }
133
134 self.execute_task_non_hermetic(name, task).await
136 }
137
138 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
142 if task.is_task_ref() && task.project_root.is_none() {
144 return Err(Error::configuration(format!(
145 "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
146 This usually means:\n\
147 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
148 - The referenced task '{}' doesn't exist in that project\n\
149 - There was an error loading the referenced project's env.cue\n\
150 Run with RUST_LOG=debug for more details.",
151 name,
152 task.task_ref.as_deref().unwrap_or("unknown"),
153 task.task_ref
154 .as_deref()
155 .and_then(|r| r.split(':').next_back())
156 .unwrap_or("unknown")
157 )));
158 }
159
160 let workdir = if let Some(ref dir) = task.directory {
167 self.config
169 .cue_module_root
170 .as_ref()
171 .unwrap_or(&self.config.project_root)
172 .join(dir)
173 } else if let Some(ref project_root) = task.project_root {
174 project_root.clone()
176 } else if let Some(ref source) = task.source {
177 if let Some(dir) = source.directory() {
179 self.config
180 .cue_module_root
181 .as_ref()
182 .unwrap_or(&self.config.project_root)
183 .join(dir)
184 } else {
185 self.config
189 .cue_module_root
190 .clone()
191 .unwrap_or_else(|| self.config.project_root.clone())
192 }
193 } else if !task.hermetic && !task.workspaces.is_empty() {
194 let workspace_name = &task.workspaces[0];
196 let manager = match workspace_name.as_str() {
197 "bun" => PackageManager::Bun,
198 "npm" => PackageManager::Npm,
199 "pnpm" => PackageManager::Pnpm,
200 "yarn" => PackageManager::YarnModern,
201 "cargo" => PackageManager::Cargo,
202 _ => PackageManager::Npm, };
204 find_workspace_root(manager, &self.config.project_root)
205 } else {
206 self.config.project_root.clone()
207 };
208
209 tracing::info!(
210 task = %name,
211 workdir = %workdir.display(),
212 hermetic = false,
213 "Executing non-hermetic task"
214 );
215
216 let cmd_str = if let Some(script) = &task.script {
219 format!("[script: {} bytes]", script.len())
220 } else if task.command.is_empty() {
221 task.args.join(" ")
222 } else {
223 format!("{} {}", task.command, task.args.join(" "))
224 };
225
226 cuenv_events::emit_task_started!(name, cmd_str, false);
227
228 let mut cmd = if let Some(script) = &task.script {
230 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
232 (
233 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
234 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
235 )
236 } else {
237 ("bash".to_string(), "-c".to_string())
239 };
240
241 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
242 let mut cmd = Command::new(&resolved_shell);
243 cmd.arg(&shell_flag);
244 cmd.arg(script);
245 cmd
246 } else {
247 let resolved_command = self.config.environment.resolve_command(&task.command);
249
250 if let Some(shell) = &task.shell {
251 if let (Some(shell_command), Some(shell_flag)) = (&shell.command, &shell.flag) {
252 let resolved_shell = self.config.environment.resolve_command(shell_command);
253 let mut cmd = Command::new(&resolved_shell);
254 cmd.arg(shell_flag);
255 if task.args.is_empty() {
256 cmd.arg(&resolved_command);
257 } else {
258 let full_command = if task.command.is_empty() {
259 task.args.join(" ")
260 } else {
261 format!("{} {}", resolved_command, task.args.join(" "))
262 };
263 cmd.arg(full_command);
264 }
265 cmd
266 } else {
267 let mut cmd = Command::new(&resolved_command);
268 for arg in &task.args {
269 cmd.arg(arg);
270 }
271 cmd
272 }
273 } else {
274 let mut cmd = Command::new(&resolved_command);
275 for arg in &task.args {
276 cmd.arg(arg);
277 }
278 cmd
279 }
280 };
281
282 cmd.current_dir(&workdir);
284 let env_vars = self.config.environment.merge_with_system();
285 for (k, v) in &env_vars {
286 cmd.env(k, v);
287 }
288
289 if self.config.capture_output {
292 use tokio::io::{AsyncBufReadExt, BufReader};
293
294 let start_time = std::time::Instant::now();
295
296 let mut child = cmd
298 .stdout(Stdio::piped())
299 .stderr(Stdio::piped())
300 .spawn()
301 .map_err(|e| Error::Io {
302 source: e,
303 path: None,
304 operation: format!("spawn task {}", name),
305 })?;
306
307 let stdout_handle = child.stdout.take();
309 let stderr_handle = child.stderr.take();
310
311 let mut stdout_lines = Vec::new();
313 let mut stderr_lines = Vec::new();
314
315 let name_for_stdout = name.to_string();
317 let stdout_task = tokio::spawn(async move {
318 let mut lines = Vec::new();
319 if let Some(stdout) = stdout_handle {
320 let mut reader = BufReader::new(stdout).lines();
321 while let Ok(Some(line)) = reader.next_line().await {
322 cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
323 lines.push(line);
324 }
325 }
326 lines
327 });
328
329 let name_for_stderr = name.to_string();
331 let stderr_task = tokio::spawn(async move {
332 let mut lines = Vec::new();
333 if let Some(stderr) = stderr_handle {
334 let mut reader = BufReader::new(stderr).lines();
335 while let Ok(Some(line)) = reader.next_line().await {
336 cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
337 lines.push(line);
338 }
339 }
340 lines
341 });
342
343 let status = child.wait().await.map_err(|e| Error::Io {
345 source: e,
346 path: None,
347 operation: format!("wait for task {}", name),
348 })?;
349
350 if let Ok(lines) = stdout_task.await {
352 stdout_lines = lines;
353 }
354 if let Ok(lines) = stderr_task.await {
355 stderr_lines = lines;
356 }
357
358 let duration_ms = start_time.elapsed().as_millis() as u64;
359 let stdout = stdout_lines.join("\n");
360 let stderr = stderr_lines.join("\n");
361 let exit_code = status.code().unwrap_or(-1);
362 let success = status.success();
363
364 cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
366
367 if !success {
368 tracing::warn!(task = %name, exit = exit_code, "Task failed");
369 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
370 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
371 }
372
373 Ok(TaskResult {
374 name: name.to_string(),
375 exit_code: Some(exit_code),
376 stdout,
377 stderr,
378 success,
379 })
380 } else {
381 let status = cmd
383 .stdout(Stdio::inherit())
384 .stderr(Stdio::inherit())
385 .status()
386 .await
387 .map_err(|e| Error::Io {
388 source: e,
389 path: None,
390 operation: format!("spawn task {}", name),
391 })?;
392
393 let exit_code = status.code().unwrap_or(-1);
394 let success = status.success();
395
396 if !success {
397 tracing::warn!(task = %name, exit = exit_code, "Task failed");
398 }
399
400 Ok(TaskResult {
401 name: name.to_string(),
402 exit_code: Some(exit_code),
403 stdout: String::new(), stderr: String::new(),
405 success,
406 })
407 }
408 }
409
410 #[async_recursion]
412 pub async fn execute_definition(
413 &self,
414 name: &str,
415 definition: &TaskDefinition,
416 all_tasks: &Tasks,
417 ) -> Result<Vec<TaskResult>> {
418 match definition {
419 TaskDefinition::Single(task) => {
420 let result = self.execute_task(name, task.as_ref()).await?;
421 Ok(vec![result])
422 }
423 TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
424 }
425 }
426
427 async fn execute_group(
428 &self,
429 prefix: &str,
430 group: &TaskGroup,
431 all_tasks: &Tasks,
432 ) -> Result<Vec<TaskResult>> {
433 match group {
434 TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
435 TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
436 }
437 }
438
439 async fn execute_sequential(
440 &self,
441 prefix: &str,
442 tasks: &[TaskDefinition],
443 all_tasks: &Tasks,
444 ) -> Result<Vec<TaskResult>> {
445 if !self.config.capture_output {
446 cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
447 }
448 let mut results = Vec::new();
449 for (i, task_def) in tasks.iter().enumerate() {
450 let task_name = format!("{}[{}]", prefix, i);
451 let task_results = self
452 .execute_definition(&task_name, task_def, all_tasks)
453 .await?;
454 for result in &task_results {
455 if !result.success {
456 let message = format!(
457 "Sequential task group '{prefix}' halted.\n\n{}",
458 summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
459 );
460 return Err(Error::configuration(message));
461 }
462 }
463 results.extend(task_results);
464 }
465 Ok(results)
466 }
467
468 async fn execute_parallel(
469 &self,
470 prefix: &str,
471 group: &ParallelGroup,
472 all_tasks: &Tasks,
473 ) -> Result<Vec<TaskResult>> {
474 if let Some(default_task) = group.tasks.get("default") {
476 if !self.config.capture_output {
477 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
478 }
479 let task_name = format!("{}.default", prefix);
482 return self
483 .execute_definition(&task_name, default_task, all_tasks)
484 .await;
485 }
486
487 if !self.config.capture_output {
488 cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
489 }
490 let mut join_set = JoinSet::new();
491 let all_tasks = Arc::new(all_tasks.clone());
492 let mut all_results = Vec::new();
493 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
494 if let Some(failed) = results.iter().find(|r| !r.success) {
495 let message = format!(
496 "Parallel task group '{prefix}' halted.\n\n{}",
497 summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
498 );
499 return Err(Error::configuration(message));
500 }
501 all_results.extend(results);
502 Ok(())
503 };
504 for (name, task_def) in &group.tasks {
505 let task_name = format!("{}.{}", prefix, name);
506 let task_def = task_def.clone();
507 let all_tasks = Arc::clone(&all_tasks);
508 let executor = self.clone_with_config();
509 join_set.spawn(async move {
510 executor
511 .execute_definition(&task_name, &task_def, &all_tasks)
512 .await
513 });
514 if self.config.max_parallel > 0
515 && join_set.len() >= self.config.max_parallel
516 && let Some(result) = join_set.join_next().await
517 {
518 match result {
519 Ok(Ok(results)) => merge_results(results)?,
520 Ok(Err(e)) => return Err(e),
521 Err(e) => {
522 return Err(Error::configuration(format!(
523 "Task execution panicked: {}",
524 e
525 )));
526 }
527 }
528 }
529 }
530 while let Some(result) = join_set.join_next().await {
531 match result {
532 Ok(Ok(results)) => merge_results(results)?,
533 Ok(Err(e)) => return Err(e),
534 Err(e) => {
535 return Err(Error::configuration(format!(
536 "Task execution panicked: {}",
537 e
538 )));
539 }
540 }
541 }
542 Ok(all_results)
543 }
544
545 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
546 let parallel_groups = graph.get_parallel_groups()?;
547 let mut all_results = Vec::new();
548
549 for mut group in parallel_groups {
557 let mut join_set = JoinSet::new();
558
559 while !group.is_empty() || !join_set.is_empty() {
560 while let Some(node) = group.pop() {
562 let task = node.task.clone();
563 let name = node.name.clone();
564 let executor = self.clone_with_config();
565 join_set.spawn(async move { executor.execute_task(&name, &task).await });
566
567 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
568 break;
569 }
570 }
571
572 if let Some(result) = join_set.join_next().await {
573 match result {
574 Ok(Ok(task_result)) => {
575 if !task_result.success {
576 join_set.abort_all();
577 let message = format!(
578 "Task graph execution halted.\n\n{}",
579 summarize_task_failure(
580 &task_result,
581 TASK_FAILURE_SNIPPET_LINES,
582 )
583 );
584 return Err(Error::configuration(message));
585 }
586 all_results.push(task_result);
587 }
588 Ok(Err(e)) => {
589 join_set.abort_all();
590 return Err(e);
591 }
592 Err(e) => {
593 join_set.abort_all();
594 return Err(Error::configuration(format!(
595 "Task execution panicked: {}",
596 e
597 )));
598 }
599 }
600 }
601 }
602 }
603
604 Ok(all_results)
605 }
606
607 fn clone_with_config(&self) -> Self {
608 Self::with_shared_backend(self.config.clone(), self.backend.clone())
610 }
611}
612
613fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
614 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
615
616 loop {
617 let is_root = match manager {
618 PackageManager::Npm
619 | PackageManager::Bun
620 | PackageManager::YarnClassic
621 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
622 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
623 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
624 PackageManager::Deno => deno_json_has_workspace(¤t),
625 };
626
627 if is_root {
628 return current;
629 }
630
631 if let Some(parent) = current.parent() {
632 current = parent.to_path_buf();
633 } else {
634 return start.to_path_buf();
635 }
636 }
637}
638
639fn package_json_has_workspaces(dir: &Path) -> bool {
640 let path = dir.join("package.json");
641 let content = std::fs::read_to_string(&path);
642 let Ok(json) = content.and_then(|s| {
643 serde_json::from_str::<serde_json::Value>(&s)
644 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
645 }) else {
646 return false;
647 };
648
649 match json.get("workspaces") {
650 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
651 Some(serde_json::Value::Object(map)) => map
652 .get("packages")
653 .and_then(|packages| packages.as_array())
654 .map(|arr| !arr.is_empty())
655 .unwrap_or(false),
656 _ => false,
657 }
658}
659
/// Reports whether `dir/Cargo.toml` declares a Cargo workspace.
///
/// A manifest counts when it has a `[workspace]` table header or any
/// `[workspace.*]` sub-table header (for example `[workspace.dependencies]`).
/// Matching is done per line after stripping `#` comments, so the text
/// `[workspace]` inside a comment or a string value no longer counts.
/// A missing or unreadable file counts as "no workspace".
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // Drop a trailing comment and surrounding whitespace before matching.
        let header = line.split('#').next().unwrap_or_default().trim();
        header == "[workspace]" || header.starts_with("[workspace.")
    })
}
668
669fn deno_json_has_workspace(dir: &Path) -> bool {
670 let path = dir.join("deno.json");
671 let content = std::fs::read_to_string(&path);
672 let Ok(json) = content.and_then(|s| {
673 serde_json::from_str::<serde_json::Value>(&s)
674 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
675 }) else {
676 return false;
677 };
678
679 match json.get("workspace") {
681 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
682 Some(serde_json::Value::Object(_)) => true,
683 _ => false,
684 }
685}
686
687pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
690 let exit_code = result
691 .exit_code
692 .map(|c| c.to_string())
693 .unwrap_or_else(|| "unknown".to_string());
694
695 let mut sections = Vec::new();
696 sections.push(format!(
697 "Task '{}' failed with exit code {}.",
698 result.name, exit_code
699 ));
700
701 let output = format_failure_streams(result, max_output_lines);
702 if output.is_empty() {
703 sections.push(
704 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
705 .to_string(),
706 );
707 } else {
708 sections.push(output);
709 }
710
711 sections.join("\n\n")
712}
713
714fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
715 let mut streams = Vec::new();
716
717 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
718 streams.push(stdout);
719 }
720
721 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
722 streams.push(stderr);
723 }
724
725 streams.join("\n\n")
726}
727
/// Formats the tail of one output stream under a `label` header.
///
/// Trailing whitespace is ignored. Returns `None` when nothing remains;
/// otherwise the last `max_output_lines` lines are returned, and the header
/// states how many lines were kept whenever earlier lines were cut.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    let skipped = total.saturating_sub(max_output_lines);
    let tail: Vec<&str> = trimmed.lines().skip(skipped).collect();
    let snippet = tail.join("\n");

    // Lines were cut exactly when the stream is longer than the limit.
    let header = if skipped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{snippet}"))
}
747
748pub async fn execute_command(
752 command: &str,
753 args: &[String],
754 environment: &Environment,
755) -> Result<i32> {
756 execute_command_with_redaction(command, args, environment, &[]).await
757}
758
759pub async fn execute_command_with_redaction(
763 command: &str,
764 args: &[String],
765 environment: &Environment,
766 secrets: &[String],
767) -> Result<i32> {
768 use tokio::io::{AsyncBufReadExt, BufReader};
769
770 tracing::info!("Executing command: {} {:?}", command, args);
771 let mut cmd = Command::new(command);
772 cmd.args(args);
773 let env_vars = environment.merge_with_system();
774 for (key, value) in env_vars {
775 cmd.env(key, value);
776 }
777
778 if secrets.is_empty() {
779 cmd.stdout(Stdio::inherit());
781 cmd.stderr(Stdio::inherit());
782 cmd.stdin(Stdio::inherit());
783 let status = cmd.status().await.map_err(|e| {
784 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
785 })?;
786 return Ok(status.code().unwrap_or(1));
787 }
788
789 cmd.stdout(Stdio::piped());
791 cmd.stderr(Stdio::piped());
792 cmd.stdin(Stdio::inherit());
793
794 let mut child = cmd.spawn().map_err(|e| {
795 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
796 })?;
797
798 let stdout = child
799 .stdout
800 .take()
801 .ok_or_else(|| Error::execution("stdout pipe not available"))?;
802 let stderr = child
803 .stderr
804 .take()
805 .ok_or_else(|| Error::execution("stderr pipe not available"))?;
806
807 let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
809 sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
810 let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
811
812 let secrets_clone = sorted_secrets.clone();
814 let stdout_task = tokio::spawn(async move {
815 let reader = BufReader::new(stdout);
816 let mut lines = reader.lines();
817 while let Ok(Some(line)) = lines.next_line().await {
818 let mut redacted = line;
819 for secret in &secrets_clone {
820 if secret.len() >= 4 {
821 redacted = redacted.replace(secret, "[REDACTED]");
822 }
823 }
824 cuenv_events::emit_stdout!(&redacted);
825 }
826 });
827
828 let stderr_task = tokio::spawn(async move {
830 let reader = BufReader::new(stderr);
831 let mut lines = reader.lines();
832 while let Ok(Some(line)) = lines.next_line().await {
833 let mut redacted = line;
834 for secret in &sorted_secrets {
835 if secret.len() >= 4 {
836 redacted = redacted.replace(secret, "[REDACTED]");
837 }
838 }
839 cuenv_events::emit_stderr!(&redacted);
840 }
841 });
842
843 let status = child.wait().await.map_err(|e| {
845 Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
846 })?;
847
848 let _ = stdout_task.await;
849 let _ = stderr_task.await;
850
851 Ok(status.code().unwrap_or(1))
852}
853
854#[cfg(test)]
855mod tests {
856 use super::*;
857 use crate::tasks::Input;
858 use std::fs;
859 use tempfile::TempDir;
860
861 #[tokio::test]
862 async fn test_executor_config_default() {
863 let config = ExecutorConfig::default();
864 assert!(!config.capture_output);
865 assert_eq!(config.max_parallel, 0);
866 assert!(config.environment.is_empty());
867 }
868
869 #[tokio::test]
870 async fn test_task_result() {
871 let result = TaskResult {
872 name: "test".to_string(),
873 exit_code: Some(0),
874 stdout: "output".to_string(),
875 stderr: String::new(),
876 success: true,
877 };
878 assert_eq!(result.name, "test");
879 assert_eq!(result.exit_code, Some(0));
880 assert!(result.success);
881 assert_eq!(result.stdout, "output");
882 }
883
884 #[tokio::test]
885 async fn test_execute_simple_task() {
886 let config = ExecutorConfig {
887 capture_output: true,
888 ..Default::default()
889 };
890 let executor = TaskExecutor::new(config);
891 let task = Task {
892 command: "echo".to_string(),
893 args: vec!["hello".to_string()],
894 description: Some("Hello task".to_string()),
895 ..Default::default()
896 };
897 let result = executor.execute_task("test", &task).await.unwrap();
898 assert!(result.success);
899 assert_eq!(result.exit_code, Some(0));
900 assert!(result.stdout.contains("hello"));
901 }
902
903 #[tokio::test]
904 async fn test_execute_with_environment() {
905 let mut config = ExecutorConfig {
906 capture_output: true,
907 ..Default::default()
908 };
909 config
910 .environment
911 .set("TEST_VAR".to_string(), "test_value".to_string());
912 let executor = TaskExecutor::new(config);
913 let task = Task {
914 command: "printenv".to_string(),
915 args: vec!["TEST_VAR".to_string()],
916 description: Some("Print env task".to_string()),
917 ..Default::default()
918 };
919 let result = executor.execute_task("test", &task).await.unwrap();
920 assert!(result.success);
921 assert!(result.stdout.contains("test_value"));
922 }
923
924 #[tokio::test]
925 async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
926 let tmp = TempDir::new().unwrap();
927 let root = tmp.path();
928
929 fs::write(
931 root.join("package.json"),
932 r#"{
933 "name": "root-app",
934 "version": "0.0.0",
935 "workspaces": ["packages/*", "apps/*"],
936 "dependencies": {
937 "@rawkodeacademy/content-technologies": "workspace:*"
938 }
939}"#,
940 )
941 .unwrap();
942 fs::write(
945 root.join("bun.lock"),
946 r#"{
947 "lockfileVersion": 1,
948 "workspaces": {
949 "": {
950 "name": "root-app",
951 "dependencies": {
952 "@rawkodeacademy/content-technologies": "workspace:*"
953 }
954 },
955 "packages/content-technologies": {
956 "name": "@rawkodeacademy/content-technologies",
957 "version": "0.0.1"
958 },
959 "apps/site": {
960 "version": "0.0.0",
961 "dependencies": {
962 "@rawkodeacademy/content-technologies": "workspace:*"
963 }
964 }
965 },
966 "packages": {}
967}"#,
968 )
969 .unwrap();
970
971 fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
973 fs::write(
974 root.join("packages/content-technologies/package.json"),
975 r#"{
976 "name": "@rawkodeacademy/content-technologies",
977 "version": "0.0.1"
978}"#,
979 )
980 .unwrap();
981
982 fs::create_dir_all(root.join("apps/site")).unwrap();
983 fs::write(
984 root.join("apps/site/package.json"),
985 r#"{
986 "name": "site",
987 "version": "0.0.0",
988 "dependencies": {
989 "@rawkodeacademy/content-technologies": "workspace:*"
990 }
991}"#,
992 )
993 .unwrap();
994
995 let mut workspaces = HashMap::new();
996 workspaces.insert(
997 "bun".to_string(),
998 WorkspaceConfig {
999 enabled: true,
1000 package_manager: Some("bun".to_string()),
1001 root: None,
1002 hooks: None,
1003 },
1004 );
1005
1006 let config = ExecutorConfig {
1007 capture_output: true,
1008 project_root: root.join("apps/site"),
1009 workspaces: Some(workspaces),
1010 ..Default::default()
1011 };
1012 let executor = TaskExecutor::new(config);
1013
1014 let task = Task {
1015 command: "sh".to_string(),
1016 args: vec![
1017 "-c".to_string(),
1018 "find ../.. -maxdepth 4 -type d | sort".to_string(),
1019 ],
1020 inputs: vec![Input::Path("package.json".to_string())],
1021 workspaces: vec!["bun".to_string()],
1022 ..Default::default()
1023 };
1024
1025 let result = executor.execute_task("install", &task).await.unwrap();
1026 assert!(
1027 result.success,
1028 "command failed stdout='{}' stderr='{}'",
1029 result.stdout, result.stderr
1030 );
1031 assert!(
1032 result
1033 .stdout
1034 .split_whitespace()
1035 .any(|line| line.ends_with("packages/content-technologies")),
1036 "should include workspace member from workspace root; stdout='{}' stderr='{}'",
1037 result.stdout,
1038 result.stderr
1039 );
1040 }
1041
1042 #[tokio::test]
1043 async fn test_execute_failing_task() {
1044 let config = ExecutorConfig {
1045 capture_output: true,
1046 ..Default::default()
1047 };
1048 let executor = TaskExecutor::new(config);
1049 let task = Task {
1050 command: "false".to_string(),
1051 description: Some("Failing task".to_string()),
1052 ..Default::default()
1053 };
1054 let result = executor.execute_task("test", &task).await.unwrap();
1055 assert!(!result.success);
1056 assert_eq!(result.exit_code, Some(1));
1057 }
1058
1059 #[tokio::test]
1060 async fn test_execute_sequential_group() {
1061 let config = ExecutorConfig {
1062 capture_output: true,
1063 ..Default::default()
1064 };
1065 let executor = TaskExecutor::new(config);
1066 let task1 = Task {
1067 command: "echo".to_string(),
1068 args: vec!["first".to_string()],
1069 description: Some("First task".to_string()),
1070 ..Default::default()
1071 };
1072 let task2 = Task {
1073 command: "echo".to_string(),
1074 args: vec!["second".to_string()],
1075 description: Some("Second task".to_string()),
1076 ..Default::default()
1077 };
1078 let group = TaskGroup::Sequential(vec![
1079 TaskDefinition::Single(Box::new(task1)),
1080 TaskDefinition::Single(Box::new(task2)),
1081 ]);
1082 let all_tasks = Tasks::new();
1083 let results = executor
1084 .execute_group("seq", &group, &all_tasks)
1085 .await
1086 .unwrap();
1087 assert_eq!(results.len(), 2);
1088 assert!(results[0].stdout.contains("first"));
1089 assert!(results[1].stdout.contains("second"));
1090 }
1091
1092 #[tokio::test]
1093 async fn test_command_injection_prevention() {
1094 let config = ExecutorConfig {
1095 capture_output: true,
1096 ..Default::default()
1097 };
1098 let executor = TaskExecutor::new(config);
1099 let malicious_task = Task {
1100 command: "echo".to_string(),
1101 args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1102 description: Some("Malicious task test".to_string()),
1103 ..Default::default()
1104 };
1105 let result = executor
1106 .execute_task("malicious", &malicious_task)
1107 .await
1108 .unwrap();
1109 assert!(result.success);
1110 assert!(result.stdout.contains("hello ; rm -rf /"));
1111 }
1112
1113 #[tokio::test]
1114 async fn test_special_characters_in_args() {
1115 let config = ExecutorConfig {
1116 capture_output: true,
1117 ..Default::default()
1118 };
1119 let executor = TaskExecutor::new(config);
1120 let special_chars = vec![
1121 "$USER",
1122 "$(whoami)",
1123 "`whoami`",
1124 "&& echo hacked",
1125 "|| echo failed",
1126 "> /tmp/hack",
1127 "| cat",
1128 ];
1129 for special_arg in special_chars {
1130 let task = Task {
1131 command: "echo".to_string(),
1132 args: vec!["safe".to_string(), special_arg.to_string()],
1133 description: Some("Special character test".to_string()),
1134 ..Default::default()
1135 };
1136 let result = executor.execute_task("special", &task).await.unwrap();
1137 assert!(result.success);
1138 assert!(result.stdout.contains("safe"));
1139 assert!(result.stdout.contains(special_arg));
1140 }
1141 }
1142
1143 #[tokio::test]
1144 async fn test_environment_variable_safety() {
1145 let mut config = ExecutorConfig {
1146 capture_output: true,
1147 ..Default::default()
1148 };
1149 config
1150 .environment
1151 .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1152 let executor = TaskExecutor::new(config);
1153 let task = Task {
1154 command: "printenv".to_string(),
1155 args: vec!["DANGEROUS_VAR".to_string()],
1156 description: Some("Environment variable safety test".to_string()),
1157 ..Default::default()
1158 };
1159 let result = executor.execute_task("env_test", &task).await.unwrap();
1160 assert!(result.success);
1161 assert!(result.stdout.contains("; rm -rf /"));
1162 }
1163
1164 #[tokio::test]
1165 async fn test_execute_graph_parallel_groups() {
1166 let config = ExecutorConfig {
1168 capture_output: true,
1169 max_parallel: 2,
1170 ..Default::default()
1171 };
1172 let executor = TaskExecutor::new(config);
1173 let mut graph = TaskGraph::new();
1174
1175 let t1 = Task {
1176 command: "echo".into(),
1177 args: vec!["A".into()],
1178 ..Default::default()
1179 };
1180 let t2 = Task {
1181 command: "echo".into(),
1182 args: vec!["B".into()],
1183 ..Default::default()
1184 };
1185
1186 graph.add_task("t1", t1).unwrap();
1187 graph.add_task("t2", t2).unwrap();
1188 let results = executor.execute_graph(&graph).await.unwrap();
1189 assert_eq!(results.len(), 2);
1190 let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1191 assert!(joined.contains("A") && joined.contains("B"));
1192 }
1193
1194 #[tokio::test]
1195 async fn test_execute_graph_respects_dependency_levels() {
1196 let tmp = TempDir::new().unwrap();
1197 let root = tmp.path();
1198
1199 let config = ExecutorConfig {
1200 capture_output: true,
1201 max_parallel: 2,
1202 project_root: root.to_path_buf(),
1203 ..Default::default()
1204 };
1205 let executor = TaskExecutor::new(config);
1206
1207 let mut tasks = Tasks::new();
1208 tasks.tasks.insert(
1209 "dep".into(),
1210 TaskDefinition::Single(Box::new(Task {
1211 command: "sh".into(),
1212 args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
1213 ..Default::default()
1214 })),
1215 );
1216 tasks.tasks.insert(
1217 "consumer".into(),
1218 TaskDefinition::Single(Box::new(Task {
1219 command: "sh".into(),
1220 args: vec!["-c".into(), "cat marker.txt".into()],
1221 depends_on: vec!["dep".into()],
1222 ..Default::default()
1223 })),
1224 );
1225
1226 let mut graph = TaskGraph::new();
1227 graph.build_for_task("consumer", &tasks).unwrap();
1228
1229 let results = executor.execute_graph(&graph).await.unwrap();
1230 assert_eq!(results.len(), 2);
1231
1232 let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
1233 assert!(consumer.success);
1234 assert!(consumer.stdout.contains("ok"));
1235 }
1236}