1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
8use super::cache::{BuildActionInput, RecordInput, TaskCacheConfig};
9use super::process_registry::global_registry;
10use super::{Task, TaskGraph, TaskGroup, TaskNode, Tasks};
11use crate::OutputCapture;
12use crate::config::BackendConfig;
13use crate::environment::Environment;
14use crate::{Error, Result};
15use async_recursion::async_recursion;
16use cuenv_workspaces::PackageManager;
17use std::path::{Path, PathBuf};
18use std::process::Stdio;
19use std::sync::Arc;
20use tokio::process::Command;
21use tokio::task::JoinSet;
22use tracing::instrument;
23
24#[cfg(unix)]
27#[allow(unused_imports)]
28use std::os::unix::process::CommandExt;
29
/// Makes the child spawned from `cmd` the leader of its own process group.
///
/// Installs a `pre_exec` hook that calls `setpgid(0, 0)` in the child after
/// `fork` and before the target program is executed.
#[cfg(unix)]
fn setup_process_group(cmd: &mut Command) {
    // SAFETY: the hook only invokes `setpgid`, which POSIX lists as
    // async-signal-safe, and touches no memory shared with the parent.
    #[expect(unsafe_code, reason = "Required for POSIX process group management")]
    unsafe {
        cmd.pre_exec(|| {
            // The return value is deliberately ignored: if the group cannot be
            // created the child still runs, just without its own group.
            libc::setpgid(0, 0);
            Ok(())
        });
    }
}
47
/// Outcome of running (or replaying from cache) a single task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Name of the task that produced this result.
    pub name: String,
    /// Exit code of the task. The executor in this file stores `-1` when the
    /// process status carries no code.
    pub exit_code: Option<i32>,
    /// Collected standard output; empty when output was inherited rather than captured.
    pub stdout: String,
    /// Collected standard error; empty when output was inherited rather than captured.
    pub stderr: String,
    /// Whether the task finished successfully.
    pub success: bool,
}
57
/// Number of trailing output lines to show in a task failure snippet.
// NOTE(review): not referenced in the visible part of this file; presumably
// passed as `max_output_lines` to `summarize_task_failure` by callers — confirm.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
60
/// Settings that control how [`TaskExecutor`] runs tasks.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Output mode. When `should_capture()` is true, task stdout/stderr are
    /// piped and collected; otherwise they are inherited from this process.
    pub capture_output: OutputCapture,
    /// Upper bound on concurrently running tasks in group/graph execution.
    /// `0` disables the limit.
    pub max_parallel: usize,
    /// Environment used to resolve commands and to build each task's process environment.
    pub environment: Environment,
    /// Optional working directory override.
    // NOTE(review): not read in the visible part of this file — confirm intended use.
    pub working_dir: Option<PathBuf>,
    /// Project root; the fallback base directory for tasks.
    pub project_root: PathBuf,
    /// Root against which a task's `directory` (or its source directory) is
    /// joined. Falls back to `project_root` when `None`.
    pub cue_module_root: Option<PathBuf>,
    /// Optional directory for materializing task outputs.
    // NOTE(review): not read in the visible part of this file — confirm intended use.
    pub materialize_outputs: Option<PathBuf>,
    /// Optional cache directory.
    // NOTE(review): not read in the visible part of this file — confirm intended use.
    pub cache_dir: Option<PathBuf>,
    /// Whether to show the cache path.
    // NOTE(review): not read in the visible part of this file — confirm intended use.
    pub show_cache_path: bool,
    /// Backend configuration handed to `create_backend_with_factory`.
    pub backend_config: Option<BackendConfig>,
    /// Backend name handed to `create_backend_with_factory` alongside `backend_config`.
    pub cli_backend: Option<String>,
    /// Task cache settings. When `Some`, `execute_task` consults the action
    /// cache before running and may record results afterwards.
    pub cache: Option<TaskCacheConfig>,
}
91
92impl Default for ExecutorConfig {
93 fn default() -> Self {
94 Self {
95 capture_output: OutputCapture::Capture,
96 max_parallel: 0,
97 environment: Environment::new(),
98 working_dir: None,
99 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
100 cue_module_root: None,
101 materialize_outputs: None,
102 cache_dir: None,
103 show_cache_path: false,
104 backend_config: None,
105 cli_backend: None,
106 cache: None,
107 }
108 }
109}
110
/// Runs tasks, task groups and task graphs according to an [`ExecutorConfig`].
pub struct TaskExecutor {
    /// Settings for this executor.
    config: ExecutorConfig,
    /// Execution backend; shared (via `Arc`) with executors cloned for parallel work.
    backend: Arc<dyn TaskBackend>,
}
116impl TaskExecutor {
    /// Creates an executor for `config` without supplying a dagger backend factory.
    ///
    /// Equivalent to `Self::with_dagger_factory(config, None)`.
    pub fn new(config: ExecutorConfig) -> Self {
        Self::with_dagger_factory(config, None)
    }
121
122 pub fn with_dagger_factory(
126 config: ExecutorConfig,
127 dagger_factory: Option<BackendFactory>,
128 ) -> Self {
129 let backend = create_backend_with_factory(
130 config.backend_config.as_ref(),
131 config.project_root.clone(),
132 config.cli_backend.as_deref(),
133 dagger_factory,
134 );
135 Self { config, backend }
136 }
137
    /// Creates an executor that reuses an already constructed `backend`
    /// instead of building a new one from `config`.
    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
        Self { config, backend }
    }
