1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
8use super::{Task, TaskGraph, TaskGroup, TaskNode, Tasks};
9use crate::config::BackendConfig;
10use crate::environment::Environment;
11use crate::{Error, Result};
12use async_recursion::async_recursion;
13use cuenv_workspaces::PackageManager;
14use std::path::{Path, PathBuf};
15use std::process::Stdio;
16use std::sync::Arc;
17use tokio::process::Command;
18use tokio::task::JoinSet;
19use tracing::instrument;
20
/// Outcome of running a single task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Name of the task that produced this result.
    pub name: String,
    /// Exit code of the task's process, when one is known.
    pub exit_code: Option<i32>,
    /// Captured standard output, lines joined with `\n`; empty when output
    /// was inherited rather than captured.
    pub stdout: String,
    /// Captured standard error, lines joined with `\n`; empty when output
    /// was inherited rather than captured.
    pub stderr: String,
    /// Whether the task's process reported success.
    pub success: bool,
}
30
/// Number of output lines to show in a task-failure snippet.
///
/// NOTE(review): presumably meant as the `max_output_lines` argument of
/// `summarize_task_failure`; it is not referenced in this part of the file,
/// so confirm at the call sites.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
33
/// Settings that control how a `TaskExecutor` runs tasks.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// When `true`, child stdout/stderr are piped, streamed as events and
    /// collected into the `TaskResult`; when `false` they are inherited from
    /// the parent process.
    pub capture_output: bool,
    /// Upper bound on concurrently running tasks; `0` disables the limit.
    pub max_parallel: usize,
    /// Environment used to resolve commands and to populate the variables of
    /// spawned processes.
    pub environment: Environment,
    /// Optional working-directory override.
    /// NOTE(review): not read by the code visible in this file — confirm usage.
    pub working_dir: Option<PathBuf>,
    /// Project root: handed to the backend and used as the fallback working
    /// directory for tasks.
    pub project_root: PathBuf,
    /// Base directory that task/source directories are joined onto; when
    /// `None`, `project_root` is used instead.
    pub cue_module_root: Option<PathBuf>,
    /// NOTE(review): presumably a destination for materialized task outputs;
    /// not read by the code visible in this file — confirm.
    pub materialize_outputs: Option<PathBuf>,
    /// NOTE(review): presumably a cache location override; not read by the
    /// code visible in this file — confirm.
    pub cache_dir: Option<PathBuf>,
    /// NOTE(review): presumably toggles printing of the cache path; not read
    /// by the code visible in this file — confirm.
    pub show_cache_path: bool,
    /// Backend configuration forwarded to `create_backend_with_factory`.
    pub backend_config: Option<BackendConfig>,
    /// Backend name forwarded to `create_backend_with_factory`
    /// (presumably supplied on the command line — confirm).
    pub cli_backend: Option<String>,
}
60
61impl Default for ExecutorConfig {
62 fn default() -> Self {
63 Self {
64 capture_output: true,
65 max_parallel: 0,
66 environment: Environment::new(),
67 working_dir: None,
68 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
69 cue_module_root: None,
70 materialize_outputs: None,
71 cache_dir: None,
72 show_cache_path: false,
73 backend_config: None,
74 cli_backend: None,
75 }
76 }
77}
78
/// Runs tasks, task groups, sequences and task graphs according to an
/// `ExecutorConfig`.
pub struct TaskExecutor {
    /// Settings applied to every task this executor runs.
    config: ExecutorConfig,
    /// Backend shared (via `Arc`) with the clones made for spawned tasks.
    backend: Arc<dyn TaskBackend>,
}
84impl TaskExecutor {
85 pub fn new(config: ExecutorConfig) -> Self {
87 Self::with_dagger_factory(config, None)
88 }
89
90 pub fn with_dagger_factory(
94 config: ExecutorConfig,
95 dagger_factory: Option<BackendFactory>,
96 ) -> Self {
97 let backend = create_backend_with_factory(
98 config.backend_config.as_ref(),
99 config.project_root.clone(),
100 config.cli_backend.as_deref(),
101 dagger_factory,
102 );
103 Self { config, backend }
104 }
105
106 fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
108 Self { config, backend }
109 }
110
111 #[instrument(name = "execute_task", skip(self, task), fields(task_name = %name))]
113 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
114 if self.backend.name() == "dagger" {
119 return self
120 .backend
121 .execute(
122 name,
123 task,
124 &self.config.environment,
125 &self.config.project_root,
126 self.config.capture_output,
127 )
128 .await;
129 }
130
131 self.execute_task_non_hermetic(name, task).await
133 }
134
    /// Runs `task` directly as a child process of this program.
    ///
    /// In order, the function:
    /// 1. rejects task references that were never resolved to a project,
    /// 2. chooses the working directory,
    /// 3. builds the command (script via a shell, command via a shell, or a
    ///    direct exec with separate arguments),
    /// 4. applies the environment plus colour hints,
    /// 5. runs the process, either capturing/streaming its output or letting
    ///    it inherit the parent's stdout/stderr.
    ///
    /// # Errors
    ///
    /// Returns a configuration error for an unresolved task reference and
    /// `Error::Io` when the process cannot be spawned or waited on. A failing
    /// exit status is not an error here; it is reported through
    /// `TaskResult::success`.
    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
        // A task reference without a project root was never resolved, so
        // there is nothing meaningful to run.
        if task.is_task_ref() && task.project_root.is_none() {
            return Err(Error::configuration(format!(
                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
                This usually means:\n\
                - The referenced project doesn't exist or has no 'name' field in env.cue\n\
                - The referenced task '{}' doesn't exist in that project\n\
                - There was an error loading the referenced project's env.cue\n\
                Run with RUST_LOG=debug for more details.",
                name,
                task.task_ref.as_deref().unwrap_or("unknown"),
                // The task name is taken to be the last ':'-separated segment
                // of the reference.
                task.task_ref
                    .as_deref()
                    .and_then(|r| r.split(':').next_back())
                    .unwrap_or("unknown")
            )));
        }

        // Working-directory precedence:
        //   1. explicit `task.directory`, joined onto the CUE module root
        //      (or the project root when no module root is configured);
        //   2. the task's own `project_root`;
        //   3. the directory of the task's `source`, joined the same way
        //      (or the module/project root itself when the source has none);
        //   4. for non-hermetic tasks, the workspace root of the package
        //      manager detected from the command, if any;
        //   5. the configured project root.
        let workdir = if let Some(ref dir) = task.directory {
            self.config
                .cue_module_root
                .as_ref()
                .unwrap_or(&self.config.project_root)
                .join(dir)
        } else if let Some(ref project_root) = task.project_root {
            project_root.clone()
        } else if let Some(ref source) = task.source {
            if let Some(dir) = source.directory() {
                self.config
                    .cue_module_root
                    .as_ref()
                    .unwrap_or(&self.config.project_root)
                    .join(dir)
            } else {
                self.config
                    .cue_module_root
                    .clone()
                    .unwrap_or_else(|| self.config.project_root.clone())
            }
        } else if !task.hermetic {
            if let Some(manager) = cuenv_workspaces::detect_from_command(&task.command) {
                find_workspace_root(manager, &self.config.project_root)
            } else {
                self.config.project_root.clone()
            }
        } else {
            self.config.project_root.clone()
        };

        tracing::info!(
            task = %name,
            workdir = %workdir.display(),
            hermetic = false,
            "Executing non-hermetic task"
        );

        // Human-readable command description for the "task started" event.
        // Script bodies are summarised by size instead of being echoed.
        let cmd_str = if let Some(script) = &task.script {
            format!("[script: {} bytes]", script.len())
        } else if task.command.is_empty() {
            task.args.join(" ")
        } else {
            format!("{} {}", task.command, task.args.join(" "))
        };

        cuenv_events::emit_task_started!(name, cmd_str, false);

        let mut cmd = if let Some(script) = &task.script {
            // Script mode: run the script text through a shell, defaulting to
            // `bash -c` when the task does not fully specify one.
            let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
                (
                    shell.command.clone().unwrap_or_else(|| "bash".to_string()),
                    shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
                )
            } else {
                ("bash".to_string(), "-c".to_string())
            };

            let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
            let mut cmd = Command::new(&resolved_shell);
            cmd.arg(&shell_flag);
            cmd.arg(script);
            cmd
        } else {
            let resolved_command = self.config.environment.resolve_command(&task.command);

            if let Some(shell) = &task.shell {
                // The shell is used only when BOTH its command and its flag
                // are given; otherwise the command is executed directly.
                if let (Some(shell_command), Some(shell_flag)) = (&shell.command, &shell.flag) {
                    let resolved_shell = self.config.environment.resolve_command(shell_command);
                    let mut cmd = Command::new(&resolved_shell);
                    cmd.arg(shell_flag);
                    if task.args.is_empty() {
                        cmd.arg(&resolved_command);
                    } else {
                        // Command and arguments are space-joined into a single
                        // string handed to the shell; no quoting is applied.
                        let full_command = if task.command.is_empty() {
                            task.args.join(" ")
                        } else {
                            format!("{} {}", resolved_command, task.args.join(" "))
                        };
                        cmd.arg(full_command);
                    }
                    cmd
                } else {
                    let mut cmd = Command::new(&resolved_command);
                    for arg in &task.args {
                        cmd.arg(arg);
                    }
                    cmd
                }
            } else {
                // Direct exec: each argument is passed as its own argv entry,
                // so no shell interprets it.
                let mut cmd = Command::new(&resolved_command);
                for arg in &task.args {
                    cmd.arg(arg);
                }
                cmd
            }
        };

        cmd.current_dir(&workdir);
        // NOTE(review): `merge_with_system_hermetic` is defined elsewhere;
        // presumably it layers the configured variables over a filtered view
        // of the system environment — confirm.
        let env_vars = self.config.environment.merge_with_system_hermetic();
        for (k, v) in &env_vars {
            cmd.env(k, v);
        }

        // Request coloured output unless the environment already sets these.
        if !env_vars.contains_key("FORCE_COLOR") {
            cmd.env("FORCE_COLOR", "1");
        }
        if !env_vars.contains_key("CLICOLOR_FORCE") {
            cmd.env("CLICOLOR_FORCE", "1");
        }

        if self.config.capture_output {
            use tokio::io::{AsyncBufReadExt, BufReader};

            let start_time = std::time::Instant::now();

            let mut child = cmd
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .spawn()
                .map_err(|e| Error::Io {
                    source: e,
                    path: None,
                    operation: format!("spawn task {}", name),
                })?;

            // Take the pipe handles so each can be moved into its reader task.
            let stdout_handle = child.stdout.take();
            let stderr_handle = child.stderr.take();

            let mut stdout_lines = Vec::new();
            let mut stderr_lines = Vec::new();

            // Both streams are drained in their own spawned tasks while this
            // function waits on the child. Each line is emitted as an event
            // and also kept for the final result. The loop ends at end of
            // stream or on the first read error, without reporting the error.
            let name_for_stdout = name.to_string();
            let stdout_task = tokio::spawn(async move {
                let mut lines = Vec::new();
                if let Some(stdout) = stdout_handle {
                    let mut reader = BufReader::new(stdout).lines();
                    while let Ok(Some(line)) = reader.next_line().await {
                        cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
                        lines.push(line);
                    }
                }
                lines
            });

            let name_for_stderr = name.to_string();
            let stderr_task = tokio::spawn(async move {
                let mut lines = Vec::new();
                if let Some(stderr) = stderr_handle {
                    let mut reader = BufReader::new(stderr).lines();
                    while let Ok(Some(line)) = reader.next_line().await {
                        cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
                        lines.push(line);
                    }
                }
                lines
            });

            let status = child.wait().await.map_err(|e| Error::Io {
                source: e,
                path: None,
                operation: format!("wait for task {}", name),
            })?;

            // If a reader task failed to join, its output is left empty.
            if let Ok(lines) = stdout_task.await {
                stdout_lines = lines;
            }
            if let Ok(lines) = stderr_task.await {
                stderr_lines = lines;
            }

            let duration_ms = start_time.elapsed().as_millis() as u64;
            let stdout = stdout_lines.join("\n");
            let stderr = stderr_lines.join("\n");
            // -1 stands in when the status carries no exit code.
            let exit_code = status.code().unwrap_or(-1);
            let success = status.success();

            cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);

            if !success {
                tracing::warn!(task = %name, exit = exit_code, "Task failed");
                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
            }

            Ok(TaskResult {
                name: name.to_string(),
                exit_code: Some(exit_code),
                stdout,
                stderr,
                success,
            })
        } else {
            // Uncaptured mode: the child writes straight to the parent's
            // stdout/stderr, so the result carries no output text. No
            // "task completed" event is emitted on this path.
            let status = cmd
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit())
                .status()
                .await
                .map_err(|e| Error::Io {
                    source: e,
                    path: None,
                    operation: format!("spawn task {}", name),
                })?;

            // -1 stands in when the status carries no exit code.
            let exit_code = status.code().unwrap_or(-1);
            let success = status.success();

            if !success {
                tracing::warn!(task = %name, exit = exit_code, "Task failed");
            }

            Ok(TaskResult {
                name: name.to_string(),
                exit_code: Some(exit_code),
                stdout: String::new(), stderr: String::new(),
                success,
            })
        }
    }
