1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
8use super::process_registry::global_registry;
9use super::{Task, TaskGraph, TaskGroup, TaskNode, Tasks};
10use crate::OutputCapture;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::{Error, Result};
14use async_recursion::async_recursion;
15use cuenv_workspaces::PackageManager;
16use std::path::{Path, PathBuf};
17use std::process::Stdio;
18use std::sync::Arc;
19use tokio::process::Command;
20use tokio::task::JoinSet;
21use tracing::instrument;
22
23#[cfg(unix)]
26#[allow(unused_imports)]
27use std::os::unix::process::CommandExt;
28
/// Makes the child spawned from `cmd` the leader of its own process group.
///
/// Installs a `pre_exec` hook that calls `setpgid(0, 0)` in the child before
/// the target program is executed. Only compiled on Unix targets.
#[cfg(unix)]
fn setup_process_group(cmd: &mut Command) {
    // SAFETY: the hook only calls `setpgid`, which POSIX lists as
    // async-signal-safe, and touches no state shared with the parent.
    #[expect(unsafe_code, reason = "Required for POSIX process group management")]
    unsafe {
        cmd.pre_exec(|| {
            // The return value is deliberately ignored: a failure to create the
            // group does not prevent the task from being spawned.
            libc::setpgid(0, 0);
            Ok(())
        });
    }
}
46
/// Outcome of running a single task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Name the task was executed under.
    pub name: String,
    /// Exit code of the process. The executor in this file always stores
    /// `Some`, using `-1` when the process ended without an exit code.
    pub exit_code: Option<i32>,
    /// Captured standard output, lines joined with `\n`; empty when output
    /// was not captured.
    pub stdout: String,
    /// Captured standard error, lines joined with `\n`; empty when output
    /// was not captured.
    pub stderr: String,
    /// Whether the process reported a successful exit status.
    pub success: bool,
}
56
/// Number of trailing output lines to show for a failed task.
///
/// NOTE(review): not referenced in this part of the file; presumably passed as
/// `max_output_lines` to `summarize_task_failure` by callers — confirm.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
59
/// Settings that control how a [`TaskExecutor`] runs tasks.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Output mode. When `should_capture()` is true the executor pipes and
    /// records the child's stdout/stderr; otherwise the child inherits the
    /// parent's stdio.
    pub capture_output: OutputCapture,
    /// Maximum number of tasks spawned concurrently; `0` means no limit.
    pub max_parallel: usize,
    /// Environment used to resolve commands and to build the child's
    /// environment variables.
    pub environment: Environment,
    /// Optional working directory.
    /// NOTE(review): not read by the executor code visible here — confirm
    /// where it is consumed.
    pub working_dir: Option<PathBuf>,
    /// Project root; handed to the backend and used as the fallback working
    /// directory for tasks.
    pub project_root: PathBuf,
    /// Root that task `directory` values are joined onto; falls back to
    /// `project_root` when `None`.
    pub cue_module_root: Option<PathBuf>,
    /// NOTE(review): presumably a directory to materialize task outputs into;
    /// not read in the visible code — confirm.
    pub materialize_outputs: Option<PathBuf>,
    /// NOTE(review): presumably the cache directory; not read in the visible
    /// code — confirm.
    pub cache_dir: Option<PathBuf>,
    /// NOTE(review): presumably toggles printing of cache paths; not read in
    /// the visible code — confirm.
    pub show_cache_path: bool,
    /// Backend configuration passed to `create_backend_with_factory`.
    pub backend_config: Option<BackendConfig>,
    /// Backend name from the command line, passed to
    /// `create_backend_with_factory`.
    pub cli_backend: Option<String>,
}
86
87impl Default for ExecutorConfig {
88 fn default() -> Self {
89 Self {
90 capture_output: OutputCapture::Capture,
91 max_parallel: 0,
92 environment: Environment::new(),
93 working_dir: None,
94 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
95 cue_module_root: None,
96 materialize_outputs: None,
97 cache_dir: None,
98 show_cache_path: false,
99 backend_config: None,
100 cli_backend: None,
101 }
102 }
103}
104
/// Runs tasks, task groups and task graphs according to an [`ExecutorConfig`].
pub struct TaskExecutor {
    /// Settings applied to every task this executor runs.
    config: ExecutorConfig,
    /// Execution backend; shared (via `Arc`) with the executor clones that are
    /// created for parallel work.
    backend: Arc<dyn TaskBackend>,
}
110impl TaskExecutor {
111 pub fn new(config: ExecutorConfig) -> Self {
113 Self::with_dagger_factory(config, None)
114 }
115
116 pub fn with_dagger_factory(
120 config: ExecutorConfig,
121 dagger_factory: Option<BackendFactory>,
122 ) -> Self {
123 let backend = create_backend_with_factory(
124 config.backend_config.as_ref(),
125 config.project_root.clone(),
126 config.cli_backend.as_deref(),
127 dagger_factory,
128 );
129 Self { config, backend }
130 }
131
132 fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
134 Self { config, backend }
135 }
136
137 #[instrument(name = "execute_task", skip(self, task), fields(task_name = %name))]
139 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
140 if self.backend.name() == "dagger" {
145 let ctx = super::backend::TaskExecutionContext {
146 name,
147 task,
148 environment: &self.config.environment,
149 project_root: &self.config.project_root,
150 capture_output: self.config.capture_output,
151 };
152 return self.backend.execute(&ctx).await;
153 }
154
155 self.execute_task_non_hermetic(name, task).await
157 }
158
159 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
163 if task.is_task_ref() && task.project_root.is_none() {
165 return Err(Error::configuration(format!(
166 "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
167 This usually means:\n\
168 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
169 - The referenced task '{}' doesn't exist in that project\n\
170 - There was an error loading the referenced project's env.cue\n\
171 Run with RUST_LOG=debug for more details.",
172 name,
173 task.task_ref.as_deref().unwrap_or("unknown"),
174 task.task_ref
175 .as_deref()
176 .and_then(|r| r.split(':').next_back())
177 .unwrap_or("unknown")
178 )));
179 }
180
181 let workdir = if let Some(ref dir) = task.directory {
188 self.config
190 .cue_module_root
191 .as_ref()
192 .unwrap_or(&self.config.project_root)
193 .join(dir)
194 } else if let Some(ref project_root) = task.project_root {
195 project_root.clone()
197 } else if let Some(ref source) = task.source {
198 if let Some(dir) = source.directory() {
200 self.config
201 .cue_module_root
202 .as_ref()
203 .unwrap_or(&self.config.project_root)
204 .join(dir)
205 } else {
206 self.config
210 .cue_module_root
211 .clone()
212 .unwrap_or_else(|| self.config.project_root.clone())
213 }
214 } else if !task.hermetic {
215 if let Some(manager) = cuenv_workspaces::detect_from_command(&task.command) {
217 find_workspace_root(manager, &self.config.project_root)
218 } else {
219 self.config.project_root.clone()
220 }
221 } else {
222 self.config.project_root.clone()
223 };
224
225 tracing::info!(
226 task = %name,
227 workdir = %workdir.display(),
228 hermetic = false,
229 "Executing non-hermetic task"
230 );
231
232 let cmd_str = if let Some(script) = &task.script {
235 format!("[script: {} bytes]", script.len())
236 } else if task.command.is_empty() {
237 task.args.join(" ")
238 } else {
239 format!("{} {}", task.command, task.args.join(" "))
240 };
241
242 cuenv_events::emit_task_started!(name, cmd_str, false);
243
244 let mut cmd = if let Some(script) = &task.script {
246 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
248 (
249 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
250 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
251 )
252 } else {
253 ("bash".to_string(), "-c".to_string())
255 };
256
257 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
258 let mut cmd = Command::new(&resolved_shell);
259 cmd.arg(&shell_flag);
260 cmd.arg(script);
261 cmd
262 } else {
263 let resolved_command = self.config.environment.resolve_command(&task.command);
265
266 if let Some(shell) = &task.shell {
267 if let (Some(shell_command), Some(shell_flag)) = (&shell.command, &shell.flag) {
268 let resolved_shell = self.config.environment.resolve_command(shell_command);
269 let mut cmd = Command::new(&resolved_shell);
270 cmd.arg(shell_flag);
271 if task.args.is_empty() {
272 cmd.arg(&resolved_command);
273 } else {
274 let full_command = if task.command.is_empty() {
275 task.args.join(" ")
276 } else {
277 format!("{} {}", resolved_command, task.args.join(" "))
278 };
279 cmd.arg(full_command);
280 }
281 cmd
282 } else {
283 let mut cmd = Command::new(&resolved_command);
284 for arg in &task.args {
285 cmd.arg(arg);
286 }
287 cmd
288 }
289 } else {
290 let mut cmd = Command::new(&resolved_command);
291 for arg in &task.args {
292 cmd.arg(arg);
293 }
294 cmd
295 }
296 };
297
298 cmd.current_dir(&workdir);
300 let env_vars = self.config.environment.merge_with_system_hermetic();
301 for (k, v) in &env_vars {
302 cmd.env(k, v);
303 }
304
305 if !env_vars.contains_key("FORCE_COLOR") {
308 cmd.env("FORCE_COLOR", "1");
309 }
310 if !env_vars.contains_key("CLICOLOR_FORCE") {
311 cmd.env("CLICOLOR_FORCE", "1");
312 }
313
314 if self.config.capture_output.should_capture() {
317 use tokio::io::{AsyncBufReadExt, BufReader};
318
319 let start_time = std::time::Instant::now();
320
321 #[cfg(unix)]
323 setup_process_group(&mut cmd);
324
325 let mut child = cmd
327 .stdout(Stdio::piped())
328 .stderr(Stdio::piped())
329 .spawn()
330 .map_err(|e| Error::Io {
331 source: e,
332 path: None,
333 operation: format!("spawn task {}", name),
334 })?;
335
336 let child_pid = child.id();
338 if let Some(pid) = child_pid {
339 global_registry().register(pid, name.to_string()).await;
340 }
341
342 let stdout_handle = child.stdout.take();
344 let stderr_handle = child.stderr.take();
345
346 let mut stdout_lines = Vec::new();
348 let mut stderr_lines = Vec::new();
349
350 let name_for_stdout = name.to_string();
352 let stdout_task = tokio::spawn(async move {
353 let mut lines = Vec::new();
354 if let Some(stdout) = stdout_handle {
355 let mut reader = BufReader::new(stdout).lines();
356 while let Ok(Some(line)) = reader.next_line().await {
357 cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
358 lines.push(line);
359 }
360 }
361 lines
362 });
363
364 let name_for_stderr = name.to_string();
366 let stderr_task = tokio::spawn(async move {
367 let mut lines = Vec::new();
368 if let Some(stderr) = stderr_handle {
369 let mut reader = BufReader::new(stderr).lines();
370 while let Ok(Some(line)) = reader.next_line().await {
371 cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
372 lines.push(line);
373 }
374 }
375 lines
376 });
377
378 let status = child.wait().await.map_err(|e| Error::Io {
380 source: e,
381 path: None,
382 operation: format!("wait for task {}", name),
383 })?;
384
385 if let Some(pid) = child_pid {
387 global_registry().unregister(pid).await;
388 }
389
390 if let Ok(lines) = stdout_task.await {
392 stdout_lines = lines;
393 }
394 if let Ok(lines) = stderr_task.await {
395 stderr_lines = lines;
396 }
397
398 let duration_ms = start_time.elapsed().as_millis() as u64;
399 let stdout = stdout_lines.join("\n");
400 let stderr = stderr_lines.join("\n");
401 let exit_code = status.code().unwrap_or(-1);
402 let success = status.success();
403
404 cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
406
407 if !success {
408 tracing::warn!(task = %name, exit = exit_code, "Task failed");
409 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
410 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
411 }
412
413 Ok(TaskResult {
414 name: name.to_string(),
415 exit_code: Some(exit_code),
416 stdout,
417 stderr,
418 success,
419 })
420 } else {
421 #[cfg(unix)]
425 setup_process_group(&mut cmd);
426
427 let mut child = cmd
429 .stdout(Stdio::inherit())
430 .stderr(Stdio::inherit())
431 .stdin(Stdio::inherit())
432 .spawn()
433 .map_err(|e| Error::Io {
434 source: e,
435 path: None,
436 operation: format!("spawn task {}", name),
437 })?;
438
439 let child_pid = child.id();
441 if let Some(pid) = child_pid {
442 global_registry().register(pid, name.to_string()).await;
443 }
444
445 let status = child.wait().await.map_err(|e| Error::Io {
446 source: e,
447 path: None,
448 operation: format!("wait for task {}", name),
449 })?;
450
451 if let Some(pid) = child_pid {
453 global_registry().unregister(pid).await;
454 }
455
456 let exit_code = status.code().unwrap_or(-1);
457 let success = status.success();
458
459 if !success {
460 tracing::warn!(task = %name, exit = exit_code, "Task failed");
461 }
462
463 Ok(TaskResult {
464 name: name.to_string(),
465 exit_code: Some(exit_code),
466 stdout: String::new(), stderr: String::new(),
468 success,
469 })
470 }
471 }
472
473 #[async_recursion]
475 pub async fn execute_node(
476 &self,
477 name: &str,
478 node: &TaskNode,
479 all_tasks: &Tasks,
480 ) -> Result<Vec<TaskResult>> {
481 match node {
482 TaskNode::Task(task) => {
483 let result = self.execute_task(name, task.as_ref()).await?;
484 Ok(vec![result])
485 }
486 TaskNode::Group(group) => self.execute_parallel(name, group, all_tasks).await,
487 TaskNode::Sequence(seq) => self.execute_sequential(name, seq, all_tasks).await,
488 }
489 }
490
491 #[async_recursion]
493 pub async fn execute_definition(
494 &self,
495 name: &str,
496 node: &TaskNode,
497 all_tasks: &Tasks,
498 ) -> Result<Vec<TaskResult>> {
499 self.execute_node(name, node, all_tasks).await
500 }
501
502 async fn execute_sequential(
503 &self,
504 prefix: &str,
505 sequence: &[TaskNode],
506 all_tasks: &Tasks,
507 ) -> Result<Vec<TaskResult>> {
508 if !self.config.capture_output.should_capture() {
509 cuenv_events::emit_task_group_started!(prefix, true, sequence.len());
510 }
511 let mut results = Vec::new();
512 for (i, step) in sequence.iter().enumerate() {
513 let task_name = format!("{}[{}]", prefix, i);
514 let task_results = self.execute_node(&task_name, step, all_tasks).await?;
515 for result in &task_results {
516 if !result.success {
518 return Err(Error::task_failed(
519 &result.name,
520 result.exit_code.unwrap_or(-1),
521 &result.stdout,
522 &result.stderr,
523 ));
524 }
525 }
526 results.extend(task_results);
527 }
528 Ok(results)
529 }
530
531 async fn execute_parallel(
532 &self,
533 prefix: &str,
534 group: &TaskGroup,
535 all_tasks: &Tasks,
536 ) -> Result<Vec<TaskResult>> {
537 if let Some(default_task) = group.children.get("default") {
539 if !self.config.capture_output.should_capture() {
540 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
541 }
542 let task_name = format!("{}.default", prefix);
545 return self.execute_node(&task_name, default_task, all_tasks).await;
546 }
547
548 if !self.config.capture_output.should_capture() {
549 cuenv_events::emit_task_group_started!(prefix, false, group.children.len());
550 }
551 let mut join_set = JoinSet::new();
552 let all_tasks = Arc::new(all_tasks.clone());
553 let mut all_results = Vec::new();
554 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
555 if let Some(failed) = results.iter().find(|r| !r.success) {
556 return Err(Error::task_failed(
557 &failed.name,
558 failed.exit_code.unwrap_or(-1),
559 &failed.stdout,
560 &failed.stderr,
561 ));
562 }
563 all_results.extend(results);
564 Ok(())
565 };
566 for (name, child_node) in &group.children {
567 let task_name = format!("{}.{}", prefix, name);
568 let child_node = child_node.clone();
569 let all_tasks = Arc::clone(&all_tasks);
570 let executor = self.clone_with_config();
571 join_set.spawn(async move {
572 executor
573 .execute_node(&task_name, &child_node, &all_tasks)
574 .await
575 });
576 if self.config.max_parallel > 0
577 && join_set.len() >= self.config.max_parallel
578 && let Some(result) = join_set.join_next().await
579 {
580 match result {
581 Ok(Ok(results)) => merge_results(results)?,
582 Ok(Err(e)) => return Err(e),
583 Err(e) => {
584 return Err(Error::execution(format!("Task execution panicked: {}", e)));
585 }
586 }
587 }
588 }
589 while let Some(result) = join_set.join_next().await {
590 match result {
591 Ok(Ok(results)) => merge_results(results)?,
592 Ok(Err(e)) => return Err(e),
593 Err(e) => {
594 return Err(Error::execution(format!("Task execution panicked: {}", e)));
595 }
596 }
597 }
598 Ok(all_results)
599 }
600
601 #[instrument(name = "execute_graph", skip(self, graph), fields(task_count = graph.task_count()))]
602 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
603 let parallel_groups = graph.get_parallel_groups()?;
604 let mut all_results = Vec::new();
605
606 for mut group in parallel_groups {
614 let mut join_set = JoinSet::new();
615
616 while !group.is_empty() || !join_set.is_empty() {
617 while let Some(node) = group.pop() {
619 let task = node.task.clone();
620 let name = node.name.clone();
621 let executor = self.clone_with_config();
622 join_set.spawn(async move { executor.execute_task(&name, &task).await });
623
624 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
625 break;
626 }
627 }
628
629 if let Some(result) = join_set.join_next().await {
630 match result {
631 Ok(Ok(task_result)) => {
632 if !task_result.success {
633 join_set.abort_all();
634 return Err(Error::task_failed(
635 &task_result.name,
636 task_result.exit_code.unwrap_or(-1),
637 &task_result.stdout,
638 &task_result.stderr,
639 ));
640 }
641 all_results.push(task_result);
642 }
643 Ok(Err(e)) => {
644 join_set.abort_all();
645 return Err(e);
646 }
647 Err(e) => {
648 join_set.abort_all();
649 return Err(Error::execution(format!(
650 "Task execution panicked: {}",
651 e
652 )));
653 }
654 }
655 }
656 }
657 }
658
659 Ok(all_results)
660 }
661
662 fn clone_with_config(&self) -> Self {
663 Self::with_shared_backend(self.config.clone(), self.backend.clone())
665 }
666}
667
668fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
669 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
670
671 loop {
672 let is_root = match manager {
673 PackageManager::Npm
674 | PackageManager::Bun
675 | PackageManager::YarnClassic
676 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
677 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
678 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
679 PackageManager::Deno => deno_json_has_workspace(¤t),
680 };
681
682 if is_root {
683 return current;
684 }
685
686 if let Some(parent) = current.parent() {
687 current = parent.to_path_buf();
688 } else {
689 return start.to_path_buf();
690 }
691 }
692}
693
694fn package_json_has_workspaces(dir: &Path) -> bool {
695 let path = dir.join("package.json");
696 let content = std::fs::read_to_string(&path);
697 let Ok(json) = content.and_then(|s| {
698 serde_json::from_str::<serde_json::Value>(&s)
699 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
700 }) else {
701 return false;
702 };
703
704 match json.get("workspaces") {
705 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
706 Some(serde_json::Value::Object(map)) => map
707 .get("packages")
708 .and_then(|packages| packages.as_array())
709 .map(|arr| !arr.is_empty())
710 .unwrap_or(false),
711 _ => false,
712 }
713}
714
/// Reports whether `dir/Cargo.toml` declares a Cargo workspace.
///
/// A manifest counts as a workspace root when it contains a table header for
/// `workspace` or one of its sub-tables (`[workspace]`,
/// `[workspace.package]`, `[workspace.dependencies]`, ...). Only lines that
/// start with a table header are considered, so a `[workspace]` that merely
/// appears inside a comment or mid-line text is ignored. A missing or
/// unreadable file yields `false`.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // A table header starts with `[` once indentation is removed.
        let Some(header) = line.trim_start().strip_prefix('[') else {
            return false;
        };
        // The key must be exactly `workspace`, followed by the closing
        // bracket or by a `.` introducing a sub-table.
        header
            .trim_start()
            .strip_prefix("workspace")
            .is_some_and(|rest| matches!(rest.trim_start().chars().next(), Some(']' | '.')))
    })
}
723
724fn deno_json_has_workspace(dir: &Path) -> bool {
725 let path = dir.join("deno.json");
726 let content = std::fs::read_to_string(&path);
727 let Ok(json) = content.and_then(|s| {
728 serde_json::from_str::<serde_json::Value>(&s)
729 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
730 }) else {
731 return false;
732 };
733
734 match json.get("workspace") {
736 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
737 Some(serde_json::Value::Object(_)) => true,
738 _ => false,
739 }
740}
741
742pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
745 let exit_code = result
746 .exit_code
747 .map(|c| c.to_string())
748 .unwrap_or_else(|| "unknown".to_string());
749
750 let mut sections = Vec::new();
751 sections.push(format!(
752 "Task '{}' failed with exit code {}.",
753 result.name, exit_code
754 ));
755
756 let output = format_failure_streams(result, max_output_lines);
757 if output.is_empty() {
758 sections.push(
759 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
760 .to_string(),
761 );
762 } else {
763 sections.push(output);
764 }
765
766 sections.join("\n\n")
767}
768
769fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
770 let mut streams = Vec::new();
771
772 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
773 streams.push(stdout);
774 }
775
776 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
777 streams.push(stderr);
778 }
779
780 streams.join("\n\n")
781}
782
/// Formats the tail of one output stream under a `label` header.
///
/// Trailing whitespace is ignored; a stream that is empty afterwards yields
/// `None`. At most the last `max_output_lines` lines are kept, and when lines
/// were dropped the header states how many of the total are shown.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    let dropped = total.saturating_sub(max_output_lines);
    let tail: Vec<&str> = trimmed.lines().skip(dropped).collect();

    let header = if dropped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{}", tail.join("\n")))
}
802
803pub async fn execute_command(
807 command: &str,
808 args: &[String],
809 environment: &Environment,
810) -> Result<i32> {
811 execute_command_with_redaction(command, args, environment, &[]).await
812}
813
814pub async fn execute_command_with_redaction(
818 command: &str,
819 args: &[String],
820 environment: &Environment,
821 secrets: &[String],
822) -> Result<i32> {
823 use tokio::io::{AsyncBufReadExt, BufReader};
824
825 tracing::info!("Executing command: {} {:?}", command, args);
826 let mut cmd = Command::new(command);
827 cmd.args(args);
828 let env_vars = environment.merge_with_system_hermetic();
830 for (key, value) in env_vars {
831 cmd.env(key, value);
832 }
833
834 if secrets.is_empty() {
835 cmd.stdout(Stdio::inherit());
837 cmd.stderr(Stdio::inherit());
838 cmd.stdin(Stdio::inherit());
839 let status = cmd.status().await.map_err(|e| {
840 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
841 })?;
842 return Ok(status.code().unwrap_or(1));
843 }
844
845 cmd.stdout(Stdio::piped());
847 cmd.stderr(Stdio::piped());
848 cmd.stdin(Stdio::inherit());
849
850 let mut child = cmd.spawn().map_err(|e| {
851 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
852 })?;
853
854 let stdout = child
855 .stdout
856 .take()
857 .ok_or_else(|| Error::execution("stdout pipe not available"))?;
858 let stderr = child
859 .stderr
860 .take()
861 .ok_or_else(|| Error::execution("stderr pipe not available"))?;
862
863 let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
865 sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
866 let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
867
868 let secrets_clone = sorted_secrets.clone();
870 let stdout_task = tokio::spawn(async move {
871 let reader = BufReader::new(stdout);
872 let mut lines = reader.lines();
873 while let Ok(Some(line)) = lines.next_line().await {
874 let mut redacted = line;
875 for secret in &secrets_clone {
876 if secret.len() >= 4 {
877 redacted = redacted.replace(secret, "[REDACTED]");
878 }
879 }
880 cuenv_events::emit_stdout!(&redacted);
881 }
882 });
883
884 let stderr_task = tokio::spawn(async move {
886 let reader = BufReader::new(stderr);
887 let mut lines = reader.lines();
888 while let Ok(Some(line)) = lines.next_line().await {
889 let mut redacted = line;
890 for secret in &sorted_secrets {
891 if secret.len() >= 4 {
892 redacted = redacted.replace(secret, "[REDACTED]");
893 }
894 }
895 cuenv_events::emit_stderr!(&redacted);
896 }
897 });
898
899 let status = child.wait().await.map_err(|e| {
901 Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
902 })?;
903
904 let _ = stdout_task.await;
905 let _ = stderr_task.await;
906
907 Ok(status.code().unwrap_or(1))
908}
909
910#[cfg(test)]
911mod tests {
912 use super::*;
913 use crate::tasks::TaskDependency;
914 use tempfile::TempDir;
915
916 #[tokio::test]
917 async fn test_executor_config_default() {
918 let config = ExecutorConfig::default();
919 assert!(config.capture_output.should_capture());
920 assert_eq!(config.max_parallel, 0);
921 assert!(config.environment.is_empty());
922 }
923
924 #[tokio::test]
925 async fn test_task_result() {
926 let result = TaskResult {
927 name: "test".to_string(),
928 exit_code: Some(0),
929 stdout: "output".to_string(),
930 stderr: String::new(),
931 success: true,
932 };
933 assert_eq!(result.name, "test");
934 assert_eq!(result.exit_code, Some(0));
935 assert!(result.success);
936 assert_eq!(result.stdout, "output");
937 }
938
939 #[tokio::test]
940 async fn test_execute_simple_task() {
941 let config = ExecutorConfig {
942 capture_output: OutputCapture::Capture,
943 ..Default::default()
944 };
945 let executor = TaskExecutor::new(config);
946 let task = Task {
947 command: "echo".to_string(),
948 args: vec!["hello".to_string()],
949 description: Some("Hello task".to_string()),
950 ..Default::default()
951 };
952 let result = executor.execute_task("test", &task).await.unwrap();
953 assert!(result.success);
954 assert_eq!(result.exit_code, Some(0));
955 assert!(result.stdout.contains("hello"));
956 }
957
958 #[tokio::test]
959 async fn test_execute_with_environment() {
960 let mut config = ExecutorConfig {
961 capture_output: OutputCapture::Capture,
962 ..Default::default()
963 };
964 config
965 .environment
966 .set("TEST_VAR".to_string(), "test_value".to_string());
967 let executor = TaskExecutor::new(config);
968 let task = Task {
969 command: "printenv".to_string(),
970 args: vec!["TEST_VAR".to_string()],
971 description: Some("Print env task".to_string()),
972 ..Default::default()
973 };
974 let result = executor.execute_task("test", &task).await.unwrap();
975 assert!(result.success);
976 assert!(result.stdout.contains("test_value"));
977 }
978
979 #[tokio::test]
980 async fn test_execute_failing_task() {
981 let config = ExecutorConfig {
982 capture_output: OutputCapture::Capture,
983 ..Default::default()
984 };
985 let executor = TaskExecutor::new(config);
986 let task = Task {
987 command: "false".to_string(),
988 description: Some("Failing task".to_string()),
989 ..Default::default()
990 };
991 let result = executor.execute_task("test", &task).await.unwrap();
992 assert!(!result.success);
993 assert_eq!(result.exit_code, Some(1));
994 }
995
996 #[tokio::test]
997 async fn test_execute_sequential_group() {
998 let config = ExecutorConfig {
999 capture_output: OutputCapture::Capture,
1000 ..Default::default()
1001 };
1002 let executor = TaskExecutor::new(config);
1003 let task1 = Task {
1004 command: "echo".to_string(),
1005 args: vec!["first".to_string()],
1006 description: Some("First task".to_string()),
1007 ..Default::default()
1008 };
1009 let task2 = Task {
1010 command: "echo".to_string(),
1011 args: vec!["second".to_string()],
1012 description: Some("Second task".to_string()),
1013 ..Default::default()
1014 };
1015 let sequence = vec![
1016 TaskNode::Task(Box::new(task1)),
1017 TaskNode::Task(Box::new(task2)),
1018 ];
1019 let all_tasks = Tasks::new();
1020 let node = TaskNode::Sequence(sequence);
1021 let results = executor
1022 .execute_node("seq", &node, &all_tasks)
1023 .await
1024 .unwrap();
1025 assert_eq!(results.len(), 2);
1026 assert!(results[0].stdout.contains("first"));
1027 assert!(results[1].stdout.contains("second"));
1028 }
1029
1030 #[tokio::test]
1031 async fn test_command_injection_prevention() {
1032 let config = ExecutorConfig {
1033 capture_output: OutputCapture::Capture,
1034 ..Default::default()
1035 };
1036 let executor = TaskExecutor::new(config);
1037 let malicious_task = Task {
1038 command: "echo".to_string(),
1039 args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1040 description: Some("Malicious task test".to_string()),
1041 ..Default::default()
1042 };
1043 let result = executor
1044 .execute_task("malicious", &malicious_task)
1045 .await
1046 .unwrap();
1047 assert!(result.success);
1048 assert!(result.stdout.contains("hello ; rm -rf /"));
1049 }
1050
1051 #[tokio::test]
1052 async fn test_special_characters_in_args() {
1053 let config = ExecutorConfig {
1054 capture_output: OutputCapture::Capture,
1055 ..Default::default()
1056 };
1057 let executor = TaskExecutor::new(config);
1058 let special_chars = vec![
1059 "$USER",
1060 "$(whoami)",
1061 "`whoami`",
1062 "&& echo hacked",
1063 "|| echo failed",
1064 "> /tmp/hack",
1065 "| cat",
1066 ];
1067 for special_arg in special_chars {
1068 let task = Task {
1069 command: "echo".to_string(),
1070 args: vec!["safe".to_string(), special_arg.to_string()],
1071 description: Some("Special character test".to_string()),
1072 ..Default::default()
1073 };
1074 let result = executor.execute_task("special", &task).await.unwrap();
1075 assert!(result.success);
1076 assert!(result.stdout.contains("safe"));
1077 assert!(result.stdout.contains(special_arg));
1078 }
1079 }
1080
1081 #[tokio::test]
1082 async fn test_environment_variable_safety() {
1083 let mut config = ExecutorConfig {
1084 capture_output: OutputCapture::Capture,
1085 ..Default::default()
1086 };
1087 config
1088 .environment
1089 .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1090 let executor = TaskExecutor::new(config);
1091 let task = Task {
1092 command: "printenv".to_string(),
1093 args: vec!["DANGEROUS_VAR".to_string()],
1094 description: Some("Environment variable safety test".to_string()),
1095 ..Default::default()
1096 };
1097 let result = executor.execute_task("env_test", &task).await.unwrap();
1098 assert!(result.success);
1099 assert!(result.stdout.contains("; rm -rf /"));
1100 }
1101
1102 #[tokio::test]
1103 async fn test_execute_graph_parallel_groups() {
1104 let config = ExecutorConfig {
1106 capture_output: OutputCapture::Capture,
1107 max_parallel: 2,
1108 ..Default::default()
1109 };
1110 let executor = TaskExecutor::new(config);
1111 let mut graph = TaskGraph::new();
1112
1113 let t1 = Task {
1114 command: "echo".into(),
1115 args: vec!["A".into()],
1116 ..Default::default()
1117 };
1118 let t2 = Task {
1119 command: "echo".into(),
1120 args: vec!["B".into()],
1121 ..Default::default()
1122 };
1123
1124 graph.add_task("t1", t1).unwrap();
1125 graph.add_task("t2", t2).unwrap();
1126 let results = executor.execute_graph(&graph).await.unwrap();
1127 assert_eq!(results.len(), 2);
1128 let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1129 assert!(joined.contains("A") && joined.contains("B"));
1130 }
1131
1132 #[tokio::test]
1133 async fn test_execute_graph_respects_dependency_levels() {
1134 let tmp = TempDir::new().unwrap();
1135 let root = tmp.path();
1136
1137 let config = ExecutorConfig {
1138 capture_output: OutputCapture::Capture,
1139 max_parallel: 2,
1140 project_root: root.to_path_buf(),
1141 ..Default::default()
1142 };
1143 let executor = TaskExecutor::new(config);
1144
1145 let mut tasks = Tasks::new();
1146 tasks.tasks.insert(
1147 "dep".into(),
1148 TaskNode::Task(Box::new(Task {
1149 command: "sh".into(),
1150 args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
1151 ..Default::default()
1152 })),
1153 );
1154 tasks.tasks.insert(
1155 "consumer".into(),
1156 TaskNode::Task(Box::new(Task {
1157 command: "sh".into(),
1158 args: vec!["-c".into(), "cat marker.txt".into()],
1159 depends_on: vec![TaskDependency::from_name("dep")],
1160 ..Default::default()
1161 })),
1162 );
1163
1164 let mut graph = TaskGraph::new();
1165 graph.build_for_task("consumer", &tasks).unwrap();
1166
1167 let results = executor.execute_graph(&graph).await.unwrap();
1168 assert_eq!(results.len(), 2);
1169
1170 let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
1171 assert!(consumer.success);
1172 assert!(consumer.stdout.contains("ok"));
1173 }
1174
1175 #[test]
1176 fn test_summarize_task_failure_with_exit_code() {
1177 let result = TaskResult {
1178 name: "build".to_string(),
1179 exit_code: Some(127),
1180 stdout: String::new(),
1181 stderr: "command not found".to_string(),
1182 success: false,
1183 };
1184 let summary = summarize_task_failure(&result, 10);
1185 assert!(summary.contains("build"));
1186 assert!(summary.contains("127"));
1187 assert!(summary.contains("command not found"));
1188 }
1189
1190 #[test]
1191 fn test_summarize_task_failure_no_exit_code() {
1192 let result = TaskResult {
1193 name: "killed".to_string(),
1194 exit_code: None,
1195 stdout: String::new(),
1196 stderr: String::new(),
1197 success: false,
1198 };
1199 let summary = summarize_task_failure(&result, 10);
1200 assert!(summary.contains("killed"));
1201 assert!(summary.contains("unknown"));
1202 }
1203
1204 #[test]
1205 fn test_summarize_task_failure_no_output() {
1206 let result = TaskResult {
1207 name: "silent".to_string(),
1208 exit_code: Some(1),
1209 stdout: String::new(),
1210 stderr: String::new(),
1211 success: false,
1212 };
1213 let summary = summarize_task_failure(&result, 10);
1214 assert!(summary.contains("No stdout/stderr"));
1215 assert!(summary.contains("RUST_LOG=debug"));
1216 }
1217
/// A 50-line stdout summarized with a 10-line limit is expected to report
/// "last 10 of 50 lines", keep the final line and drop the first one.
#[test]
fn test_summarize_task_failure_truncates_long_output() {
    // Fifty numbered lines: "line 1" through "line 50", newline-separated.
    let numbered_lines: Vec<String> = (1..=50).map(|n| format!("line {n}")).collect();
    let noisy = TaskResult {
        success: false,
        exit_code: Some(1),
        name: String::from("verbose"),
        stdout: numbered_lines.join("\n"),
        stderr: String::new(),
    };

    let rendered = summarize_task_failure(&noisy, 10);

    assert!(rendered.contains("last 10 of 50 lines"));
    assert!(rendered.contains("line 50"));
    // The trailing newline keeps "line 1" from matching inside longer line numbers.
    assert!(!rendered.contains("line 1\n"));
}
1235
/// Empty, space-only and newline-only content is expected to yield `None`.
#[test]
fn test_summarize_stream_empty() {
    let for_empty = summarize_stream("test", "", 10);
    assert!(for_empty.is_none());

    let for_spaces = summarize_stream("test", " ", 10);
    assert!(for_spaces.is_none());

    let for_newlines = summarize_stream("test", "\n\n", 10);
    assert!(for_newlines.is_none());
}
1242
/// Content shorter than the line limit is expected to be rendered under a
/// `stdout:` label with every line present and no "last" truncation notice.
#[test]
fn test_summarize_stream_short() {
    let two_lines = "line 1\nline 2";
    let rendered = summarize_stream("stdout", two_lines, 10).unwrap();

    assert!(rendered.contains("stdout:"));
    assert!(rendered.contains("line 1"));
    assert!(rendered.contains("line 2"));
    // Nothing was cut, so no truncation wording should appear.
    assert!(!rendered.contains("last"));
}
1251
/// When both stdout and stderr hold content, the formatted output is expected
/// to contain a label and the content for each stream.
#[test]
fn test_format_failure_streams_both() {
    let both_streams = TaskResult {
        success: false,
        exit_code: Some(1),
        name: String::from("test"),
        stdout: String::from("stdout content"),
        stderr: String::from("stderr content"),
    };

    let rendered = format_failure_streams(&both_streams, 10);

    // Stream labels.
    assert!(rendered.contains("stdout:"));
    assert!(rendered.contains("stderr:"));
    // Stream contents.
    assert!(rendered.contains("stdout content"));
    assert!(rendered.contains("stderr content"));
}
1267
/// Starting from a nested package directory, `find_workspace_root` for npm is
/// expected to return the canonicalized directory whose `package.json`
/// declares `workspaces`.
#[test]
fn test_find_workspace_root_with_npm() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let member_dir = workspace.path().join("packages").join("subpkg");
    std::fs::create_dir_all(&member_dir).unwrap();

    let found = find_workspace_root(PackageManager::Npm, &member_dir);
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(found, expected);
}
1283
/// Starting from a nested package directory, `find_workspace_root` for pnpm is
/// expected to return the canonicalized directory holding
/// `pnpm-workspace.yaml`.
#[test]
fn test_find_workspace_root_with_pnpm() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("pnpm-workspace.yaml");
    std::fs::write(manifest, "packages:\n - 'packages/*'").unwrap();

    let member_dir = workspace.path().join("packages").join("app");
    std::fs::create_dir_all(&member_dir).unwrap();

    let found = find_workspace_root(PackageManager::Pnpm, &member_dir);
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(found, expected);
}
1298
/// Starting from a nested crate directory, `find_workspace_root` for Cargo is
/// expected to return the canonicalized directory whose `Cargo.toml` has a
/// `[workspace]` table.
#[test]
fn test_find_workspace_root_with_cargo() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("Cargo.toml");
    std::fs::write(manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let member_dir = workspace.path().join("crates").join("core");
    std::fs::create_dir_all(&member_dir).unwrap();

    let found = find_workspace_root(PackageManager::Cargo, &member_dir);
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(found, expected);
}
1313
/// In a directory with no workspace manifest, `find_workspace_root` is
/// expected to return the starting directory itself (compared here without
/// canonicalization).
#[test]
fn test_find_workspace_root_no_workspace() {
    let empty_dir = TempDir::new().unwrap();
    let start = empty_dir.path();

    let found = find_workspace_root(PackageManager::Npm, start);

    assert_eq!(found, start.to_path_buf());
}
1321
/// A `package.json` whose `workspaces` key is a non-empty array is expected to
/// be detected as a workspace manifest.
#[test]
fn test_package_json_has_workspaces_array() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(detected);
}
1332
/// A `package.json` whose `workspaces` key is an object with a `packages`
/// list is expected to be detected as a workspace manifest.
#[test]
fn test_package_json_has_workspaces_object() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": {"packages": ["packages/*"]}}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(detected);
}
1343
/// A `package.json` without a `workspaces` key is expected not to be detected
/// as a workspace manifest.
#[test]
fn test_package_json_no_workspaces() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"name": "test"}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(!detected);
}
1350
/// A `package.json` whose `workspaces` array is empty is expected not to be
/// detected as a workspace manifest.
#[test]
fn test_package_json_empty_workspaces() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": []}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(!detected);
}
1357
/// A directory with no `package.json` at all is expected not to be detected
/// as a workspace.
#[test]
fn test_package_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = package_json_has_workspaces(empty_dir.path());
    assert!(!detected);
}
1363
/// A `Cargo.toml` containing a `[workspace]` table is expected to be detected
/// as a workspace manifest.
#[test]
fn test_cargo_toml_has_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("Cargo.toml");
    std::fs::write(manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let detected = cargo_toml_has_workspace(project.path());
    assert!(detected);
}
1374
/// A `Cargo.toml` with only a `[package]` table is expected not to be detected
/// as a workspace manifest.
#[test]
fn test_cargo_toml_no_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("Cargo.toml");
    std::fs::write(manifest, "[package]\nname = \"test\"").unwrap();

    let detected = cargo_toml_has_workspace(project.path());
    assert!(!detected);
}
1381
/// A directory with no `Cargo.toml` at all is expected not to be detected as
/// a workspace.
#[test]
fn test_cargo_toml_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = cargo_toml_has_workspace(empty_dir.path());
    assert!(!detected);
}
1387
/// A `deno.json` whose `workspace` key is an array is expected to be detected
/// as a workspace manifest.
#[test]
fn test_deno_json_has_workspace_array() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(manifest, r#"{"workspace": ["./packages/*"]}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(detected);
}
1398
/// A `deno.json` whose `workspace` key is an object with a `members` list is
/// expected to be detected as a workspace manifest.
#[test]
fn test_deno_json_has_workspace_object() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(manifest, r#"{"workspace": {"members": ["./packages/*"]}}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(detected);
}
1409
/// A `deno.json` without a `workspace` key is expected not to be detected as
/// a workspace manifest.
#[test]
fn test_deno_json_no_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(manifest, r#"{"name": "test"}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(!detected);
}
1416
/// A directory with no `deno.json` at all is expected not to be detected as a
/// workspace.
#[test]
fn test_deno_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = deno_json_has_workspace(empty_dir.path());
    assert!(!detected);
}
1422
/// Builds an `ExecutorConfig` with explicit values for most fields and checks
/// that every explicitly set field reads back unchanged, while the untouched
/// `backend_config` keeps its `None` default.
#[test]
fn test_executor_config_with_fields() {
    let config = ExecutorConfig {
        capture_output: OutputCapture::Capture,
        max_parallel: 4,
        working_dir: Some(PathBuf::from("/tmp")),
        project_root: PathBuf::from("/project"),
        cue_module_root: Some(PathBuf::from("/project/cue.mod")),
        materialize_outputs: Some(PathBuf::from("/outputs")),
        cache_dir: Some(PathBuf::from("/cache")),
        show_cache_path: true,
        cli_backend: Some("host".to_string()),
        ..Default::default()
    };

    // Output capture and parallelism.
    assert!(config.capture_output.should_capture());
    assert_eq!(config.max_parallel, 4);

    // Every path-valued field must carry the value it was given.
    assert_eq!(config.working_dir, Some(PathBuf::from("/tmp")));
    assert_eq!(config.project_root, PathBuf::from("/project"));
    assert_eq!(
        config.cue_module_root,
        Some(PathBuf::from("/project/cue.mod"))
    );
    assert_eq!(config.materialize_outputs, Some(PathBuf::from("/outputs")));
    assert_eq!(config.cache_dir, Some(PathBuf::from("/cache")));

    // Cache-path display flag and backend selection.
    assert!(config.show_cache_path);
    assert_eq!(config.cli_backend.as_deref(), Some("host"));

    // Not set above, so it must come from `Default` (which leaves it `None`).
    assert!(config.backend_config.is_none());
}
1442
/// Cloning a `TaskResult` is expected to produce a value whose fields all
/// equal those of the original.
#[test]
fn test_task_result_clone() {
    let original = TaskResult {
        success: true,
        exit_code: Some(0),
        name: String::from("test"),
        stdout: String::from("output"),
        stderr: String::from("error"),
    };

    let copy = original.clone();

    // Compare field by field; only `Debug` and `Clone` are visibly derived.
    assert_eq!(copy.name, original.name);
    assert_eq!(copy.exit_code, original.exit_code);
    assert_eq!(copy.stdout, original.stdout);
    assert_eq!(copy.stderr, original.stderr);
    assert_eq!(copy.success, original.success);
}
1459
/// Pins the value of `TASK_FAILURE_SNIPPET_LINES` so an accidental change to
/// the constant is caught.
#[test]
fn test_task_failure_snippet_lines_constant() {
    let expected_lines: usize = 20;
    assert_eq!(TASK_FAILURE_SNIPPET_LINES, expected_lines);
}
1464}