142
143 #[instrument(name = "execute_task", skip(self, task), fields(task_name = %name))]
154 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
155 let cache_handle: Option<(TaskCacheConfig, cuenv_cas::Digest, PathBuf)> =
159 if let Some(cache) = self.config.cache.clone() {
160 let workdir = self.workdir_for_task(task);
161 match super::cache::build_action(BuildActionInput {
162 task,
163 task_name: name,
164 environment: &self.config.environment,
165 cache: &cache,
166 workdir: &workdir,
167 project_root: self.project_root_for_task(task),
168 module_root: self
169 .config
170 .cue_module_root
171 .as_deref()
172 .unwrap_or(&self.config.project_root),
173 })
174 .await?
175 {
176 Some((_, action_digest)) => {
177 if let Some(cached) = super::cache::lookup(&cache, &action_digest, task)? {
179 tracing::debug!(task = %name, "action cache hit");
180 return self.return_cache_hit(CacheHitInput {
181 name,
182 task,
183 cache: &cache,
184 workdir: &workdir,
185 cached: &cached,
186 });
187 }
188 tracing::debug!(task = %name, "action cache miss");
189 Some((cache, action_digest, workdir))
190 }
191 None => None,
192 }
193 } else {
194 None
195 };
196
197 let start = std::time::Instant::now();
199 let result = if self.backend.name() == "dagger" {
200 let ctx = super::backend::TaskExecutionContext {
201 name,
202 task,
203 environment: &self.config.environment,
204 project_root: &self.config.project_root,
205 capture_output: self.config.capture_output,
206 };
207 self.backend.execute(&ctx).await?
208 } else {
209 self.execute_task_non_hermetic(name, task).await?
210 };
211 let duration_ms = start.elapsed().as_millis();
212
213 if let Some((cache, action_digest, workdir)) = cache_handle
216 && super::cache::effective_policy(task).mode.allows_write()
217 && result.exit_code == Some(0)
218 && let Err(e) = super::cache::record(RecordInput {
219 cache: &cache,
220 action_digest: &action_digest,
221 workdir: &workdir,
222 task,
223 stdout: &result.stdout,
224 stderr: &result.stderr,
225 exit_code: 0,
226 duration_ms,
227 })
228 {
229 tracing::warn!(task = %name, error = %e, "cache write failed");
230 }
231
232 Ok(result)
233 }
234
235 fn return_cache_hit(&self, input: CacheHitInput<'_>) -> Result<TaskResult> {
240 let CacheHitInput {
241 name,
242 task,
243 cache,
244 workdir,
245 cached,
246 } = input;
247
248 let (stdout, stderr, exit_code) = super::cache::materialize_hit(cache, workdir, cached)?;
249 let success = exit_code == 0;
250
251 let cmd_str = if let Some(script) = &task.script {
252 format!("[script: {} bytes] (cached)", script.len())
253 } else if task.command.is_empty() {
254 format!("{} (cached)", task.args.join(" "))
255 } else {
256 format!("{} {} (cached)", task.command, task.args.join(" "))
257 };
258 cuenv_events::emit_task_started!(name, cmd_str, false);
259 emit_cached_output_events(name, "stdout", &stdout);
260 emit_cached_output_events(name, "stderr", &stderr);
261 cuenv_events::emit_task_completed!(
262 name,
263 success,
264 exit_code,
265 cached.execution_metadata.duration_ms
266 );
267
268 Ok(TaskResult {
269 name: name.to_string(),
270 exit_code: Some(exit_code),
271 stdout,
272 stderr,
273 success,
274 })
275 }
276
277 fn workdir_for_task(&self, task: &Task) -> PathBuf {
283 if let Some(ref dir) = task.directory {
284 self.config
285 .cue_module_root
286 .as_ref()
287 .unwrap_or(&self.config.project_root)
288 .join(dir)
289 } else if let Some(ref project_root) = task.project_root {
290 project_root.clone()
291 } else if let Some(ref source) = task.source {
292 if let Some(dir) = source.directory() {
293 self.config
294 .cue_module_root
295 .as_ref()
296 .unwrap_or(&self.config.project_root)
297 .join(dir)
298 } else if let Some(ref project_root) = task.project_root {
299 project_root.clone()
300 } else {
301 self.config
302 .cue_module_root
303 .clone()
304 .unwrap_or_else(|| self.config.project_root.clone())
305 }
306 } else if !task.hermetic {
307 if let Some(manager) = cuenv_workspaces::detect_from_command(&task.command) {
308 find_workspace_root(manager, &self.config.project_root)
309 } else {
310 self.config.project_root.clone()
311 }
312 } else {
313 self.config.project_root.clone()
314 }
315 }
316
317 fn project_root_for_task<'a>(&'a self, task: &'a Task) -> &'a Path {
318 task.project_root
319 .as_deref()
320 .unwrap_or(&self.config.project_root)
321 }
322
323 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
327 if task.is_task_ref() && task.project_root.is_none() {
329 return Err(Error::configuration(format!(
330 "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
331 This usually means:\n\
332 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
333 - The referenced task '{}' doesn't exist in that project\n\
334 - There was an error loading the referenced project's env.cue\n\
335 Run with RUST_LOG=debug for more details.",
336 name,
337 task.task_ref.as_deref().unwrap_or("unknown"),
338 task.task_ref
339 .as_deref()
340 .and_then(|r| r.split(':').next_back())
341 .unwrap_or("unknown")
342 )));
343 }
344
345 let workdir = self.workdir_for_task(task);
347
348 tracing::info!(
349 task = %name,
350 workdir = %workdir.display(),
351 hermetic = false,
352 "Executing non-hermetic task"
353 );
354
355 let cmd_str = if let Some(script) = &task.script {
358 format!("[script: {} bytes]", script.len())
359 } else if task.command.is_empty() {
360 task.args.join(" ")
361 } else {
362 format!("{} {}", task.command, task.args.join(" "))
363 };
364
365 cuenv_events::emit_task_started!(name, cmd_str, false);
366
367 let command_spec =
369 task.command_spec(|command| self.config.environment.resolve_command(command))?;
370 let mut cmd = Command::new(&command_spec.program);
371 cmd.args(&command_spec.args);
372
373 cmd.current_dir(&workdir);
375 let env_vars = self.config.environment.merge_with_system_hermetic();
376 for (k, v) in &env_vars {
377 cmd.env(k, v);
378 }
379
380 for (key, value) in &task.env {
382 if let Some(s) = value.as_str() {
383 if let Some(host_var) = super::output_refs::parse_passthrough(s) {
384 if let Ok(host_val) = std::env::var(host_var) {
385 cmd.env(key, host_val);
386 }
387 } else if !s.starts_with("cuenv:ref:") {
388 cmd.env(key, s);
390 }
391 } else if let Some(n) = value.as_i64() {
393 cmd.env(key, n.to_string());
394 } else if let Some(b) = value.as_bool() {
395 cmd.env(key, b.to_string());
396 }
397 }
399
400 if !env_vars.contains_key("FORCE_COLOR") {
403 cmd.env("FORCE_COLOR", "1");
404 }
405 if !env_vars.contains_key("CLICOLOR_FORCE") {
406 cmd.env("CLICOLOR_FORCE", "1");
407 }
408
409 if self.config.capture_output.should_capture() {
412 use tokio::io::{AsyncBufReadExt, BufReader};
413
414 let start_time = std::time::Instant::now();
415
416 #[cfg(unix)]
418 setup_process_group(&mut cmd);
419
420 let mut child = cmd
422 .stdout(Stdio::piped())
423 .stderr(Stdio::piped())
424 .spawn()
425 .map_err(|e| Error::Io {
426 source: e,
427 path: None,
428 operation: format!("spawn task {}", name),
429 })?;
430
431 let child_pid = child.id();
433 if let Some(pid) = child_pid {
434 global_registry().register(pid, name.to_string()).await;
435 }
436
437 let stdout_handle = child.stdout.take();
439 let stderr_handle = child.stderr.take();
440
441 let mut stdout_lines = Vec::new();
443 let mut stderr_lines = Vec::new();
444
445 let name_for_stdout = name.to_string();
447 let stdout_task = tokio::spawn(async move {
448 let mut lines = Vec::new();
449 if let Some(stdout) = stdout_handle {
450 let mut reader = BufReader::new(stdout).lines();
451 while let Ok(Some(line)) = reader.next_line().await {
452 cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
453 lines.push(line);
454 }
455 }
456 lines
457 });
458
459 let name_for_stderr = name.to_string();
461 let stderr_task = tokio::spawn(async move {
462 let mut lines = Vec::new();
463 if let Some(stderr) = stderr_handle {
464 let mut reader = BufReader::new(stderr).lines();
465 while let Ok(Some(line)) = reader.next_line().await {
466 cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
467 lines.push(line);
468 }
469 }
470 lines
471 });
472
473 let status = child.wait().await.map_err(|e| Error::Io {
475 source: e,
476 path: None,
477 operation: format!("wait for task {}", name),
478 })?;
479
480 if let Some(pid) = child_pid {
482 global_registry().unregister(pid).await;
483 }
484
485 if let Ok(lines) = stdout_task.await {
487 stdout_lines = lines;
488 }
489 if let Ok(lines) = stderr_task.await {
490 stderr_lines = lines;
491 }
492
493 let duration_ms = start_time.elapsed().as_millis() as u64;
494 let stdout = stdout_lines.join("\n");
495 let stderr = stderr_lines.join("\n");
496 let exit_code = status.code().unwrap_or(-1);
497 let success = status.success();
498
499 cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
501
502 if !success {
503 tracing::warn!(task = %name, exit = exit_code, "Task failed");
504 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
505 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
506 }
507
508 Ok(TaskResult {
509 name: name.to_string(),
510 exit_code: Some(exit_code),
511 stdout,
512 stderr,
513 success,
514 })
515 } else {
516 #[cfg(unix)]
520 setup_process_group(&mut cmd);
521
522 let mut child = cmd
524 .stdout(Stdio::inherit())
525 .stderr(Stdio::inherit())
526 .stdin(Stdio::inherit())
527 .spawn()
528 .map_err(|e| Error::Io {
529 source: e,
530 path: None,
531 operation: format!("spawn task {}", name),
532 })?;
533
534 let child_pid = child.id();
536 if let Some(pid) = child_pid {
537 global_registry().register(pid, name.to_string()).await;
538 }
539
540 let status = child.wait().await.map_err(|e| Error::Io {
541 source: e,
542 path: None,
543 operation: format!("wait for task {}", name),
544 })?;
545
546 if let Some(pid) = child_pid {
548 global_registry().unregister(pid).await;
549 }
550
551 let exit_code = status.code().unwrap_or(-1);
552 let success = status.success();
553
554 if !success {
555 tracing::warn!(task = %name, exit = exit_code, "Task failed");
556 }
557
558 Ok(TaskResult {
559 name: name.to_string(),
560 exit_code: Some(exit_code),
561 stdout: String::new(), stderr: String::new(),
563 success,
564 })
565 }
566 }
567
568 #[async_recursion]
570 pub async fn execute_node(
571 &self,
572 name: &str,
573 node: &TaskNode,
574 all_tasks: &Tasks,
575 ) -> Result<Vec<TaskResult>> {
576 match node {
577 TaskNode::Task(task) => {
578 let result = self.execute_task(name, task.as_ref()).await?;
579 Ok(vec![result])
580 }
581 TaskNode::Group(group) => self.execute_parallel(name, group, all_tasks).await,
582 TaskNode::Sequence(seq) => self.execute_sequential(name, seq, all_tasks).await,
583 }
584 }
585
    /// Executes a task definition.
    ///
    /// Thin wrapper that forwards all arguments to [`Self::execute_node`] and
    /// returns its result unchanged.
    #[async_recursion]
    pub async fn execute_definition(
        &self,
        name: &str,
        node: &TaskNode,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        self.execute_node(name, node, all_tasks).await
    }