410
411 #[async_recursion]
413 pub async fn execute_node(
414 &self,
415 name: &str,
416 node: &TaskNode,
417 all_tasks: &Tasks,
418 ) -> Result<Vec<TaskResult>> {
419 match node {
420 TaskNode::Task(task) => {
421 let result = self.execute_task(name, task.as_ref()).await?;
422 Ok(vec![result])
423 }
424 TaskNode::Group(group) => self.execute_parallel(name, group, all_tasks).await,
425 TaskNode::Sequence(seq) => self.execute_sequential(name, seq, all_tasks).await,
426 }
427 }
428
429 #[async_recursion]
431 pub async fn execute_definition(
432 &self,
433 name: &str,
434 node: &TaskNode,
435 all_tasks: &Tasks,
436 ) -> Result<Vec<TaskResult>> {
437 self.execute_node(name, node, all_tasks).await
438 }
439
440 async fn execute_sequential(
441 &self,
442 prefix: &str,
443 sequence: &[TaskNode],
444 all_tasks: &Tasks,
445 ) -> Result<Vec<TaskResult>> {
446 if !self.config.capture_output {
447 cuenv_events::emit_task_group_started!(prefix, true, sequence.len());
448 }
449 let mut results = Vec::new();
450 for (i, step) in sequence.iter().enumerate() {
451 let task_name = format!("{}[{}]", prefix, i);
452 let task_results = self.execute_node(&task_name, step, all_tasks).await?;
453 for result in &task_results {
454 if !result.success {
456 return Err(Error::task_failed(
457 &result.name,
458 result.exit_code.unwrap_or(-1),
459 &result.stdout,
460 &result.stderr,
461 ));
462 }
463 }
464 results.extend(task_results);
465 }
466 Ok(results)
467 }
468
469 async fn execute_parallel(
470 &self,
471 prefix: &str,
472 group: &TaskGroup,
473 all_tasks: &Tasks,
474 ) -> Result<Vec<TaskResult>> {
475 if let Some(default_task) = group.children.get("default") {
477 if !self.config.capture_output {
478 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
479 }
480 let task_name = format!("{}.default", prefix);
483 return self.execute_node(&task_name, default_task, all_tasks).await;
484 }
485
486 if !self.config.capture_output {
487 cuenv_events::emit_task_group_started!(prefix, false, group.children.len());
488 }
489 let mut join_set = JoinSet::new();
490 let all_tasks = Arc::new(all_tasks.clone());
491 let mut all_results = Vec::new();
492 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
493 if let Some(failed) = results.iter().find(|r| !r.success) {
494 return Err(Error::task_failed(
495 &failed.name,
496 failed.exit_code.unwrap_or(-1),
497 &failed.stdout,
498 &failed.stderr,
499 ));
500 }
501 all_results.extend(results);
502 Ok(())
503 };
504 for (name, child_node) in &group.children {
505 let task_name = format!("{}.{}", prefix, name);
506 let child_node = child_node.clone();
507 let all_tasks = Arc::clone(&all_tasks);
508 let executor = self.clone_with_config();
509 join_set.spawn(async move {
510 executor
511 .execute_node(&task_name, &child_node, &all_tasks)
512 .await
513 });
514 if self.config.max_parallel > 0
515 && join_set.len() >= self.config.max_parallel
516 && let Some(result) = join_set.join_next().await
517 {
518 match result {
519 Ok(Ok(results)) => merge_results(results)?,
520 Ok(Err(e)) => return Err(e),
521 Err(e) => {
522 return Err(Error::execution(format!("Task execution panicked: {}", e)));
523 }
524 }
525 }
526 }
527 while let Some(result) = join_set.join_next().await {
528 match result {
529 Ok(Ok(results)) => merge_results(results)?,
530 Ok(Err(e)) => return Err(e),
531 Err(e) => {
532 return Err(Error::execution(format!("Task execution panicked: {}", e)));
533 }
534 }
535 }
536 Ok(all_results)
537 }
538
539 #[instrument(name = "execute_graph", skip(self, graph), fields(task_count = graph.task_count()))]
540 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
541 let parallel_groups = graph.get_parallel_groups()?;
542 let mut all_results = Vec::new();
543
544 for mut group in parallel_groups {
552 let mut join_set = JoinSet::new();
553
554 while !group.is_empty() || !join_set.is_empty() {
555 while let Some(node) = group.pop() {
557 let task = node.task.clone();
558 let name = node.name.clone();
559 let executor = self.clone_with_config();
560 join_set.spawn(async move { executor.execute_task(&name, &task).await });
561
562 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
563 break;
564 }
565 }
566
567 if let Some(result) = join_set.join_next().await {
568 match result {
569 Ok(Ok(task_result)) => {
570 if !task_result.success {
571 join_set.abort_all();
572 return Err(Error::task_failed(
573 &task_result.name,
574 task_result.exit_code.unwrap_or(-1),
575 &task_result.stdout,
576 &task_result.stderr,
577 ));
578 }
579 all_results.push(task_result);
580 }
581 Ok(Err(e)) => {
582 join_set.abort_all();
583 return Err(e);
584 }
585 Err(e) => {
586 join_set.abort_all();
587 return Err(Error::execution(format!(
588 "Task execution panicked: {}",
589 e
590 )));
591 }
592 }
593 }
594 }
595 }
596
597 Ok(all_results)
598 }
599
600 fn clone_with_config(&self) -> Self {
601 Self::with_shared_backend(self.config.clone(), self.backend.clone())
603 }
604}
605
606fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
607 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
608
609 loop {
610 let is_root = match manager {
611 PackageManager::Npm
612 | PackageManager::Bun
613 | PackageManager::YarnClassic
614 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
615 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
616 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
617 PackageManager::Deno => deno_json_has_workspace(¤t),
618 };
619
620 if is_root {
621 return current;
622 }
623
624 if let Some(parent) = current.parent() {
625 current = parent.to_path_buf();
626 } else {
627 return start.to_path_buf();
628 }
629 }
630}
631
632fn package_json_has_workspaces(dir: &Path) -> bool {
633 let path = dir.join("package.json");
634 let content = std::fs::read_to_string(&path);
635 let Ok(json) = content.and_then(|s| {
636 serde_json::from_str::<serde_json::Value>(&s)
637 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
638 }) else {
639 return false;
640 };
641
642 match json.get("workspaces") {
643 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
644 Some(serde_json::Value::Object(map)) => map
645 .get("packages")
646 .and_then(|packages| packages.as_array())
647 .map(|arr| !arr.is_empty())
648 .unwrap_or(false),
649 _ => false,
650 }
651}
652
/// Reports whether `dir/Cargo.toml` defines a workspace.
///
/// The manifest is scanned line by line for a TOML table header naming the
/// `workspace` table, either `[workspace]` itself or a sub-table such as
/// `[workspace.dependencies]`. Whitespace around the header and trailing
/// `#` comments are tolerated, and commented-out headers are ignored (a
/// plain substring search for `"[workspace]"` would get those cases wrong).
///
/// Returns `false` when the file is missing or unreadable.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // Discard a trailing comment, then surrounding whitespace.
        let code = line.split('#').next().unwrap_or_default().trim();

        // Only `[ ... ]` table-header lines are of interest.
        let Some(header) = code
            .strip_prefix('[')
            .and_then(|rest| rest.strip_suffix(']'))
        else {
            return false;
        };

        let header = header.trim();
        header == "workspace"
            || header
                .strip_prefix("workspace")
                .is_some_and(|rest| rest.trim_start().starts_with('.'))
    })
}
661
662fn deno_json_has_workspace(dir: &Path) -> bool {
663 let path = dir.join("deno.json");
664 let content = std::fs::read_to_string(&path);
665 let Ok(json) = content.and_then(|s| {
666 serde_json::from_str::<serde_json::Value>(&s)
667 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
668 }) else {
669 return false;
670 };
671
672 match json.get("workspace") {
674 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
675 Some(serde_json::Value::Object(_)) => true,
676 _ => false,
677 }
678}
679
680pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
683 let exit_code = result
684 .exit_code
685 .map(|c| c.to_string())
686 .unwrap_or_else(|| "unknown".to_string());
687
688 let mut sections = Vec::new();
689 sections.push(format!(
690 "Task '{}' failed with exit code {}.",
691 result.name, exit_code
692 ));
693
694 let output = format_failure_streams(result, max_output_lines);
695 if output.is_empty() {
696 sections.push(
697 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
698 .to_string(),
699 );
700 } else {
701 sections.push(output);
702 }
703
704 sections.join("\n\n")
705}
706
707fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
708 let mut streams = Vec::new();
709
710 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
711 streams.push(stdout);
712 }
713
714 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
715 streams.push(stderr);
716 }
717
718 streams.join("\n\n")
719}
720
/// Formats the last `max_output_lines` lines of one output stream under a
/// `label` header.
///
/// Trailing whitespace is ignored. Returns `None` when nothing remains after
/// trimming. When lines were dropped, the header states how many of the
/// total are shown.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    let skipped = total.saturating_sub(max_output_lines);
    let tail = trimmed
        .lines()
        .skip(skipped)
        .collect::<Vec<_>>()
        .join("\n");

    let header = if skipped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{tail}"))
}
740
741pub async fn execute_command(
745 command: &str,
746 args: &[String],
747 environment: &Environment,
748) -> Result<i32> {
749 execute_command_with_redaction(command, args, environment, &[]).await
750}
751
752pub async fn execute_command_with_redaction(
756 command: &str,
757 args: &[String],
758 environment: &Environment,
759 secrets: &[String],
760) -> Result<i32> {
761 use tokio::io::{AsyncBufReadExt, BufReader};
762
763 tracing::info!("Executing command: {} {:?}", command, args);
764 let mut cmd = Command::new(command);
765 cmd.args(args);
766 let env_vars = environment.merge_with_system_hermetic();
768 for (key, value) in env_vars {
769 cmd.env(key, value);
770 }
771
772 if secrets.is_empty() {
773 cmd.stdout(Stdio::inherit());
775 cmd.stderr(Stdio::inherit());
776 cmd.stdin(Stdio::inherit());
777 let status = cmd.status().await.map_err(|e| {
778 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
779 })?;
780 return Ok(status.code().unwrap_or(1));
781 }
782
783 cmd.stdout(Stdio::piped());
785 cmd.stderr(Stdio::piped());
786 cmd.stdin(Stdio::inherit());
787
788 let mut child = cmd.spawn().map_err(|e| {
789 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
790 })?;
791
792 let stdout = child
793 .stdout
794 .take()
795 .ok_or_else(|| Error::execution("stdout pipe not available"))?;
796 let stderr = child
797 .stderr
798 .take()
799 .ok_or_else(|| Error::execution("stderr pipe not available"))?;
800
801 let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
803 sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
804 let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
805
806 let secrets_clone = sorted_secrets.clone();
808 let stdout_task = tokio::spawn(async move {
809 let reader = BufReader::new(stdout);
810 let mut lines = reader.lines();
811 while let Ok(Some(line)) = lines.next_line().await {
812 let mut redacted = line;
813 for secret in &secrets_clone {
814 if secret.len() >= 4 {
815 redacted = redacted.replace(secret, "[REDACTED]");
816 }
817 }
818 cuenv_events::emit_stdout!(&redacted);
819 }
820 });
821
822 let stderr_task = tokio::spawn(async move {
824 let reader = BufReader::new(stderr);
825 let mut lines = reader.lines();
826 while let Ok(Some(line)) = lines.next_line().await {
827 let mut redacted = line;
828 for secret in &sorted_secrets {
829 if secret.len() >= 4 {
830 redacted = redacted.replace(secret, "[REDACTED]");
831 }
832 }
833 cuenv_events::emit_stderr!(&redacted);
834 }
835 });
836
837 let status = child.wait().await.map_err(|e| {
839 Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
840 })?;
841
842 let _ = stdout_task.await;
843 let _ = stderr_task.await;
844
845 Ok(status.code().unwrap_or(1))
846}
847
848#[cfg(test)]
849mod tests {
850 use super::*;
851 use crate::tasks::TaskDependency;
852 use tempfile::TempDir;
853
854 #[tokio::test]
855 async fn test_executor_config_default() {
856 let config = ExecutorConfig::default();
857 assert!(config.capture_output);
858 assert_eq!(config.max_parallel, 0);
859 assert!(config.environment.is_empty());
860 }
861
862 #[tokio::test]
863 async fn test_task_result() {
864 let result = TaskResult {
865 name: "test".to_string(),
866 exit_code: Some(0),
867 stdout: "output".to_string(),
868 stderr: String::new(),
869 success: true,
870 };
871 assert_eq!(result.name, "test");
872 assert_eq!(result.exit_code, Some(0));
873 assert!(result.success);
874 assert_eq!(result.stdout, "output");
875 }
876
877 #[tokio::test]
878 async fn test_execute_simple_task() {
879 let config = ExecutorConfig {
880 capture_output: true,
881 ..Default::default()
882 };
883 let executor = TaskExecutor::new(config);
884 let task = Task {
885 command: "echo".to_string(),
886 args: vec!["hello".to_string()],
887 description: Some("Hello task".to_string()),
888 ..Default::default()
889 };
890 let result = executor.execute_task("test", &task).await.unwrap();
891 assert!(result.success);
892 assert_eq!(result.exit_code, Some(0));
893 assert!(result.stdout.contains("hello"));
894 }
895
896 #[tokio::test]
897 async fn test_execute_with_environment() {
898 let mut config = ExecutorConfig {
899 capture_output: true,
900 ..Default::default()
901 };
902 config
903 .environment
904 .set("TEST_VAR".to_string(), "test_value".to_string());
905 let executor = TaskExecutor::new(config);
906 let task = Task {
907 command: "printenv".to_string(),
908 args: vec!["TEST_VAR".to_string()],
909 description: Some("Print env task".to_string()),
910 ..Default::default()
911 };
912 let result = executor.execute_task("test", &task).await.unwrap();
913 assert!(result.success);
914 assert!(result.stdout.contains("test_value"));
915 }
916
917 #[tokio::test]
918 async fn test_execute_failing_task() {
919 let config = ExecutorConfig {
920 capture_output: true,
921 ..Default::default()
922 };
923 let executor = TaskExecutor::new(config);
924 let task = Task {
925 command: "false".to_string(),
926 description: Some("Failing task".to_string()),
927 ..Default::default()
928 };
929 let result = executor.execute_task("test", &task).await.unwrap();
930 assert!(!result.success);
931 assert_eq!(result.exit_code, Some(1));
932 }
933
934 #[tokio::test]
935 async fn test_execute_sequential_group() {
936 let config = ExecutorConfig {
937 capture_output: true,
938 ..Default::default()
939 };
940 let executor = TaskExecutor::new(config);
941 let task1 = Task {
942 command: "echo".to_string(),
943 args: vec!["first".to_string()],
944 description: Some("First task".to_string()),
945 ..Default::default()
946 };
947 let task2 = Task {
948 command: "echo".to_string(),
949 args: vec!["second".to_string()],
950 description: Some("Second task".to_string()),
951 ..Default::default()
952 };
953 let sequence = vec![
954 TaskNode::Task(Box::new(task1)),
955 TaskNode::Task(Box::new(task2)),
956 ];
957 let all_tasks = Tasks::new();
958 let node = TaskNode::Sequence(sequence);
959 let results = executor
960 .execute_node("seq", &node, &all_tasks)
961 .await
962 .unwrap();
963 assert_eq!(results.len(), 2);
964 assert!(results[0].stdout.contains("first"));
965 assert!(results[1].stdout.contains("second"));
966 }
967
968 #[tokio::test]
969 async fn test_command_injection_prevention() {
970 let config = ExecutorConfig {
971 capture_output: true,
972 ..Default::default()
973 };
974 let executor = TaskExecutor::new(config);
975 let malicious_task = Task {
976 command: "echo".to_string(),
977 args: vec!["hello".to_string(), "; rm -rf /".to_string()],
978 description: Some("Malicious task test".to_string()),
979 ..Default::default()
980 };
981 let result = executor
982 .execute_task("malicious", &malicious_task)
983 .await
984 .unwrap();
985 assert!(result.success);
986 assert!(result.stdout.contains("hello ; rm -rf /"));
987 }
988
989 #[tokio::test]
990 async fn test_special_characters_in_args() {
991 let config = ExecutorConfig {
992 capture_output: true,
993 ..Default::default()
994 };
995 let executor = TaskExecutor::new(config);
996 let special_chars = vec![
997 "$USER",
998 "$(whoami)",
999 "`whoami`",
1000 "&& echo hacked",
1001 "|| echo failed",
1002 "> /tmp/hack",
1003 "| cat",
1004 ];
1005 for special_arg in special_chars {
1006 let task = Task {
1007 command: "echo".to_string(),
1008 args: vec!["safe".to_string(), special_arg.to_string()],
1009 description: Some("Special character test".to_string()),
1010 ..Default::default()
1011 };
1012 let result = executor.execute_task("special", &task).await.unwrap();
1013 assert!(result.success);
1014 assert!(result.stdout.contains("safe"));
1015 assert!(result.stdout.contains(special_arg));
1016 }
1017 }
1018
1019 #[tokio::test]
1020 async fn test_environment_variable_safety() {
1021 let mut config = ExecutorConfig {
1022 capture_output: true,
1023 ..Default::default()
1024 };
1025 config
1026 .environment
1027 .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1028 let executor = TaskExecutor::new(config);
1029 let task = Task {
1030 command: "printenv".to_string(),
1031 args: vec!["DANGEROUS_VAR".to_string()],
1032 description: Some("Environment variable safety test".to_string()),
1033 ..Default::default()
1034 };
1035 let result = executor.execute_task("env_test", &task).await.unwrap();
1036 assert!(result.success);
1037 assert!(result.stdout.contains("; rm -rf /"));
1038 }
1039
1040 #[tokio::test]
1041 async fn test_execute_graph_parallel_groups() {
1042 let config = ExecutorConfig {
1044 capture_output: true,
1045 max_parallel: 2,
1046 ..Default::default()
1047 };
1048 let executor = TaskExecutor::new(config);
1049 let mut graph = TaskGraph::new();
1050
1051 let t1 = Task {
1052 command: "echo".into(),
1053 args: vec!["A".into()],
1054 ..Default::default()
1055 };
1056 let t2 = Task {
1057 command: "echo".into(),
1058 args: vec!["B".into()],
1059 ..Default::default()
1060 };
1061
1062 graph.add_task("t1", t1).unwrap();
1063 graph.add_task("t2", t2).unwrap();
1064 let results = executor.execute_graph(&graph).await.unwrap();
1065 assert_eq!(results.len(), 2);
1066 let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1067 assert!(joined.contains("A") && joined.contains("B"));
1068 }
1069
1070 #[tokio::test]
1071 async fn test_execute_graph_respects_dependency_levels() {
1072 let tmp = TempDir::new().unwrap();
1073 let root = tmp.path();
1074
1075 let config = ExecutorConfig {
1076 capture_output: true,
1077 max_parallel: 2,
1078 project_root: root.to_path_buf(),
1079 ..Default::default()
1080 };
1081 let executor = TaskExecutor::new(config);
1082
1083 let mut tasks = Tasks::new();
1084 tasks.tasks.insert(
1085 "dep".into(),
1086 TaskNode::Task(Box::new(Task {
1087 command: "sh".into(),
1088 args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
1089 ..Default::default()
1090 })),
1091 );
1092 tasks.tasks.insert(
1093 "consumer".into(),
1094 TaskNode::Task(Box::new(Task {
1095 command: "sh".into(),
1096 args: vec!["-c".into(), "cat marker.txt".into()],
1097 depends_on: vec![TaskDependency::from_name("dep")],
1098 ..Default::default()
1099 })),
1100 );
1101
1102 let mut graph = TaskGraph::new();
1103 graph.build_for_task("consumer", &tasks).unwrap();
1104
1105 let results = executor.execute_graph(&graph).await.unwrap();
1106 assert_eq!(results.len(), 2);
1107
1108 let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
1109 assert!(consumer.success);
1110 assert!(consumer.stdout.contains("ok"));
1111 }
1112
1113 #[test]
1114 fn test_summarize_task_failure_with_exit_code() {
1115 let result = TaskResult {
1116 name: "build".to_string(),
1117 exit_code: Some(127),
1118 stdout: String::new(),
1119 stderr: "command not found".to_string(),
1120 success: false,
1121 };
1122 let summary = summarize_task_failure(&result, 10);
1123 assert!(summary.contains("build"));
1124 assert!(summary.contains("127"));
1125 assert!(summary.contains("command not found"));
1126 }
1127
1128 #[test]
1129 fn test_summarize_task_failure_no_exit_code() {
1130 let result = TaskResult {
1131 name: "killed".to_string(),
1132 exit_code: None,
1133 stdout: String::new(),
1134 stderr: String::new(),
1135 success: false,
1136 };
1137 let summary = summarize_task_failure(&result, 10);
1138 assert!(summary.contains("killed"));
1139 assert!(summary.contains("unknown"));
1140 }
1141
1142 #[test]
1143 fn test_summarize_task_failure_no_output() {
1144 let result = TaskResult {
1145 name: "silent".to_string(),
1146 exit_code: Some(1),
1147 stdout: String::new(),
1148 stderr: String::new(),
1149 success: false,
1150 };
1151 let summary = summarize_task_failure(&result, 10);
1152 assert!(summary.contains("No stdout/stderr"));
1153 assert!(summary.contains("RUST_LOG=debug"));
1154 }
1155
1156 #[test]
1157 fn test_summarize_task_failure_truncates_long_output() {
1158 let result = TaskResult {
1159 name: "verbose".to_string(),
1160 exit_code: Some(1),
1161 stdout: (1..=50)
1162 .map(|i| format!("line {}", i))
1163 .collect::<Vec<_>>()
1164 .join("\n"),
1165 stderr: String::new(),
1166 success: false,
1167 };
1168 let summary = summarize_task_failure(&result, 10);
1169 assert!(summary.contains("last 10 of 50 lines"));
1170 assert!(summary.contains("line 50"));
1171 assert!(!summary.contains("line 1\n")); }
1173
1174 #[test]
1175 fn test_summarize_stream_empty() {
1176 assert!(summarize_stream("test", "", 10).is_none());
1177 assert!(summarize_stream("test", " ", 10).is_none());
1178 assert!(summarize_stream("test", "\n\n", 10).is_none());
1179 }
1180
1181 #[test]
1182 fn test_summarize_stream_short() {
1183 let result = summarize_stream("stdout", "line 1\nline 2", 10).unwrap();
1184 assert!(result.contains("stdout:"));
1185 assert!(result.contains("line 1"));
1186 assert!(result.contains("line 2"));
1187 assert!(!result.contains("last"));
1188 }
1189
1190 #[test]
1191 fn test_format_failure_streams_both() {
1192 let result = TaskResult {
1193 name: "test".to_string(),
1194 exit_code: Some(1),
1195 stdout: "stdout content".to_string(),
1196 stderr: "stderr content".to_string(),
1197 success: false,
1198 };
1199 let formatted = format_failure_streams(&result, 10);
1200 assert!(formatted.contains("stdout:"));
1201 assert!(formatted.contains("stderr:"));
1202 assert!(formatted.contains("stdout content"));
1203 assert!(formatted.contains("stderr content"));
1204 }
1205
// Starting from a nested package directory, the npm lookup is expected to
// resolve to the directory holding the `package.json` that declares workspaces.
#[test]
fn test_find_workspace_root_with_npm() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let member_dir = workspace.path().join("packages").join("subpkg");
    std::fs::create_dir_all(&member_dir).unwrap();

    let detected = find_workspace_root(PackageManager::Npm, &member_dir);
    // The expected root is compared in canonical form.
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(detected, expected);
}
1221
// Starting from a nested package directory, the pnpm lookup is expected to
// resolve to the directory that contains `pnpm-workspace.yaml`.
#[test]
fn test_find_workspace_root_with_pnpm() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("pnpm-workspace.yaml");
    std::fs::write(manifest, "packages:\n - 'packages/*'").unwrap();

    let member_dir = workspace.path().join("packages").join("app");
    std::fs::create_dir_all(&member_dir).unwrap();

    let detected = find_workspace_root(PackageManager::Pnpm, &member_dir);
    // The expected root is compared in canonical form.
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(detected, expected);
}
1236
// Starting from a nested crate directory, the Cargo lookup is expected to
// resolve to the directory whose `Cargo.toml` has a `[workspace]` table.
#[test]
fn test_find_workspace_root_with_cargo() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("Cargo.toml");
    std::fs::write(manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let member_dir = workspace.path().join("crates").join("core");
    std::fs::create_dir_all(&member_dir).unwrap();

    let detected = find_workspace_root(PackageManager::Cargo, &member_dir);
    // The expected root is compared in canonical form.
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(detected, expected);
}
1251
// With no workspace manifest anywhere in the temp directory, the lookup is
// expected to hand back the starting path itself (compared un-canonicalized).
#[test]
fn test_find_workspace_root_no_workspace() {
    let empty_dir = TempDir::new().unwrap();
    let start = empty_dir.path();

    let detected = find_workspace_root(PackageManager::Npm, start);

    assert_eq!(detected, start.to_path_buf());
}
1259
// A `package.json` whose `workspaces` key is a non-empty array is expected
// to be recognised as declaring workspaces.
#[test]
fn test_package_json_has_workspaces_array() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(detected);
}
1270
// A `package.json` whose `workspaces` key is an object with a `packages`
// list is expected to be recognised as declaring workspaces.
#[test]
fn test_package_json_has_workspaces_object() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": {"packages": ["packages/*"]}}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(detected);
}
1281
// A `package.json` without any `workspaces` key is expected to be reported
// as not declaring workspaces.
#[test]
fn test_package_json_no_workspaces() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"name": "test"}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(!detected);
}
1288
// An empty `workspaces` array is expected to be treated the same as having
// no workspaces declared.
#[test]
fn test_package_json_empty_workspaces() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": []}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(!detected);
}
1295
// A directory with no `package.json` at all is expected to be reported as
// not declaring workspaces.
#[test]
fn test_package_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = package_json_has_workspaces(empty_dir.path());
    assert!(!detected);
}
1301
// A `Cargo.toml` containing a `[workspace]` table is expected to be
// recognised as a workspace manifest.
#[test]
fn test_cargo_toml_has_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("Cargo.toml");
    std::fs::write(manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let detected = cargo_toml_has_workspace(project.path());
    assert!(detected);
}
1312
// A `Cargo.toml` that only has a `[package]` table is expected to be
// reported as not being a workspace manifest.
#[test]
fn test_cargo_toml_no_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("Cargo.toml");
    std::fs::write(manifest, "[package]\nname = \"test\"").unwrap();

    let detected = cargo_toml_has_workspace(project.path());
    assert!(!detected);
}
1319
// A directory with no `Cargo.toml` is expected to be reported as not being
// a Cargo workspace.
#[test]
fn test_cargo_toml_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = cargo_toml_has_workspace(empty_dir.path());
    assert!(!detected);
}
1325
// A `deno.json` whose `workspace` key is an array is expected to be
// recognised as declaring a workspace.
#[test]
fn test_deno_json_has_workspace_array() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(manifest, r#"{"workspace": ["./packages/*"]}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(detected);
}
1336
// A `deno.json` whose `workspace` key is an object with a `members` list is
// expected to be recognised as declaring a workspace.
#[test]
fn test_deno_json_has_workspace_object() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(manifest, r#"{"workspace": {"members": ["./packages/*"]}}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(detected);
}
1347
// A `deno.json` without a `workspace` key is expected to be reported as not
// declaring a workspace.
#[test]
fn test_deno_json_no_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(manifest, r#"{"name": "test"}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(!detected);
}
1354
// A directory with no `deno.json` is expected to be reported as not
// declaring a Deno workspace.
#[test]
fn test_deno_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = deno_json_has_workspace(empty_dir.path());
    assert!(!detected);
}
1360
// Builds an `ExecutorConfig` with explicit values layered over the defaults
// and verifies that every explicitly-set field holds the value it was given,
// and that the untouched `backend_config` keeps its default of `None`.
#[test]
fn test_executor_config_with_fields() {
    let config = ExecutorConfig {
        capture_output: true,
        max_parallel: 4,
        working_dir: Some(PathBuf::from("/tmp")),
        project_root: PathBuf::from("/project"),
        cue_module_root: Some(PathBuf::from("/project/cue.mod")),
        materialize_outputs: Some(PathBuf::from("/outputs")),
        cache_dir: Some(PathBuf::from("/cache")),
        show_cache_path: true,
        cli_backend: Some("host".to_string()),
        // Remaining fields (environment, backend_config) come from `Default`.
        ..Default::default()
    };

    // Scalar settings.
    assert!(config.capture_output);
    assert_eq!(config.max_parallel, 4);
    assert!(config.show_cache_path);

    // Path settings: each one is checked so a mixed-up assignment is caught.
    assert_eq!(config.working_dir, Some(PathBuf::from("/tmp")));
    assert_eq!(config.project_root, PathBuf::from("/project"));
    assert_eq!(
        config.cue_module_root,
        Some(PathBuf::from("/project/cue.mod"))
    );
    assert_eq!(config.materialize_outputs, Some(PathBuf::from("/outputs")));
    assert_eq!(config.cache_dir, Some(PathBuf::from("/cache")));

    // Backend selection: explicit CLI override, defaulted backend config.
    assert_eq!(config.cli_backend.as_deref(), Some("host"));
    assert!(config.backend_config.is_none());
}
1380
// Cloning a `TaskResult` is expected to yield a copy whose fields all equal
// those of the source value.
#[test]
fn test_task_result_clone() {
    let original = TaskResult {
        name: String::from("test"),
        exit_code: Some(0),
        stdout: String::from("output"),
        stderr: String::from("error"),
        success: true,
    };

    // Take the clone apart so each field is compared on its own.
    let TaskResult {
        name,
        exit_code,
        stdout,
        stderr,
        success,
    } = original.clone();

    assert_eq!(name, original.name);
    assert_eq!(exit_code, original.exit_code);
    assert_eq!(stdout, original.stdout);
    assert_eq!(stderr, original.stderr);
    assert_eq!(success, original.success);
}
1397
// Pins the value of `TASK_FAILURE_SNIPPET_LINES` so a change to it is a
// deliberate, test-visible decision.
#[test]
fn test_task_failure_snippet_lines_constant() {
    let expected_lines: usize = 20;
    assert_eq!(TASK_FAILURE_SNIPPET_LINES, expected_lines);
}
1402}