596
597 async fn execute_sequential(
598 &self,
599 prefix: &str,
600 sequence: &[TaskNode],
601 all_tasks: &Tasks,
602 ) -> Result<Vec<TaskResult>> {
603 if !self.config.capture_output.should_capture() {
604 cuenv_events::emit_task_group_started!(prefix, true, sequence.len());
605 }
606 let mut results = Vec::new();
607 let mut seq_results: std::collections::HashMap<String, TaskResult> =
609 std::collections::HashMap::new();
610
611 for (i, step) in sequence.iter().enumerate() {
612 let task_name = format!("{}[{}]", prefix, i);
613
614 let step = if let TaskNode::Task(task) = step
618 && super::output_refs::has_output_refs(&task.args, &task.env)
619 {
620 let mut resolved_task = (**task).clone();
621 let resolver = super::output_refs::OutputRefResolver {
622 task_name: &task_name,
623 results: &seq_results,
624 };
625 resolver.resolve(&mut resolved_task.args, &mut resolved_task.env)?;
626 TaskNode::Task(Box::new(resolved_task))
627 } else {
628 step.clone()
629 };
630
631 let task_results = self.execute_node(&task_name, &step, all_tasks).await?;
632 for result in &task_results {
633 if !result.success {
635 return Err(Error::task_failed(
636 &result.name,
637 result.exit_code.unwrap_or(-1),
638 &result.stdout,
639 &result.stderr,
640 ));
641 }
642 seq_results.insert(result.name.clone(), result.clone());
643 }
644 results.extend(task_results);
645 }
646 Ok(results)
647 }
648
649 async fn execute_parallel(
650 &self,
651 prefix: &str,
652 group: &TaskGroup,
653 all_tasks: &Tasks,
654 ) -> Result<Vec<TaskResult>> {
655 if let Some(default_task) = group.children.get("default") {
657 if !self.config.capture_output.should_capture() {
658 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
659 }
660 let task_name = format!("{}.default", prefix);
663 return self.execute_node(&task_name, default_task, all_tasks).await;
664 }
665
666 if !self.config.capture_output.should_capture() {
667 cuenv_events::emit_task_group_started!(prefix, false, group.children.len());
668 }
669 let mut join_set = JoinSet::new();
670 let all_tasks = Arc::new(all_tasks.clone());
671 let mut all_results = Vec::new();
672 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
673 if let Some(failed) = results.iter().find(|r| !r.success) {
674 return Err(Error::task_failed(
675 &failed.name,
676 failed.exit_code.unwrap_or(-1),
677 &failed.stdout,
678 &failed.stderr,
679 ));
680 }
681 all_results.extend(results);
682 Ok(())
683 };
684 for (name, child_node) in &group.children {
685 let task_name = format!("{}.{}", prefix, name);
686 let child_node = child_node.clone();
687 let all_tasks = Arc::clone(&all_tasks);
688 let executor = self.clone_with_config();
689 join_set.spawn(async move {
690 executor
691 .execute_node(&task_name, &child_node, &all_tasks)
692 .await
693 });
694 if self.config.max_parallel > 0
695 && join_set.len() >= self.config.max_parallel
696 && let Some(result) = join_set.join_next().await
697 {
698 match result {
699 Ok(Ok(results)) => merge_results(results)?,
700 Ok(Err(e)) => return Err(e),
701 Err(e) => {
702 return Err(Error::execution(format!("Task execution panicked: {}", e)));
703 }
704 }
705 }
706 }
707 while let Some(result) = join_set.join_next().await {
708 match result {
709 Ok(Ok(results)) => merge_results(results)?,
710 Ok(Err(e)) => return Err(e),
711 Err(e) => {
712 return Err(Error::execution(format!("Task execution panicked: {}", e)));
713 }
714 }
715 }
716 Ok(all_results)
717 }
718
719 #[instrument(name = "execute_graph", skip(self, graph), fields(task_count = graph.task_count()))]
720 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
721 let parallel_groups = graph.get_parallel_groups()?;
722 let mut all_results = Vec::new();
723 let mut results_map: std::collections::HashMap<String, TaskResult> =
726 std::collections::HashMap::new();
727
728 for mut group in parallel_groups {
736 let mut join_set = JoinSet::new();
737
738 while !group.is_empty() || !join_set.is_empty() {
739 while let Some(node) = group.pop() {
741 let mut task = node.task.clone();
742 let name = node.name.clone();
743
744 let resolver = super::output_refs::OutputRefResolver {
748 task_name: &name,
749 results: &results_map,
750 };
751 resolver.resolve(&mut task.args, &mut task.env)?;
752
753 let executor = self.clone_with_config();
754 join_set.spawn(async move { executor.execute_task(&name, &task).await });
755
756 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
757 break;
758 }
759 }
760
761 if let Some(result) = join_set.join_next().await {
762 match result {
763 Ok(Ok(task_result)) => {
764 if !task_result.success {
765 join_set.abort_all();
766 return Err(Error::task_failed(
767 &task_result.name,
768 task_result.exit_code.unwrap_or(-1),
769 &task_result.stdout,
770 &task_result.stderr,
771 ));
772 }
773 results_map.insert(task_result.name.clone(), task_result.clone());
774 all_results.push(task_result);
775 }
776 Ok(Err(e)) => {
777 join_set.abort_all();
778 return Err(e);
779 }
780 Err(e) => {
781 join_set.abort_all();
782 return Err(Error::execution(format!(
783 "Task execution panicked: {}",
784 e
785 )));
786 }
787 }
788 }
789 }
790 }
791
792 Ok(all_results)
793 }
794
795 fn clone_with_config(&self) -> Self {
796 Self::with_shared_backend(self.config.clone(), self.backend.clone())
798 }
799}
800
/// Borrowed arguments for `TaskExecutor::return_cache_hit`.
struct CacheHitInput<'a> {
    /// Name of the task whose result is being replayed.
    name: &'a str,
    /// Task definition; used to build the command string for the started event.
    task: &'a Task,
    /// Cache configuration the hit was found in.
    cache: &'a TaskCacheConfig,
    /// Working directory the cached entry is materialized into.
    workdir: &'a Path,
    /// The cached action result.
    cached: &'a cuenv_cas::ActionResult,
}
808
809fn emit_cached_output_events(name: &str, stream: &'static str, content: &str) {
810 for line in content.lines() {
811 cuenv_events::emit_task_output!(name, stream, line);
812 }
813}
814
815fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
816 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
817
818 loop {
819 let is_root = match manager {
820 PackageManager::Npm
821 | PackageManager::Bun
822 | PackageManager::YarnClassic
823 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
824 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
825 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
826 PackageManager::Deno => deno_json_has_workspace(¤t),
827 };
828
829 if is_root {
830 return current;
831 }
832
833 if let Some(parent) = current.parent() {
834 current = parent.to_path_buf();
835 } else {
836 return start.to_path_buf();
837 }
838 }
839}
840
841fn package_json_has_workspaces(dir: &Path) -> bool {
842 let path = dir.join("package.json");
843 let content = std::fs::read_to_string(&path);
844 let Ok(json) = content.and_then(|s| {
845 serde_json::from_str::<serde_json::Value>(&s)
846 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
847 }) else {
848 return false;
849 };
850
851 match json.get("workspaces") {
852 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
853 Some(serde_json::Value::Object(map)) => map
854 .get("packages")
855 .and_then(|packages| packages.as_array())
856 .map(|arr| !arr.is_empty())
857 .unwrap_or(false),
858 _ => false,
859 }
860}
861
/// Reports whether `dir/Cargo.toml` declares a Cargo workspace.
///
/// A manifest qualifies when it contains a `[workspace]` table header or any
/// `[workspace.*]` sub-table header (for example `[workspace.dependencies]`,
/// which implicitly defines the `workspace` table). Only lines that start
/// with a table header are considered, so the text `[workspace]` inside a
/// comment or a string value does not count. A missing or unreadable file
/// counts as "no workspace".
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // A table header is `[name]`, optionally indented and followed by a comment.
        let Some(rest) = line.trim_start().strip_prefix('[') else {
            return false;
        };
        let Some(end) = rest.find(']') else {
            return false;
        };
        // For `[[array.of.tables]]` the name starts with '[' and never matches.
        let table = rest[..end].trim();
        table.split('.').next().map(str::trim) == Some("workspace")
    })
}
870
871fn deno_json_has_workspace(dir: &Path) -> bool {
872 let path = dir.join("deno.json");
873 let content = std::fs::read_to_string(&path);
874 let Ok(json) = content.and_then(|s| {
875 serde_json::from_str::<serde_json::Value>(&s)
876 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
877 }) else {
878 return false;
879 };
880
881 match json.get("workspace") {
883 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
884 Some(serde_json::Value::Object(_)) => true,
885 _ => false,
886 }
887}
888
889pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
892 let exit_code = result
893 .exit_code
894 .map(|c| c.to_string())
895 .unwrap_or_else(|| "unknown".to_string());
896
897 let mut sections = Vec::new();
898 sections.push(format!(
899 "Task '{}' failed with exit code {}.",
900 result.name, exit_code
901 ));
902
903 let output = format_failure_streams(result, max_output_lines);
904 if output.is_empty() {
905 sections.push(
906 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
907 .to_string(),
908 );
909 } else {
910 sections.push(output);
911 }
912
913 sections.join("\n\n")
914}
915
916fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
917 let mut streams = Vec::new();
918
919 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
920 streams.push(stdout);
921 }
922
923 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
924 streams.push(stderr);
925 }
926
927 streams.join("\n\n")
928}
929
/// Renders the last `max_output_lines` lines of one output stream under a header.
///
/// Trailing whitespace is ignored; a stream that is empty after trimming
/// yields `None`. The header is `"{label}:"` when the whole stream fits, and
/// `"{label} (last N of TOTAL lines):"` when it was truncated.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    // Number of leading lines dropped to keep only the tail.
    let skipped = total.saturating_sub(max_output_lines);
    let tail: Vec<&str> = trimmed.lines().skip(skipped).collect();

    let header = if skipped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{}", tail.join("\n")))
}
949
/// Runs `command` with `args` in the given environment and returns its exit code.
///
/// Forwards to `execute_command_with_redaction` with an empty secret list.
pub async fn execute_command(
    command: &str,
    args: &[String],
    environment: &Environment,
) -> Result<i32> {
    execute_command_with_redaction(command, args, environment, &[]).await
}
960
961pub async fn execute_command_with_redaction(
965 command: &str,
966 args: &[String],
967 environment: &Environment,
968 secrets: &[String],
969) -> Result<i32> {
970 use tokio::io::{AsyncBufReadExt, BufReader};
971
972 tracing::info!("Executing command: {} {:?}", command, args);
973 let mut cmd = Command::new(command);
974 cmd.args(args);
975 let env_vars = environment.merge_with_system_hermetic();
977 for (key, value) in env_vars {
978 cmd.env(key, value);
979 }
980
981 if secrets.is_empty() {
982 cmd.stdout(Stdio::inherit());
984 cmd.stderr(Stdio::inherit());
985 cmd.stdin(Stdio::inherit());
986 let status = cmd.status().await.map_err(|e| {
987 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
988 })?;
989 return Ok(status.code().unwrap_or(1));
990 }
991
992 cmd.stdout(Stdio::piped());
994 cmd.stderr(Stdio::piped());
995 cmd.stdin(Stdio::inherit());
996
997 let mut child = cmd.spawn().map_err(|e| {
998 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
999 })?;
1000
1001 let stdout = child
1002 .stdout
1003 .take()
1004 .ok_or_else(|| Error::execution("stdout pipe not available"))?;
1005 let stderr = child
1006 .stderr
1007 .take()
1008 .ok_or_else(|| Error::execution("stderr pipe not available"))?;
1009
1010 let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
1012 sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
1013 let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
1014
1015 let secrets_clone = sorted_secrets.clone();
1017 let stdout_task = tokio::spawn(async move {
1018 let reader = BufReader::new(stdout);
1019 let mut lines = reader.lines();
1020 while let Ok(Some(line)) = lines.next_line().await {
1021 let mut redacted = line;
1022 for secret in &secrets_clone {
1023 if secret.len() >= 4 {
1024 redacted = redacted.replace(secret, "[REDACTED]");
1025 }
1026 }
1027 cuenv_events::emit_stdout!(&redacted);
1028 }
1029 });
1030
1031 let stderr_task = tokio::spawn(async move {
1033 let reader = BufReader::new(stderr);
1034 let mut lines = reader.lines();
1035 while let Ok(Some(line)) = lines.next_line().await {
1036 let mut redacted = line;
1037 for secret in &sorted_secrets {
1038 if secret.len() >= 4 {
1039 redacted = redacted.replace(secret, "[REDACTED]");
1040 }
1041 }
1042 cuenv_events::emit_stderr!(&redacted);
1043 }
1044 });
1045
1046 let status = child.wait().await.map_err(|e| {
1048 Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
1049 })?;
1050
1051 let _ = stdout_task.await;
1052 let _ = stderr_task.await;
1053
1054 Ok(status.code().unwrap_or(1))
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059 use super::*;
1060 use crate::tasks::TaskDependency;
1061 use crate::tasks::cache::TaskCacheConfig;
1062 use cuenv_cas::{LocalActionCache, LocalCas};
1063 use cuenv_events::{CuenvEventLayer, EventCategory, TaskEvent};
1064 use cuenv_vcs::WalkHasher;
1065 use tempfile::TempDir;
1066 use tokio::sync::mpsc;
1067 use tracing_subscriber::layer::SubscriberExt;
1068
    /// The default configuration captures output, has no parallelism limit
    /// and starts with an empty environment.
    #[tokio::test]
    async fn test_executor_config_default() {
        let config = ExecutorConfig::default();
        assert!(config.capture_output.should_capture());
        assert_eq!(config.max_parallel, 0);
        assert!(config.environment.is_empty());
    }
1076
    /// A `TaskResult` exposes the field values it was constructed with.
    #[tokio::test]
    async fn test_task_result() {
        let result = TaskResult {
            name: "test".to_string(),
            exit_code: Some(0),
            stdout: "output".to_string(),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(result.name, "test");
        assert_eq!(result.exit_code, Some(0));
        assert!(result.success);
        assert_eq!(result.stdout, "output");
    }
1091
    /// Running `echo hello` with capture enabled succeeds with exit code 0
    /// and the output appears in `stdout`.
    #[tokio::test]
    async fn test_execute_simple_task() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            description: Some("Hello task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello"));
    }
1110
    /// A variable set on the executor environment is visible to the spawned
    /// process (checked with `printenv TEST_VAR`).
    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["TEST_VAR".to_string()],
            description: Some("Print env task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("test_value"));
    }
1131
    /// A command that exits non-zero (`false`) is returned as an `Ok` result
    /// with `success == false` and exit code 1, not as an error.
    #[tokio::test]
    async fn test_execute_failing_task() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "false".to_string(),
            description: Some("Failing task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
    }
1148
    /// A task defined by `script` (rather than `command`) runs and its output
    /// is captured.
    #[tokio::test]
    async fn test_execute_script_task() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            script: Some("echo hello from script".to_string()),
            ..Default::default()
        };

        let result = executor.execute_task("script", &task).await.unwrap();

        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello from script"));
    }
1167
    /// With bash and default `ShellOptions`, a script stops at its first
    /// failing command: `false` fails the task with exit code 1 and the
    /// following `echo` never runs.
    // NOTE(review): presumably the default options enable errexit — confirm
    // against `ShellOptions::default()`.
    #[tokio::test]
    async fn test_execute_script_task_with_shell_options() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            script: Some("false\necho should-not-run".to_string()),
            script_shell: Some(super::super::ScriptShell::Bash),
            shell_options: Some(super::super::ShellOptions::default()),
            ..Default::default()
        };

        let result = executor
            .execute_task("script.failfast", &task)
            .await
            .unwrap();

        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
        assert!(!result.stdout.contains("should-not-run"));
    }
1191
    /// Default `ShellOptions` combined with the `sh` script shell is rejected
    /// with an error mentioning `shellOptions.pipefail`.
    #[tokio::test]
    async fn test_execute_script_task_rejects_pipefail_for_sh() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            script: Some("echo hello".to_string()),
            script_shell: Some(super::super::ScriptShell::Sh),
            shell_options: Some(super::super::ShellOptions::default()),
            ..Default::default()
        };

        let err = executor.execute_task("script.sh", &task).await.unwrap_err();

        assert!(
            err.to_string()
                .contains("shellOptions.pipefail with unsupported script shell 'sh'"),
            "unexpected error: {err}"
        );
    }
1214
    /// Shell options combined with the `node` script shell are rejected with
    /// an "unsupported script shell" error.
    #[tokio::test]
    async fn test_execute_script_task_rejects_unsupported_shell_options() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            script: Some("console.log('hello')".to_string()),
            script_shell: Some(super::super::ScriptShell::Node),
            shell_options: Some(super::super::ShellOptions::default()),
            ..Default::default()
        };

        let err = executor
            .execute_task("script.node", &task)
            .await
            .unwrap_err();

        assert!(
            err.to_string().contains("unsupported script shell 'node'"),
            "unexpected error: {err}"
        );
    }
1239
    /// A two-step sequence runs both tasks and returns their results in
    /// declaration order.
    #[tokio::test]
    async fn test_execute_sequential_group() {
        let config = ExecutorConfig {
            capture_output: OutputCapture::Capture,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task1 = Task {
            command: "echo".to_string(),
            args: vec!["first".to_string()],
            description: Some("First task".to_string()),
            ..Default::default()
        };
        let task2 = Task {
            command: "echo".to_string(),
            args: vec!["second".to_string()],
            description: Some("Second task".to_string()),
            ..Default::default()
        };
        let sequence = vec![
            TaskNode::Task(Box::new(task1)),
            TaskNode::Task(Box::new(task2)),
        ];
        let all_tasks = Tasks::new();
        let node = TaskNode::Sequence(sequence);
        let results = executor
            .execute_node("seq", &node, &all_tasks)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].stdout.contains("first"));
        assert!(results[1].stdout.contains("second"));
    }
1273
1274 #[tokio::test]
1275 async fn test_command_injection_prevention() {
1276 let config = ExecutorConfig {
1277 capture_output: OutputCapture::Capture,
1278 ..Default::default()
1279 };
1280 let executor = TaskExecutor::new(config);
1281 let malicious_task = Task {
1282 command: "echo".to_string(),
1283 args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1284 description: Some("Malicious task test".to_string()),
1285 ..Default::default()
1286 };
1287 let result = executor
1288 .execute_task("malicious", &malicious_task)
1289 .await
1290 .unwrap();
1291 assert!(result.success);
1292 assert!(result.stdout.contains("hello ; rm -rf /"));
1293 }
1294
1295 #[tokio::test]
1296 async fn test_special_characters_in_args() {
1297 let config = ExecutorConfig {
1298 capture_output: OutputCapture::Capture,
1299 ..Default::default()
1300 };
1301 let executor = TaskExecutor::new(config);
1302 let special_chars = vec![
1303 "$USER",
1304 "$(whoami)",
1305 "`whoami`",
1306 "&& echo hacked",
1307 "|| echo failed",
1308 "> /tmp/hack",
1309 "| cat",
1310 ];
1311 for special_arg in special_chars {
1312 let task = Task {
1313 command: "echo".to_string(),
1314 args: vec!["safe".to_string(), special_arg.to_string()],
1315 description: Some("Special character test".to_string()),
1316 ..Default::default()
1317 };
1318 let result = executor.execute_task("special", &task).await.unwrap();
1319 assert!(result.success);
1320 assert!(result.stdout.contains("safe"));
1321 assert!(result.stdout.contains(special_arg));
1322 }
1323 }
1324
1325 #[tokio::test]
1326 async fn test_environment_variable_safety() {
1327 let mut config = ExecutorConfig {
1328 capture_output: OutputCapture::Capture,
1329 ..Default::default()
1330 };
1331 config
1332 .environment
1333 .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1334 let executor = TaskExecutor::new(config);
1335 let task = Task {
1336 command: "printenv".to_string(),
1337 args: vec!["DANGEROUS_VAR".to_string()],
1338 description: Some("Environment variable safety test".to_string()),
1339 ..Default::default()
1340 };
1341 let result = executor.execute_task("env_test", &task).await.unwrap();
1342 assert!(result.success);
1343 assert!(result.stdout.contains("; rm -rf /"));
1344 }
1345
1346 #[tokio::test]
1347 async fn test_execute_graph_parallel_groups() {
1348 let config = ExecutorConfig {
1350 capture_output: OutputCapture::Capture,
1351 max_parallel: 2,
1352 ..Default::default()
1353 };
1354 let executor = TaskExecutor::new(config);
1355 let mut graph = TaskGraph::new();
1356
1357 let t1 = Task {
1358 command: "echo".into(),
1359 args: vec!["A".into()],
1360 ..Default::default()
1361 };
1362 let t2 = Task {
1363 command: "echo".into(),
1364 args: vec!["B".into()],
1365 ..Default::default()
1366 };
1367
1368 graph.add_task("t1", t1).unwrap();
1369 graph.add_task("t2", t2).unwrap();
1370 let results = executor.execute_graph(&graph).await.unwrap();
1371 assert_eq!(results.len(), 2);
1372 let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1373 assert!(joined.contains("A") && joined.contains("B"));
1374 }
1375
1376 #[tokio::test]
1377 async fn test_execute_graph_respects_dependency_levels() {
1378 let tmp = TempDir::new().unwrap();
1379 let root = tmp.path();
1380
1381 let config = ExecutorConfig {
1382 capture_output: OutputCapture::Capture,
1383 max_parallel: 2,
1384 project_root: root.to_path_buf(),
1385 ..Default::default()
1386 };
1387 let executor = TaskExecutor::new(config);
1388
1389 let mut tasks = Tasks::new();
1390 tasks.tasks.insert(
1391 "dep".into(),
1392 TaskNode::Task(Box::new(Task {
1393 command: "sh".into(),
1394 args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
1395 ..Default::default()
1396 })),
1397 );
1398 tasks.tasks.insert(
1399 "consumer".into(),
1400 TaskNode::Task(Box::new(Task {
1401 command: "sh".into(),
1402 args: vec!["-c".into(), "cat marker.txt".into()],
1403 depends_on: vec![TaskDependency::from_name("dep")],
1404 ..Default::default()
1405 })),
1406 );
1407
1408 let mut graph = TaskGraph::new();
1409 graph.build_for_task("consumer", &tasks).unwrap();
1410
1411 let results = executor.execute_graph(&graph).await.unwrap();
1412 assert_eq!(results.len(), 2);
1413
1414 let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
1415 assert!(consumer.success);
1416 assert!(consumer.stdout.contains("ok"));
1417 }
1418
1419 #[tokio::test]
1420 async fn test_cache_hit_replays_task_output_events() {
1421 let workspace = TempDir::new().unwrap();
1422 let cache_root = TempDir::new().unwrap();
1423 std::fs::write(workspace.path().join("input.txt"), "v1").unwrap();
1424
1425 let cache = TaskCacheConfig {
1426 cas: Arc::new(LocalCas::open(cache_root.path()).unwrap()),
1427 action_cache: Arc::new(LocalActionCache::open(cache_root.path()).unwrap()),
1428 vcs_hasher: Arc::new(WalkHasher::new(workspace.path())),
1429 vcs_hasher_root: workspace.path().to_path_buf(),
1430 cuenv_version: "test".to_string(),
1431 runtime_identity_properties: std::collections::BTreeMap::new(),
1432 cache_disabled_reason: None,
1433 };
1434 let executor = TaskExecutor::new(ExecutorConfig {
1435 capture_output: OutputCapture::Capture,
1436 project_root: workspace.path().to_path_buf(),
1437 cache: Some(cache),
1438 ..Default::default()
1439 });
1440 let task = Task {
1441 command: "sh".to_string(),
1442 args: vec![
1443 "-c".to_string(),
1444 "printf 'hello\\n' && cat input.txt > out.txt".to_string(),
1445 ],
1446 inputs: vec![super::super::Input::Path("input.txt".to_string())],
1447 outputs: vec!["out.txt".to_string()],
1448 cache: Some(super::super::TaskCachePolicy {
1449 mode: super::super::TaskCacheMode::ReadWrite,
1450 max_age: None,
1451 }),
1452 ..Task::default()
1453 };
1454
1455 executor.execute_task("cached", &task).await.unwrap();
1456 std::fs::remove_file(workspace.path().join("out.txt")).unwrap();
1457
1458 let (tx, mut rx) = mpsc::unbounded_channel();
1459 let layer = CuenvEventLayer::new(tx);
1460 let subscriber = tracing_subscriber::registry().with(layer);
1461 let _guard = tracing::subscriber::set_default(subscriber);
1462
1463 let result = executor.execute_task("cached", &task).await.unwrap();
1464 assert!(result.success);
1465
1466 let mut saw_output = false;
1467 while let Ok(event) = rx.try_recv() {
1468 if let EventCategory::Task(TaskEvent::Output { name, content, .. }) = event.category
1469 && name == "cached"
1470 && content == "hello"
1471 {
1472 saw_output = true;
1473 break;
1474 }
1475 }
1476
1477 assert!(
1478 saw_output,
1479 "expected cached task output event to be replayed"
1480 );
1481 }
1482
/// The failure summary for a result with exit code 127 and stderr text must
/// mention the task name, the exit code, and the stderr content.
#[test]
fn test_summarize_task_failure_with_exit_code() {
    let failed = TaskResult {
        success: false,
        exit_code: Some(127),
        name: String::from("build"),
        stderr: String::from("command not found"),
        stdout: String::new(),
    };

    let report = summarize_task_failure(&failed, 10);

    assert!(report.contains("build"));
    assert!(report.contains("127"));
    assert!(report.contains("command not found"));
}
1497
/// When the result carries no exit code, the failure summary must still name
/// the task and must contain the word "unknown".
#[test]
fn test_summarize_task_failure_no_exit_code() {
    let failed = TaskResult {
        success: false,
        exit_code: None,
        name: String::from("killed"),
        stderr: String::new(),
        stdout: String::new(),
    };

    let report = summarize_task_failure(&failed, 10);

    assert!(report.contains("killed"));
    assert!(report.contains("unknown"));
}
1511
/// With both streams empty, the failure summary must say that there was no
/// stdout/stderr and must mention `RUST_LOG=debug`.
#[test]
fn test_summarize_task_failure_no_output() {
    let failed = TaskResult {
        success: false,
        exit_code: Some(1),
        name: String::from("silent"),
        stderr: String::new(),
        stdout: String::new(),
    };

    let report = summarize_task_failure(&failed, 10);

    assert!(report.contains("No stdout/stderr"));
    assert!(report.contains("RUST_LOG=debug"));
}
1525
/// Feeds 50 lines of stdout with a 10-line limit and checks that the summary
/// reports "last 10 of 50 lines", keeps the final line, and does not contain
/// the first line.
#[test]
fn test_summarize_task_failure_truncates_long_output() {
    let fifty_lines: Vec<String> = (1..=50).map(|n| format!("line {n}")).collect();
    let failed = TaskResult {
        success: false,
        exit_code: Some(1),
        name: String::from("verbose"),
        stderr: String::new(),
        stdout: fifty_lines.join("\n"),
    };

    let report = summarize_task_failure(&failed, 10);

    assert!(report.contains("last 10 of 50 lines"));
    assert!(report.contains("line 50"));
    // The trailing newline keeps this from matching lines such as "line 10".
    assert!(!report.contains("line 1\n"));
}
1543
/// `summarize_stream` must return `None` for an empty string, for spaces
/// only, and for newlines only.
#[test]
fn test_summarize_stream_empty() {
    let from_empty = summarize_stream("test", "", 10);
    assert!(from_empty.is_none());

    let from_spaces = summarize_stream("test", " ", 10);
    assert!(from_spaces.is_none());

    let from_newlines = summarize_stream("test", "\n\n", 10);
    assert!(from_newlines.is_none());
}
1550
/// For input shorter than the line limit, the summary must carry the
/// "stdout:" label and both lines, and must not contain the word "last".
#[test]
fn test_summarize_stream_short() {
    let two_lines = "line 1\nline 2";
    let rendered = summarize_stream("stdout", two_lines, 10).unwrap();

    assert!(rendered.contains("stdout:"));
    assert!(rendered.contains("line 1"));
    assert!(rendered.contains("line 2"));
    assert!(!rendered.contains("last"));
}
1559
/// When both stdout and stderr have content, the formatted output must
/// contain both stream labels and both stream bodies.
#[test]
fn test_format_failure_streams_both() {
    let failed = TaskResult {
        success: false,
        exit_code: Some(1),
        name: String::from("test"),
        stderr: String::from("stderr content"),
        stdout: String::from("stdout content"),
    };

    let rendered = format_failure_streams(&failed, 10);

    assert!(rendered.contains("stdout:"));
    assert!(rendered.contains("stderr:"));
    assert!(rendered.contains("stdout content"));
    assert!(rendered.contains("stderr content"));
}
1575
/// Starting from `packages/subpkg`, `find_workspace_root` for npm must return
/// the canonicalized temp directory that holds a `package.json` with a
/// `workspaces` array.
#[test]
fn test_find_workspace_root_with_npm() {
    let repo = TempDir::new().unwrap();
    let manifest = repo.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let member_dir = repo.path().join("packages").join("subpkg");
    std::fs::create_dir_all(&member_dir).unwrap();

    let found = find_workspace_root(PackageManager::Npm, &member_dir);
    let expected = repo.path().canonicalize().unwrap();
    assert_eq!(found, expected);
}
1591
/// Starting from `packages/app`, `find_workspace_root` for pnpm must return
/// the canonicalized temp directory that holds `pnpm-workspace.yaml`.
#[test]
fn test_find_workspace_root_with_pnpm() {
    let repo = TempDir::new().unwrap();
    let manifest = repo.path().join("pnpm-workspace.yaml");
    std::fs::write(&manifest, "packages:\n - 'packages/*'").unwrap();

    let member_dir = repo.path().join("packages").join("app");
    std::fs::create_dir_all(&member_dir).unwrap();

    let found = find_workspace_root(PackageManager::Pnpm, &member_dir);
    let expected = repo.path().canonicalize().unwrap();
    assert_eq!(found, expected);
}
1606
/// Starting from `crates/core`, `find_workspace_root` for Cargo must return
/// the canonicalized temp directory whose `Cargo.toml` has a `[workspace]`
/// table.
#[test]
fn test_find_workspace_root_with_cargo() {
    let repo = TempDir::new().unwrap();
    let manifest = repo.path().join("Cargo.toml");
    std::fs::write(&manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let member_dir = repo.path().join("crates").join("core");
    std::fs::create_dir_all(&member_dir).unwrap();

    let found = find_workspace_root(PackageManager::Cargo, &member_dir);
    let expected = repo.path().canonicalize().unwrap();
    assert_eq!(found, expected);
}
1621
/// In an empty temp directory with no workspace files, `find_workspace_root`
/// must return the directory it was given, compared here without
/// canonicalization.
#[test]
fn test_find_workspace_root_no_workspace() {
    let bare_dir = TempDir::new().unwrap();
    let start = bare_dir.path();

    let found = find_workspace_root(PackageManager::Npm, start);

    assert_eq!(found, start.to_path_buf());
}
1629
/// A `package.json` whose `workspaces` key is a non-empty array must be
/// detected as declaring workspaces.
#[test]
fn test_package_json_has_workspaces_array() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(detected);
}
1640
/// A `package.json` whose `workspaces` key is an object with a `packages`
/// list must be detected as declaring workspaces.
#[test]
fn test_package_json_has_workspaces_object() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": {"packages": ["packages/*"]}}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(detected);
}
1651
/// A `package.json` without a `workspaces` key must not be detected as
/// declaring workspaces.
#[test]
fn test_package_json_no_workspaces() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(&manifest, r#"{"name": "test"}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(!detected);
}
1658
/// A `package.json` whose `workspaces` array is empty must not be detected as
/// declaring workspaces.
#[test]
fn test_package_json_empty_workspaces() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": []}"#).unwrap();

    let detected = package_json_has_workspaces(project.path());
    assert!(!detected);
}
1665
/// A directory that contains no `package.json` must not be detected as
/// declaring workspaces.
#[test]
fn test_package_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = package_json_has_workspaces(empty_dir.path());
    assert!(!detected);
}
1671
/// A `Cargo.toml` that contains a `[workspace]` table must be detected as a
/// workspace manifest.
#[test]
fn test_cargo_toml_has_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("Cargo.toml");
    std::fs::write(&manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let detected = cargo_toml_has_workspace(project.path());
    assert!(detected);
}
1682
/// A `Cargo.toml` that contains only a `[package]` table must not be detected
/// as a workspace manifest.
#[test]
fn test_cargo_toml_no_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("Cargo.toml");
    std::fs::write(&manifest, "[package]\nname = \"test\"").unwrap();

    let detected = cargo_toml_has_workspace(project.path());
    assert!(!detected);
}
1689
/// A directory that contains no `Cargo.toml` must not be detected as a
/// workspace.
#[test]
fn test_cargo_toml_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = cargo_toml_has_workspace(empty_dir.path());
    assert!(!detected);
}
1695
/// A `deno.json` whose `workspace` key is a non-empty array must be detected
/// as declaring a workspace.
#[test]
fn test_deno_json_has_workspace_array() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(&manifest, r#"{"workspace": ["./packages/*"]}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(detected);
}
1706
/// A `deno.json` whose `workspace` key is an object with a `members` list
/// must be detected as declaring a workspace.
#[test]
fn test_deno_json_has_workspace_object() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(&manifest, r#"{"workspace": {"members": ["./packages/*"]}}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(detected);
}
1717
/// A `deno.json` without a `workspace` key must not be detected as declaring
/// a workspace.
#[test]
fn test_deno_json_no_workspace() {
    let project = TempDir::new().unwrap();
    let manifest = project.path().join("deno.json");
    std::fs::write(&manifest, r#"{"name": "test"}"#).unwrap();

    let detected = deno_json_has_workspace(project.path());
    assert!(!detected);
}
1724
/// A directory that contains no `deno.json` must not be detected as declaring
/// a workspace.
#[test]
fn test_deno_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = deno_json_has_workspace(empty_dir.path());
    assert!(!detected);
}
1730
/// Builds an `ExecutorConfig` with most fields set explicitly and checks that
/// the capture flag, parallelism, working directory, and cache-path flag read
/// back as configured.
#[test]
fn test_executor_config_with_fields() {
    let expected_working_dir = Some(PathBuf::from("/tmp"));
    let cfg = ExecutorConfig {
        capture_output: OutputCapture::Capture,
        max_parallel: 4,
        working_dir: Some(PathBuf::from("/tmp")),
        project_root: PathBuf::from("/project"),
        cue_module_root: Some(PathBuf::from("/project/cue.mod")),
        materialize_outputs: Some(PathBuf::from("/outputs")),
        cache_dir: Some(PathBuf::from("/cache")),
        show_cache_path: true,
        cli_backend: Some(String::from("host")),
        // Remaining fields keep their default values.
        ..Default::default()
    };

    assert!(cfg.capture_output.should_capture());
    assert_eq!(cfg.max_parallel, 4);
    assert_eq!(cfg.working_dir, expected_working_dir);
    assert!(cfg.show_cache_path);
}
1750
/// Cloning a `TaskResult` must produce a value whose fields all equal the
/// source's fields.
#[test]
fn test_task_result_clone() {
    let source = TaskResult {
        success: true,
        exit_code: Some(0),
        name: String::from("test"),
        stderr: String::from("error"),
        stdout: String::from("output"),
    };

    let duplicate = source.clone();

    assert_eq!(duplicate.name, source.name);
    assert_eq!(duplicate.exit_code, source.exit_code);
    assert_eq!(duplicate.stdout, source.stdout);
    assert_eq!(duplicate.stderr, source.stderr);
    assert_eq!(duplicate.success, source.success);
}
1767
/// Pins the value of `TASK_FAILURE_SNIPPET_LINES` so that a change to the
/// constant has to be made deliberately.
#[test]
fn test_task_failure_snippet_lines_constant() {
    let expected_lines: usize = 20;
    assert_eq!(TASK_FAILURE_SNIPPET_LINES, expected_lines);
}
1772}