1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19 BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20 NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21 PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22 YarnModernLockfileParser, detect_from_command, detect_package_managers,
23 materializer::{
24 Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25 },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
34#[derive(Debug, Clone)]
36pub struct TaskResult {
37 pub name: String,
38 pub exit_code: Option<i32>,
39 pub stdout: String,
40 pub stderr: String,
41 pub success: bool,
42}
43
44pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50 pub capture_output: bool,
52 pub max_parallel: usize,
54 pub environment: Environment,
56 pub working_dir: Option<PathBuf>,
58 pub project_root: PathBuf,
60 pub cue_module_root: Option<PathBuf>,
62 pub materialize_outputs: Option<PathBuf>,
64 pub cache_dir: Option<PathBuf>,
66 pub show_cache_path: bool,
68 pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
70 pub backend_config: Option<BackendConfig>,
72 pub cli_backend: Option<String>,
74}
75
76impl Default for ExecutorConfig {
77 fn default() -> Self {
78 Self {
79 capture_output: false,
80 max_parallel: 0,
81 environment: Environment::new(),
82 working_dir: None,
83 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
84 cue_module_root: None,
85 materialize_outputs: None,
86 cache_dir: None,
87 show_cache_path: false,
88 workspaces: None,
89 backend_config: None,
90 cli_backend: None,
91 }
92 }
93}
94
95pub struct TaskExecutor {
97 config: ExecutorConfig,
98 backend: Arc<dyn TaskBackend>,
99}
100impl TaskExecutor {
101 pub fn new(config: ExecutorConfig) -> Self {
103 Self::with_dagger_factory(config, None)
104 }
105
106 pub fn with_dagger_factory(
110 config: ExecutorConfig,
111 dagger_factory: Option<BackendFactory>,
112 ) -> Self {
113 let backend = create_backend_with_factory(
114 config.backend_config.as_ref(),
115 config.project_root.clone(),
116 config.cli_backend.as_deref(),
117 dagger_factory,
118 );
119 Self { config, backend }
120 }
121
122 fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
124 Self { config, backend }
125 }
126
127 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
129 if self.backend.name() == "dagger" {
138 return self
139 .backend
140 .execute(
141 name,
142 task,
143 &self.config.environment,
144 &self.config.project_root,
145 self.config.capture_output,
146 )
147 .await;
148 }
149
150 return self.execute_task_non_hermetic(name, task).await;
152
153 #[allow(unreachable_code)]
155 {
156 let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
158 let mut workspace_input_patterns = Vec::new();
159 let mut workspace_lockfile_hashes = BTreeMap::new();
160
161 for workspace_name in &task.workspaces {
162 if let Some(global_workspaces) = &self.config.workspaces {
163 if let Some(ws_config) = global_workspaces.get(workspace_name) {
164 if ws_config.enabled {
165 let (ws, entries, paths, hash) = self
166 .resolve_workspace(name, task, workspace_name, ws_config)
167 .await?;
168 workspace_ctxs.push((ws, entries));
169 workspace_input_patterns.extend(paths);
170 if let Some(h) = hash {
171 workspace_lockfile_hashes.insert(workspace_name.clone(), h);
172 }
173 }
174 } else {
175 tracing::warn!(
176 task = %name,
177 workspace = %workspace_name,
178 "Workspace not found in global configuration"
179 );
180 }
181 }
182 }
183
184 let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
189
190 let project_prefix = primary_workspace_root
191 .as_ref()
192 .and_then(|root| self.config.project_root.strip_prefix(root).ok())
193 .map(|p| p.to_path_buf());
194 let input_root = primary_workspace_root
195 .clone()
196 .unwrap_or_else(|| self.config.project_root.clone());
197
198 let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
199 let resolved_inputs = {
200 let _g = span_inputs.enter();
201 let resolver = InputResolver::new(&input_root);
202 let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
203 all_inputs.extend(workspace_input_patterns.iter().cloned());
204 resolver.resolve(&all_inputs)?
205 };
206 if task_trace_enabled() {
207 tracing::info!(
208 task = %name,
209 input_root = %input_root.display(),
210 project_root = %self.config.project_root.display(),
211 inputs_count = resolved_inputs.files.len(),
212 workspace_inputs = workspace_input_patterns.len(),
213 "Resolved task inputs"
214 );
215 }
216
217 let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
219 let env_summary: BTreeMap<String, String> = self
221 .config
222 .environment
223 .vars
224 .iter()
225 .map(|(k, v)| (k.clone(), v.clone()))
226 .collect();
227 let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
228 let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
229 let shell_json = serde_json::to_value(&task.shell).ok();
230 let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
231 None
232 } else {
233 Some(workspace_lockfile_hashes)
234 };
235
236 let envelope = task_cache::CacheKeyEnvelope {
237 inputs: inputs_summary.clone(),
238 command: task.command.clone(),
239 args: task.args.clone(),
240 shell: shell_json,
241 env: env_summary.clone(),
242 cuenv_version: cuenv_version.clone(),
243 platform: platform.clone(),
244 workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
245 workspace_package_hashes: None,
247 };
248 let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
249
250 let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
252 let cache_hit = {
253 let _g = span_cache.enter();
254 task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
255 };
256
257 if let Some(hit) = cache_hit {
258 tracing::info!(
259 task = %name,
260 key = %cache_key,
261 path = %hit.path.display(),
262 "Task {} cache hit: {}. Skipping execution.",
263 name,
264 cache_key
265 );
266 if self.config.show_cache_path {
267 tracing::info!(cache_path = %hit.path.display(), "Cache path");
268 }
269 if let Some(dest) = &self.config.materialize_outputs {
270 let count = task_cache::materialize_outputs(
271 &cache_key,
272 dest,
273 self.config.cache_dir.as_deref(),
274 )?;
275 tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
276 }
277 let stdout_path = hit.path.join("logs").join("stdout.log");
282 let stderr_path = hit.path.join("logs").join("stderr.log");
283 let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
284 let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
285 if !(stdout.is_empty() && stderr.is_empty()) {
289 if !self.config.capture_output {
290 cuenv_events::emit_task_cache_hit!(name, cache_key);
291 if !stdout.is_empty() {
293 cuenv_events::emit_task_output!(name, "stdout", stdout);
294 }
295 if !stderr.is_empty() {
296 cuenv_events::emit_task_output!(name, "stderr", stderr);
297 }
298 }
299 return Ok(TaskResult {
300 name: name.to_string(),
301 exit_code: Some(0),
302 stdout,
303 stderr,
304 success: true,
305 });
306 } else {
307 tracing::info!(
308 task = %name,
309 key = %cache_key,
310 "Cache entry lacks logs; executing to backfill logs"
311 );
312 }
313 }
314
315 tracing::info!(
316 task = %name,
317 key = %cache_key,
318 "Task {} executing hermetically… key {}",
319 name,
320 cache_key
321 );
322
323 let hermetic_root = create_hermetic_dir(name, &cache_key)?;
324 if self.config.show_cache_path {
325 tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
326 }
327
328 let span_populate =
330 tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
331 {
332 let _g = span_populate.enter();
333 populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
334 }
335
336 for (ws, entries) in workspace_ctxs {
338 self.materialize_workspace(&ws, &entries, &hermetic_root)
339 .await?;
340 }
341
342 if let Some(ref inputs_from) = task.inputs_from {
344 for task_output in inputs_from {
345 let source_task = &task_output.task;
346 let source_cache_key = task_cache::lookup_latest(
347 &self.config.project_root,
348 source_task,
349 self.config.cache_dir.as_deref(),
350 )
351 .ok_or_else(|| {
352 Error::configuration(format!(
353 "Task '{}' depends on outputs from '{}' but no cached result found. \
354 Ensure '{}' runs before this task (add it to dependsOn).",
355 name, source_task, source_task
356 ))
357 })?;
358
359 let materialized = task_cache::materialize_outputs(
361 &source_cache_key,
362 &hermetic_root,
363 self.config.cache_dir.as_deref(),
364 )?;
365 tracing::info!(
366 task = %name,
367 source_task = %source_task,
368 materialized = materialized,
369 "Materialized outputs from dependent task"
370 );
371 }
372 }
373
374 let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
376
377 let mut cmd = if let Some(script) = &task.script {
379 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
381 (
382 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
383 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
384 )
385 } else {
386 ("bash".to_string(), "-c".to_string())
388 };
389
390 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
391 let mut cmd = Command::new(&resolved_shell);
392 cmd.arg(&shell_flag);
393 cmd.arg(script);
394 cmd
395 } else {
396 let resolved_command = self.config.environment.resolve_command(&task.command);
402
403 if let Some(shell) = &task.shell {
404 if shell.command.is_some() && shell.flag.is_some() {
405 let shell_command = shell.command.as_ref().unwrap();
406 let shell_flag = shell.flag.as_ref().unwrap();
407 let resolved_shell = self.config.environment.resolve_command(shell_command);
409 let mut cmd = Command::new(&resolved_shell);
410 cmd.arg(shell_flag);
411 if task.args.is_empty() {
412 cmd.arg(&resolved_command);
413 } else {
414 let full_command = if task.command.is_empty() {
415 task.args.join(" ")
416 } else {
417 format!("{} {}", resolved_command, task.args.join(" "))
418 };
419 cmd.arg(full_command);
420 }
421 cmd
422 } else {
423 let mut cmd = Command::new(&resolved_command);
424 for arg in &task.args {
425 cmd.arg(arg);
426 }
427 cmd
428 }
429 } else {
430 let mut cmd = Command::new(&resolved_command);
431 for arg in &task.args {
432 cmd.arg(arg);
433 }
434 cmd
435 }
436 };
437
438 let workdir = if let Some(dir) = &self.config.working_dir {
439 dir.clone()
440 } else if let Some(prefix) = project_prefix.as_ref() {
441 hermetic_root.join(prefix)
442 } else {
443 hermetic_root.clone()
444 };
445 std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
446 source: e,
447 path: Some(workdir.clone().into()),
448 operation: "create_dir_all".into(),
449 })?;
450 cmd.current_dir(&workdir);
451 let env_vars = self.config.environment.merge_with_system();
453 if task_trace_enabled() {
454 tracing::info!(
455 task = %name,
456 hermetic_root = %hermetic_root.display(),
457 workdir = %workdir.display(),
458 command = %task.command,
459 args = ?task.args,
460 env_count = env_vars.len(),
461 "Launching task command"
462 );
463 }
464 for (k, v) in env_vars {
465 cmd.env(k, v);
466 }
467
468 cmd.stdout(Stdio::piped());
471 cmd.stderr(Stdio::piped());
472
473 let stream_logs = !self.config.capture_output;
474 if stream_logs {
475 let cmd_str = if task.command.is_empty() {
476 task.args.join(" ")
477 } else {
478 format!("{} {}", task.command, task.args.join(" "))
479 };
480 cuenv_events::emit_task_started!(name, cmd_str, true);
481 }
482
483 let start = std::time::Instant::now();
484 let mut child = cmd.spawn().map_err(|e| {
485 Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
486 })?;
487
488 let stdout_handle = child.stdout.take();
489 let stderr_handle = child.stderr.take();
490
491 let stdout_task = async move {
492 if let Some(mut stdout) = stdout_handle {
493 let mut output = Vec::new();
494 let mut buf = [0u8; 4096];
495 use std::io::Write;
496 use tokio::io::AsyncReadExt;
497
498 loop {
499 match stdout.read(&mut buf).await {
500 Ok(0) => break, Ok(n) => {
502 let chunk = &buf[0..n];
503 if stream_logs {
504 let mut handle = std::io::stdout().lock();
505 let _ = handle.write_all(chunk);
506 let _ = handle.flush();
507 }
508 output.extend_from_slice(chunk);
509 }
510 Err(_) => break,
511 }
512 }
513 String::from_utf8_lossy(&output).to_string()
514 } else {
515 String::new()
516 }
517 };
518
519 let stderr_task = async move {
520 if let Some(mut stderr) = stderr_handle {
521 let mut output = Vec::new();
522 let mut buf = [0u8; 4096];
523 use std::io::Write;
524 use tokio::io::AsyncReadExt;
525
526 loop {
527 match stderr.read(&mut buf).await {
528 Ok(0) => break, Ok(n) => {
530 let chunk = &buf[0..n];
531 if stream_logs {
532 let mut handle = std::io::stderr().lock();
533 let _ = handle.write_all(chunk);
534 let _ = handle.flush();
535 }
536 output.extend_from_slice(chunk);
537 }
538 Err(_) => break,
539 }
540 }
541 String::from_utf8_lossy(&output).to_string()
542 } else {
543 String::new()
544 }
545 };
546
547 let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
548
549 let status = child.wait().await.map_err(|e| {
550 Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
551 })?;
552 let duration = start.elapsed();
553
554 let exit_code = status.code().unwrap_or(1);
555 let success = status.success();
556 if !success {
557 tracing::warn!(task = %name, exit = exit_code, "Task failed");
558 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
559 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
560 } else {
561 tracing::info!(task = %name, "Task completed successfully");
562 }
563
564 let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
566 task.outputs
567 .iter()
568 .map(|o| prefix.join(o).to_string_lossy().to_string())
569 .collect()
570 } else {
571 task.outputs.clone()
572 };
573 let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
574 let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
575 let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
576
577 let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
579 if outputs_stage.exists() {
580 let _ = std::fs::remove_dir_all(&outputs_stage);
581 }
582 std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
583 source: e,
584 path: Some(outputs_stage.clone().into()),
585 operation: "create_dir_all".into(),
586 })?;
587
588 for rel in &outputs {
589 let rel_for_project = project_prefix
590 .as_ref()
591 .and_then(|prefix| rel.strip_prefix(prefix).ok())
592 .unwrap_or(rel)
593 .to_path_buf();
594 let src = hermetic_root.join(rel);
595 #[allow(clippy::collapsible_if)]
598 if let Ok(meta) = std::fs::metadata(&src) {
599 if meta.is_file() {
600 let dst = outputs_stage.join(&rel_for_project);
601 if let Some(parent) = dst.parent() {
602 std::fs::create_dir_all(parent).map_err(|e| Error::Io {
603 source: e,
604 path: Some(parent.into()),
605 operation: "create_dir_all".into(),
606 })?;
607 }
608 std::fs::copy(&src, &dst).map_err(|e| Error::Io {
609 source: e,
610 path: Some(dst.into()),
611 operation: "copy".into(),
612 })?;
613 let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
614 output_index.push(task_cache::OutputIndexEntry {
615 rel_path: rel_for_project.to_string_lossy().to_string(),
616 size: meta.len(),
617 sha256: sha,
618 });
619 }
620 }
621 }
622
623 let mut warned = false;
625 for entry in walkdir::WalkDir::new(&hermetic_root)
626 .into_iter()
627 .filter_map(|e| e.ok())
628 {
629 let p = entry.path();
630 if p.is_dir() {
631 continue;
632 }
633 let rel = match p.strip_prefix(&hermetic_root) {
634 Ok(r) => r.to_path_buf(),
635 Err(_) => continue,
636 };
637 let rel_str = rel.to_string_lossy().to_string();
638 let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
639 let initial = initial_hashes.get(&rel_str);
640 let changed = match initial {
641 None => true,
642 Some(prev) => prev != &sha,
643 };
644 if changed && !outputs_set.contains(&rel) {
645 if !warned {
646 tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
647 warned = true;
648 }
649 tracing::debug!(path = %rel_str, "Undeclared write");
650 }
651 }
652
653 if success {
655 let meta = task_cache::TaskResultMeta {
656 task_name: name.to_string(),
657 command: task.command.clone(),
658 args: task.args.clone(),
659 env_summary,
660 inputs_summary: inputs_summary.clone(),
661 created_at: Utc::now(),
662 cuenv_version,
663 platform,
664 duration_ms: duration.as_millis(),
665 exit_code,
666 cache_key_envelope: envelope_json.clone(),
667 output_index,
668 };
669 let logs = task_cache::TaskLogs {
670 stdout: Some(stdout.clone()),
673 stderr: Some(stderr.clone()),
674 };
675 let cache_span = tracing::info_span!("cache.save", key = %cache_key);
676 {
677 let _g = cache_span.enter();
678 task_cache::save_result(
679 &cache_key,
680 &meta,
681 &outputs_stage,
682 &hermetic_root,
683 logs,
684 self.config.cache_dir.as_deref(),
685 )?;
686 }
687
688 task_cache::record_latest(
690 &self.config.project_root,
691 name,
692 &cache_key,
693 self.config.cache_dir.as_deref(),
694 )?;
695
696 } else {
698 }
700
701 Ok(TaskResult {
702 name: name.to_string(),
703 exit_code: Some(exit_code),
704 stdout,
705 stderr,
706 success,
707 })
708 }
709 }
710
711 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
715 if task.is_task_ref() && task.project_root.is_none() {
717 return Err(Error::configuration(format!(
718 "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
719 This usually means:\n\
720 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
721 - The referenced task '{}' doesn't exist in that project\n\
722 - There was an error loading the referenced project's env.cue\n\
723 Run with RUST_LOG=debug for more details.",
724 name,
725 task.task_ref.as_deref().unwrap_or("unknown"),
726 task.task_ref
727 .as_deref()
728 .and_then(|r| r.split(':').next_back())
729 .unwrap_or("unknown")
730 )));
731 }
732
733 let workdir = if let Some(ref dir) = task.directory {
740 self.config
742 .cue_module_root
743 .as_ref()
744 .unwrap_or(&self.config.project_root)
745 .join(dir)
746 } else if let Some(ref project_root) = task.project_root {
747 project_root.clone()
749 } else if let Some(ref source) = task.source {
750 if let Some(dir) = source.directory() {
752 self.config
753 .cue_module_root
754 .as_ref()
755 .unwrap_or(&self.config.project_root)
756 .join(dir)
757 } else {
758 self.config
762 .cue_module_root
763 .clone()
764 .unwrap_or_else(|| self.config.project_root.clone())
765 }
766 } else if !task.hermetic && !task.workspaces.is_empty() {
767 let workspace_name = &task.workspaces[0];
769 let manager = match workspace_name.as_str() {
770 "bun" => PackageManager::Bun,
771 "npm" => PackageManager::Npm,
772 "pnpm" => PackageManager::Pnpm,
773 "yarn" => PackageManager::YarnModern,
774 "cargo" => PackageManager::Cargo,
775 _ => PackageManager::Npm, };
777 find_workspace_root(manager, &self.config.project_root)
778 } else {
779 self.config.project_root.clone()
780 };
781
782 tracing::info!(
783 task = %name,
784 workdir = %workdir.display(),
785 hermetic = false,
786 "Executing non-hermetic task"
787 );
788
789 let cmd_str = if let Some(script) = &task.script {
791 format!("[script: {} bytes]", script.len())
792 } else if task.command.is_empty() {
793 task.args.join(" ")
794 } else {
795 format!("{} {}", task.command, task.args.join(" "))
796 };
797 if !self.config.capture_output {
798 cuenv_events::emit_task_started!(name, cmd_str, false);
799 }
800
801 let mut cmd = if let Some(script) = &task.script {
803 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
805 (
806 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
807 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
808 )
809 } else {
810 ("bash".to_string(), "-c".to_string())
812 };
813
814 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
815 let mut cmd = Command::new(&resolved_shell);
816 cmd.arg(&shell_flag);
817 cmd.arg(script);
818 cmd
819 } else {
820 let resolved_command = self.config.environment.resolve_command(&task.command);
822
823 if let Some(shell) = &task.shell {
824 if shell.command.is_some() && shell.flag.is_some() {
825 let shell_command = shell.command.as_ref().unwrap();
826 let shell_flag = shell.flag.as_ref().unwrap();
827 let resolved_shell = self.config.environment.resolve_command(shell_command);
828 let mut cmd = Command::new(&resolved_shell);
829 cmd.arg(shell_flag);
830 if task.args.is_empty() {
831 cmd.arg(&resolved_command);
832 } else {
833 let full_command = if task.command.is_empty() {
834 task.args.join(" ")
835 } else {
836 format!("{} {}", resolved_command, task.args.join(" "))
837 };
838 cmd.arg(full_command);
839 }
840 cmd
841 } else {
842 let mut cmd = Command::new(&resolved_command);
843 for arg in &task.args {
844 cmd.arg(arg);
845 }
846 cmd
847 }
848 } else {
849 let mut cmd = Command::new(&resolved_command);
850 for arg in &task.args {
851 cmd.arg(arg);
852 }
853 cmd
854 }
855 };
856
857 cmd.current_dir(&workdir);
859 let env_vars = self.config.environment.merge_with_system();
860 for (k, v) in &env_vars {
861 cmd.env(k, v);
862 }
863
864 if self.config.capture_output {
867 let output = cmd
868 .stdout(Stdio::piped())
869 .stderr(Stdio::piped())
870 .output()
871 .await
872 .map_err(|e| Error::Io {
873 source: e,
874 path: None,
875 operation: format!("spawn task {}", name),
876 })?;
877
878 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
879 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
880 let exit_code = output.status.code().unwrap_or(-1);
881 let success = output.status.success();
882
883 if !success {
884 tracing::warn!(task = %name, exit = exit_code, "Task failed");
885 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
886 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
887 }
888
889 Ok(TaskResult {
890 name: name.to_string(),
891 exit_code: Some(exit_code),
892 stdout,
893 stderr,
894 success,
895 })
896 } else {
897 let status = cmd
899 .stdout(Stdio::inherit())
900 .stderr(Stdio::inherit())
901 .status()
902 .await
903 .map_err(|e| Error::Io {
904 source: e,
905 path: None,
906 operation: format!("spawn task {}", name),
907 })?;
908
909 let exit_code = status.code().unwrap_or(-1);
910 let success = status.success();
911
912 if !success {
913 tracing::warn!(task = %name, exit = exit_code, "Task failed");
914 }
915
916 Ok(TaskResult {
917 name: name.to_string(),
918 exit_code: Some(exit_code),
919 stdout: String::new(), stderr: String::new(),
921 success,
922 })
923 }
924 }
925
926 #[async_recursion]
928 pub async fn execute_definition(
929 &self,
930 name: &str,
931 definition: &TaskDefinition,
932 all_tasks: &Tasks,
933 ) -> Result<Vec<TaskResult>> {
934 match definition {
935 TaskDefinition::Single(task) => {
936 let result = self.execute_task(name, task.as_ref()).await?;
937 Ok(vec![result])
938 }
939 TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
940 }
941 }
942
943 async fn execute_group(
944 &self,
945 prefix: &str,
946 group: &TaskGroup,
947 all_tasks: &Tasks,
948 ) -> Result<Vec<TaskResult>> {
949 match group {
950 TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
951 TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
952 }
953 }
954
955 async fn execute_sequential(
956 &self,
957 prefix: &str,
958 tasks: &[TaskDefinition],
959 all_tasks: &Tasks,
960 ) -> Result<Vec<TaskResult>> {
961 if !self.config.capture_output {
962 cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
963 }
964 let mut results = Vec::new();
965 for (i, task_def) in tasks.iter().enumerate() {
966 let task_name = format!("{}[{}]", prefix, i);
967 let task_results = self
968 .execute_definition(&task_name, task_def, all_tasks)
969 .await?;
970 for result in &task_results {
971 if !result.success {
972 let message = format!(
973 "Sequential task group '{prefix}' halted.\n\n{}",
974 summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
975 );
976 return Err(Error::configuration(message));
977 }
978 }
979 results.extend(task_results);
980 }
981 Ok(results)
982 }
983
984 async fn execute_parallel(
985 &self,
986 prefix: &str,
987 group: &ParallelGroup,
988 all_tasks: &Tasks,
989 ) -> Result<Vec<TaskResult>> {
990 if let Some(default_task) = group.tasks.get("default") {
992 if !self.config.capture_output {
993 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
994 }
995 let task_name = format!("{}.default", prefix);
998 return self
999 .execute_definition(&task_name, default_task, all_tasks)
1000 .await;
1001 }
1002
1003 if !self.config.capture_output {
1004 cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
1005 }
1006 let mut join_set = JoinSet::new();
1007 let all_tasks = Arc::new(all_tasks.clone());
1008 let mut all_results = Vec::new();
1009 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
1010 if let Some(failed) = results.iter().find(|r| !r.success) {
1011 let message = format!(
1012 "Parallel task group '{prefix}' halted.\n\n{}",
1013 summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
1014 );
1015 return Err(Error::configuration(message));
1016 }
1017 all_results.extend(results);
1018 Ok(())
1019 };
1020 for (name, task_def) in &group.tasks {
1021 let task_name = format!("{}.{}", prefix, name);
1022 let task_def = task_def.clone();
1023 let all_tasks = Arc::clone(&all_tasks);
1024 let executor = self.clone_with_config();
1025 join_set.spawn(async move {
1026 executor
1027 .execute_definition(&task_name, &task_def, &all_tasks)
1028 .await
1029 });
1030 if self.config.max_parallel > 0
1031 && join_set.len() >= self.config.max_parallel
1032 && let Some(result) = join_set.join_next().await
1033 {
1034 match result {
1035 Ok(Ok(results)) => merge_results(results)?,
1036 Ok(Err(e)) => return Err(e),
1037 Err(e) => {
1038 return Err(Error::configuration(format!(
1039 "Task execution panicked: {}",
1040 e
1041 )));
1042 }
1043 }
1044 }
1045 }
1046 while let Some(result) = join_set.join_next().await {
1047 match result {
1048 Ok(Ok(results)) => merge_results(results)?,
1049 Ok(Err(e)) => return Err(e),
1050 Err(e) => {
1051 return Err(Error::configuration(format!(
1052 "Task execution panicked: {}",
1053 e
1054 )));
1055 }
1056 }
1057 }
1058 Ok(all_results)
1059 }
1060
1061 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
1062 let parallel_groups = graph.get_parallel_groups()?;
1063 let mut all_results = Vec::new();
1064
1065 for mut group in parallel_groups {
1073 let mut join_set = JoinSet::new();
1074
1075 while !group.is_empty() || !join_set.is_empty() {
1076 while let Some(node) = group.pop() {
1078 let task = node.task.clone();
1079 let name = node.name.clone();
1080 let executor = self.clone_with_config();
1081 join_set.spawn(async move { executor.execute_task(&name, &task).await });
1082
1083 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
1084 break;
1085 }
1086 }
1087
1088 if let Some(result) = join_set.join_next().await {
1089 match result {
1090 Ok(Ok(task_result)) => {
1091 if !task_result.success {
1092 join_set.abort_all();
1093 let message = format!(
1094 "Task graph execution halted.\n\n{}",
1095 summarize_task_failure(
1096 &task_result,
1097 TASK_FAILURE_SNIPPET_LINES,
1098 )
1099 );
1100 return Err(Error::configuration(message));
1101 }
1102 all_results.push(task_result);
1103 }
1104 Ok(Err(e)) => {
1105 join_set.abort_all();
1106 return Err(e);
1107 }
1108 Err(e) => {
1109 join_set.abort_all();
1110 return Err(Error::configuration(format!(
1111 "Task execution panicked: {}",
1112 e
1113 )));
1114 }
1115 }
1116 }
1117 }
1118 }
1119
1120 Ok(all_results)
1121 }
1122
1123 async fn resolve_workspace(
1124 &self,
1125 _task_name: &str,
1126 task: &Task,
1127 workspace_name: &str,
1128 config: &WorkspaceConfig,
1129 ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1130 let root = self.config.project_root.clone();
1131 let task_label = _task_name.to_string();
1132 let command = task.command.clone();
1133 let config_pm = config.package_manager.clone();
1134 let config_root = config.root.clone();
1135 let mut packages = Vec::new();
1143
1144 let lockfile_override: Option<String> = None; let mut traverse_workspace_deps = true;
1151
1152 let workspace_name_owned = workspace_name.to_string();
1154 tokio::task::spawn_blocking(move || {
1155 let override_for_detection = lockfile_override.as_ref().map(|lock| {
1156 let candidate = PathBuf::from(lock);
1157 if candidate.is_absolute() {
1158 candidate
1159 } else {
1160 root.join(lock)
1161 }
1162 });
1163
1164 let manager = if let Some(pm_str) = config_pm {
1167 match pm_str.as_str() {
1168 "npm" => PackageManager::Npm,
1169 "bun" => PackageManager::Bun,
1170 "pnpm" => PackageManager::Pnpm,
1171 "yarn" => PackageManager::YarnModern,
1172 "yarn-classic" => PackageManager::YarnClassic,
1173 "cargo" => PackageManager::Cargo,
1174 _ => {
1175 return Err(Error::configuration(format!(
1176 "Unknown package manager: {}",
1177 pm_str
1178 )));
1179 }
1180 }
1181 } else {
1182 match workspace_name_owned.as_str() {
1184 "npm" => PackageManager::Npm,
1185 "bun" => PackageManager::Bun,
1186 "pnpm" => PackageManager::Pnpm,
1187 "yarn" => PackageManager::YarnModern,
1188 "cargo" => PackageManager::Cargo,
1189 _ => {
1190 let hint = detect_from_command(&command);
1193 let detected = match detect_package_managers(&root) {
1194 Ok(list) => list,
1195 Err(e) => {
1196 if override_for_detection.is_some() {
1197 Vec::new()
1198 } else {
1199 return Err(Error::configuration(format!(
1200 "Failed to detect package managers: {}",
1201 e
1202 )));
1203 }
1204 }
1205 };
1206
1207 if let Some(h) = hint {
1208 if detected.contains(&h) {
1209 h
1210 } else if !detected.is_empty() {
1211 detected[0]
1213 } else if let Some(ref override_path) = override_for_detection {
1214 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1215 Error::configuration(
1216 "Unable to infer package manager from lockfile override",
1217 )
1218 })?
1219 } else {
1220 return Err(Error::configuration(
1221 format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1222 ));
1223 }
1224 } else if !detected.is_empty() {
1225 detected[0]
1226 } else if let Some(ref override_path) = override_for_detection {
1227 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1228 Error::configuration(
1229 "Unable to infer package manager from lockfile override",
1230 )
1231 })?
1232 } else {
1233 return Err(Error::configuration(
1234 "Could not detect package manager for workspace resolution",
1235 ));
1236 }
1237 }
1238 }
1239 };
1240
1241 let workspace_root = if let Some(root_override) = &config_root {
1245 root.join(root_override)
1246 } else {
1247 find_workspace_root(manager, &root)
1248 };
1249
1250 if task_trace_enabled() {
1251 tracing::info!(
1252 task = %task_label,
1253 manager = %manager,
1254 project_root = %root.display(),
1255 workspace_root = %workspace_root.display(),
1256 "Resolved workspace root for package manager"
1257 );
1258 }
1259
1260 let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1261 let candidate = PathBuf::from(lock);
1262 if candidate.is_absolute() {
1263 candidate
1264 } else {
1265 workspace_root.join(lock)
1266 }
1267 });
1268
1269 let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1271 PackageManager::Npm
1272 | PackageManager::Bun
1273 | PackageManager::YarnClassic
1274 | PackageManager::YarnModern => Box::new(PackageJsonDiscovery),
1275 PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1276 PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1277 };
1278
1279 let workspace = discovery.discover(&workspace_root).map_err(|e| {
1280 Error::configuration(format!("Failed to discover workspace: {}", e))
1281 })?;
1282
1283 let lockfile_path = if let Some(path) = lockfile_override_path {
1285 if !path.exists() {
1286 return Err(Error::configuration(format!(
1287 "Workspace lockfile override does not exist: {}",
1288 path.display()
1289 )));
1290 }
1291 path
1292 } else {
1293 workspace.lockfile.clone().ok_or_else(|| {
1294 Error::configuration("Workspace resolution requires a lockfile")
1295 })?
1296 };
1297
1298 let parser: Box<dyn LockfileParser> = match manager {
1299 PackageManager::Npm => Box::new(NpmLockfileParser),
1300 PackageManager::Bun => Box::new(BunLockfileParser),
1301 PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1302 PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1303 PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1304 PackageManager::Cargo => Box::new(CargoLockfileParser),
1305 };
1306
1307 let entries = parser
1308 .parse(&lockfile_path)
1309 .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1310 if task_trace_enabled() {
1311 tracing::info!(
1312 task = %task_label,
1313 lockfile = %lockfile_path.display(),
1314 members = entries.len(),
1315 "Parsed workspace lockfile"
1316 );
1317 }
1318
1319 let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1321
1322 if packages.is_empty() {
1326 let current_member = workspace
1327 .members
1328 .iter()
1329 .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1330 if let Some(member) = current_member {
1331 let inferred = vec![member.name.clone()];
1332 if task_trace_enabled() {
1333 tracing::info!(
1334 task = %task_label,
1335 inferred_packages = ?inferred,
1336 "Inferred workspace packages from current project"
1337 );
1338 }
1339 packages = inferred;
1340 traverse_workspace_deps = true;
1341 }
1342 }
1343
1344 let mut member_paths = Vec::new();
1346
1347 member_paths.push(manager.workspace_config_name().to_string());
1349 if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1350 member_paths.push(rel.to_string_lossy().to_string());
1351 } else {
1352 member_paths.push(lockfile_path.to_string_lossy().to_string());
1353 }
1354
1355 if packages.is_empty() {
1356 for member in &workspace.members {
1357 let manifest_rel = member
1358 .path
1359 .join(manager.workspace_config_name());
1360 member_paths.push(manifest_rel.to_string_lossy().to_string());
1361 }
1362 } else {
1363 let mut to_visit: Vec<String> = packages.clone();
1364 let mut visited = HashSet::new();
1365
1366 while let Some(pkg_name) = to_visit.pop() {
1367 if visited.contains(&pkg_name) {
1368 continue;
1369 }
1370 visited.insert(pkg_name.clone());
1371
1372 if let Some(member) = workspace.find_member(&pkg_name) {
1373 let manifest_rel = member
1374 .path
1375 .join(manager.workspace_config_name());
1376 member_paths.push(manifest_rel.to_string_lossy().to_string());
1377
1378 if traverse_workspace_deps {
1380 let mut dependency_candidates: HashSet<String> = HashSet::new();
1381
1382 if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1383 for dep in &entry.dependencies {
1384 if entries
1385 .iter()
1386 .any(|e| e.name == dep.name && e.is_workspace_member)
1387 {
1388 dependency_candidates.insert(dep.name.clone());
1389 }
1390 }
1391 }
1392
1393 for dep_name in &member.dependencies {
1394 if workspace.find_member(dep_name).is_some() {
1395 dependency_candidates.insert(dep_name.clone());
1396 }
1397 }
1398
1399 for dep_name in dependency_candidates {
1400 to_visit.push(dep_name);
1401 }
1402 }
1403 }
1404 }
1405 }
1406
1407 if task_trace_enabled() {
1408 tracing::info!(
1409 task = %task_label,
1410 members = ?member_paths,
1411 "Workspace input member paths selected"
1412 );
1413 }
1414
1415 Ok((workspace, entries, member_paths, Some(hash)))
1416 })
1417 .await
1418 .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1419 }
1420
1421 async fn materialize_workspace(
1422 &self,
1423 workspace: &Workspace,
1424 entries: &[LockfileEntry],
1425 target_dir: &Path,
1426 ) -> Result<()> {
1427 let materializer: Box<dyn Materializer> = match workspace.manager {
1429 PackageManager::Npm
1430 | PackageManager::Bun
1431 | PackageManager::Pnpm
1432 | PackageManager::YarnClassic
1433 | PackageManager::YarnModern => Box::new(NodeModulesMaterializer),
1434 PackageManager::Cargo => Box::new(CargoMaterializer),
1435 };
1436
1437 materializer
1438 .materialize(workspace, entries, target_dir)
1439 .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1440 }
1441
1442 fn clone_with_config(&self) -> Self {
1443 Self::with_shared_backend(self.config.clone(), self.backend.clone())
1445 }
1446}
1447
1448fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1449 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1450
1451 loop {
1452 let is_root = match manager {
1453 PackageManager::Npm
1454 | PackageManager::Bun
1455 | PackageManager::YarnClassic
1456 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
1457 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1458 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
1459 };
1460
1461 if is_root {
1462 return current;
1463 }
1464
1465 if let Some(parent) = current.parent() {
1466 current = parent.to_path_buf();
1467 } else {
1468 return start.to_path_buf();
1469 }
1470 }
1471}
1472
1473fn package_json_has_workspaces(dir: &Path) -> bool {
1474 let path = dir.join("package.json");
1475 let content = std::fs::read_to_string(&path);
1476 let Ok(json) = content.and_then(|s| {
1477 serde_json::from_str::<serde_json::Value>(&s)
1478 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1479 }) else {
1480 return false;
1481 };
1482
1483 match json.get("workspaces") {
1484 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1485 Some(serde_json::Value::Object(map)) => map
1486 .get("packages")
1487 .and_then(|packages| packages.as_array())
1488 .map(|arr| !arr.is_empty())
1489 .unwrap_or(false),
1490 _ => false,
1491 }
1492}
1493
1494fn cargo_toml_has_workspace(dir: &Path) -> bool {
1495 let path = dir.join("Cargo.toml");
1496 let Ok(content) = std::fs::read_to_string(&path) else {
1497 return false;
1498 };
1499
1500 content.contains("[workspace]")
1501}
1502
1503fn task_trace_enabled() -> bool {
1504 static ENABLED: OnceLock<bool> = OnceLock::new();
1505 *ENABLED.get_or_init(|| {
1506 matches!(
1507 std::env::var("CUENV_TRACE_TASKS")
1508 .unwrap_or_default()
1509 .trim()
1510 .to_ascii_lowercase()
1511 .as_str(),
1512 "1" | "true" | "yes" | "on"
1513 )
1514 })
1515}
1516
1517pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1520 let exit_code = result
1521 .exit_code
1522 .map(|c| c.to_string())
1523 .unwrap_or_else(|| "unknown".to_string());
1524
1525 let mut sections = Vec::new();
1526 sections.push(format!(
1527 "Task '{}' failed with exit code {}.",
1528 result.name, exit_code
1529 ));
1530
1531 let output = format_failure_streams(result, max_output_lines);
1532 if output.is_empty() {
1533 sections.push(
1534 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1535 .to_string(),
1536 );
1537 } else {
1538 sections.push(output);
1539 }
1540
1541 sections.join("\n\n")
1542}
1543
1544fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1545 let mut streams = Vec::new();
1546
1547 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1548 streams.push(stdout);
1549 }
1550
1551 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1552 streams.push(stderr);
1553 }
1554
1555 streams.join("\n\n")
1556}
1557
1558fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
1559 let normalized = content.trim_end();
1560 if normalized.is_empty() {
1561 return None;
1562 }
1563
1564 let lines: Vec<&str> = normalized.lines().collect();
1565 let total = lines.len();
1566 let start = total.saturating_sub(max_output_lines);
1567 let snippet = lines[start..].join("\n");
1568
1569 let header = if total > max_output_lines {
1570 format!("{label} (last {max_output_lines} of {total} lines):")
1571 } else {
1572 format!("{label}:")
1573 };
1574
1575 Some(format!("{header}\n{snippet}"))
1576}
1577
1578fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1579 match path.file_name().and_then(|n| n.to_str())? {
1580 "package-lock.json" => Some(PackageManager::Npm),
1581 "bun.lock" => Some(PackageManager::Bun),
1582 "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1583 "yarn.lock" => Some(PackageManager::YarnModern),
1584 "Cargo.lock" => Some(PackageManager::Cargo),
1585 _ => None,
1586 }
1587}
1588
1589fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1590 let sanitized_task = task_name
1593 .chars()
1594 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1595 .collect::<String>();
1596
1597 let base = std::env::temp_dir().join(format!(
1598 "cuenv-work-{}-{}",
1599 sanitized_task,
1600 &key[..12.min(key.len())]
1601 ));
1602
1603 if base.exists()
1606 && let Err(e) = std::fs::remove_dir_all(&base)
1607 {
1608 let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1611 let fallback = std::env::temp_dir().join(format!(
1612 "cuenv-work-{}-{}-{}",
1613 sanitized_task,
1614 &key[..12.min(key.len())],
1615 ts
1616 ));
1617 tracing::warn!(
1618 previous = %base.display(),
1619 fallback = %fallback.display(),
1620 error = %e,
1621 "Failed to clean previous hermetic workdir; using fresh fallback directory"
1622 );
1623 std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1624 source: e,
1625 path: Some(fallback.clone().into()),
1626 operation: "create_dir_all".into(),
1627 })?;
1628 return Ok(fallback);
1629 }
1630
1631 std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1632 source: e,
1633 path: Some(base.clone().into()),
1634 operation: "create_dir_all".into(),
1635 })?;
1636 Ok(base)
1637}
1638
1639pub async fn execute_command(
1641 command: &str,
1642 args: &[String],
1643 environment: &Environment,
1644) -> Result<i32> {
1645 tracing::info!("Executing command: {} {:?}", command, args);
1646 let mut cmd = Command::new(command);
1647 cmd.args(args);
1648 let env_vars = environment.merge_with_system();
1649 for (key, value) in env_vars {
1650 cmd.env(key, value);
1651 }
1652 cmd.stdout(Stdio::inherit());
1653 cmd.stderr(Stdio::inherit());
1654 cmd.stdin(Stdio::inherit());
1655 let status = cmd.status().await.map_err(|e| {
1656 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1657 })?;
1658 Ok(status.code().unwrap_or(1))
1659}
1660
1661#[cfg(test)]
1662mod tests {
1663 use super::*;
1664 use crate::tasks::Input;
1665 use std::fs;
1666 use tempfile::TempDir;
1667
1668 #[tokio::test]
1669 async fn test_executor_config_default() {
1670 let config = ExecutorConfig::default();
1671 assert!(!config.capture_output);
1672 assert_eq!(config.max_parallel, 0);
1673 assert!(config.environment.is_empty());
1674 }
1675
1676 #[tokio::test]
1677 async fn test_task_result() {
1678 let result = TaskResult {
1679 name: "test".to_string(),
1680 exit_code: Some(0),
1681 stdout: "output".to_string(),
1682 stderr: String::new(),
1683 success: true,
1684 };
1685 assert_eq!(result.name, "test");
1686 assert_eq!(result.exit_code, Some(0));
1687 assert!(result.success);
1688 assert_eq!(result.stdout, "output");
1689 }
1690
1691 #[tokio::test]
1692 async fn test_execute_simple_task() {
1693 let config = ExecutorConfig {
1694 capture_output: true,
1695 ..Default::default()
1696 };
1697 let executor = TaskExecutor::new(config);
1698 let task = Task {
1699 command: "echo".to_string(),
1700 args: vec!["hello".to_string()],
1701 description: Some("Hello task".to_string()),
1702 ..Default::default()
1703 };
1704 let result = executor.execute_task("test", &task).await.unwrap();
1705 assert!(result.success);
1706 assert_eq!(result.exit_code, Some(0));
1707 assert!(result.stdout.contains("hello"));
1708 }
1709
1710 #[tokio::test]
1711 async fn test_execute_with_environment() {
1712 let mut config = ExecutorConfig {
1713 capture_output: true,
1714 ..Default::default()
1715 };
1716 config
1717 .environment
1718 .set("TEST_VAR".to_string(), "test_value".to_string());
1719 let executor = TaskExecutor::new(config);
1720 let task = Task {
1721 command: "printenv".to_string(),
1722 args: vec!["TEST_VAR".to_string()],
1723 description: Some("Print env task".to_string()),
1724 ..Default::default()
1725 };
1726 let result = executor.execute_task("test", &task).await.unwrap();
1727 assert!(result.success);
1728 assert!(result.stdout.contains("test_value"));
1729 }
1730
1731 #[tokio::test]
1732 async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
1733 let tmp = TempDir::new().unwrap();
1734 let root = tmp.path();
1735
1736 fs::write(
1738 root.join("package.json"),
1739 r#"{
1740 "name": "root-app",
1741 "version": "0.0.0",
1742 "workspaces": ["packages/*", "apps/*"],
1743 "dependencies": {
1744 "@rawkodeacademy/content-technologies": "workspace:*"
1745 }
1746}"#,
1747 )
1748 .unwrap();
1749 fs::write(
1752 root.join("bun.lock"),
1753 r#"{
1754 "lockfileVersion": 1,
1755 "workspaces": {
1756 "": {
1757 "name": "root-app",
1758 "dependencies": {
1759 "@rawkodeacademy/content-technologies": "workspace:*"
1760 }
1761 },
1762 "packages/content-technologies": {
1763 "name": "@rawkodeacademy/content-technologies",
1764 "version": "0.0.1"
1765 },
1766 "apps/site": {
1767 "version": "0.0.0",
1768 "dependencies": {
1769 "@rawkodeacademy/content-technologies": "workspace:*"
1770 }
1771 }
1772 },
1773 "packages": {}
1774}"#,
1775 )
1776 .unwrap();
1777
1778 fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
1780 fs::write(
1781 root.join("packages/content-technologies/package.json"),
1782 r#"{
1783 "name": "@rawkodeacademy/content-technologies",
1784 "version": "0.0.1"
1785}"#,
1786 )
1787 .unwrap();
1788
1789 fs::create_dir_all(root.join("apps/site")).unwrap();
1790 fs::write(
1791 root.join("apps/site/package.json"),
1792 r#"{
1793 "name": "site",
1794 "version": "0.0.0",
1795 "dependencies": {
1796 "@rawkodeacademy/content-technologies": "workspace:*"
1797 }
1798}"#,
1799 )
1800 .unwrap();
1801
1802 let mut workspaces = HashMap::new();
1803 workspaces.insert(
1804 "bun".to_string(),
1805 WorkspaceConfig {
1806 enabled: true,
1807 package_manager: Some("bun".to_string()),
1808 root: None,
1809 hooks: None,
1810 },
1811 );
1812
1813 let config = ExecutorConfig {
1814 capture_output: true,
1815 project_root: root.join("apps/site"),
1816 workspaces: Some(workspaces),
1817 ..Default::default()
1818 };
1819 let executor = TaskExecutor::new(config);
1820
1821 let task = Task {
1822 command: "sh".to_string(),
1823 args: vec![
1824 "-c".to_string(),
1825 "find ../.. -maxdepth 4 -type d | sort".to_string(),
1826 ],
1827 inputs: vec![Input::Path("package.json".to_string())],
1828 workspaces: vec!["bun".to_string()],
1829 ..Default::default()
1830 };
1831
1832 let result = executor.execute_task("install", &task).await.unwrap();
1833 assert!(
1834 result.success,
1835 "command failed stdout='{}' stderr='{}'",
1836 result.stdout, result.stderr
1837 );
1838 assert!(
1839 result
1840 .stdout
1841 .split_whitespace()
1842 .any(|line| line.ends_with("packages/content-technologies")),
1843 "should include workspace member from workspace root; stdout='{}' stderr='{}'",
1844 result.stdout,
1845 result.stderr
1846 );
1847 }
1848
1849 #[tokio::test]
1850 async fn test_execute_failing_task() {
1851 let config = ExecutorConfig {
1852 capture_output: true,
1853 ..Default::default()
1854 };
1855 let executor = TaskExecutor::new(config);
1856 let task = Task {
1857 command: "false".to_string(),
1858 description: Some("Failing task".to_string()),
1859 ..Default::default()
1860 };
1861 let result = executor.execute_task("test", &task).await.unwrap();
1862 assert!(!result.success);
1863 assert_eq!(result.exit_code, Some(1));
1864 }
1865
1866 #[tokio::test]
1867 async fn test_execute_sequential_group() {
1868 let config = ExecutorConfig {
1869 capture_output: true,
1870 ..Default::default()
1871 };
1872 let executor = TaskExecutor::new(config);
1873 let task1 = Task {
1874 command: "echo".to_string(),
1875 args: vec!["first".to_string()],
1876 description: Some("First task".to_string()),
1877 ..Default::default()
1878 };
1879 let task2 = Task {
1880 command: "echo".to_string(),
1881 args: vec!["second".to_string()],
1882 description: Some("Second task".to_string()),
1883 ..Default::default()
1884 };
1885 let group = TaskGroup::Sequential(vec![
1886 TaskDefinition::Single(Box::new(task1)),
1887 TaskDefinition::Single(Box::new(task2)),
1888 ]);
1889 let all_tasks = Tasks::new();
1890 let results = executor
1891 .execute_group("seq", &group, &all_tasks)
1892 .await
1893 .unwrap();
1894 assert_eq!(results.len(), 2);
1895 assert!(results[0].stdout.contains("first"));
1896 assert!(results[1].stdout.contains("second"));
1897 }
1898
1899 #[tokio::test]
1900 async fn test_command_injection_prevention() {
1901 let config = ExecutorConfig {
1902 capture_output: true,
1903 ..Default::default()
1904 };
1905 let executor = TaskExecutor::new(config);
1906 let malicious_task = Task {
1907 command: "echo".to_string(),
1908 args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1909 description: Some("Malicious task test".to_string()),
1910 ..Default::default()
1911 };
1912 let result = executor
1913 .execute_task("malicious", &malicious_task)
1914 .await
1915 .unwrap();
1916 assert!(result.success);
1917 assert!(result.stdout.contains("hello ; rm -rf /"));
1918 }
1919
1920 #[tokio::test]
1921 async fn test_special_characters_in_args() {
1922 let config = ExecutorConfig {
1923 capture_output: true,
1924 ..Default::default()
1925 };
1926 let executor = TaskExecutor::new(config);
1927 let special_chars = vec![
1928 "$USER",
1929 "$(whoami)",
1930 "`whoami`",
1931 "&& echo hacked",
1932 "|| echo failed",
1933 "> /tmp/hack",
1934 "| cat",
1935 ];
1936 for special_arg in special_chars {
1937 let task = Task {
1938 command: "echo".to_string(),
1939 args: vec!["safe".to_string(), special_arg.to_string()],
1940 description: Some("Special character test".to_string()),
1941 ..Default::default()
1942 };
1943 let result = executor.execute_task("special", &task).await.unwrap();
1944 assert!(result.success);
1945 assert!(result.stdout.contains("safe"));
1946 assert!(result.stdout.contains(special_arg));
1947 }
1948 }
1949
1950 #[tokio::test]
1951 async fn test_environment_variable_safety() {
1952 let mut config = ExecutorConfig {
1953 capture_output: true,
1954 ..Default::default()
1955 };
1956 config
1957 .environment
1958 .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1959 let executor = TaskExecutor::new(config);
1960 let task = Task {
1961 command: "printenv".to_string(),
1962 args: vec!["DANGEROUS_VAR".to_string()],
1963 description: Some("Environment variable safety test".to_string()),
1964 ..Default::default()
1965 };
1966 let result = executor.execute_task("env_test", &task).await.unwrap();
1967 assert!(result.success);
1968 assert!(result.stdout.contains("; rm -rf /"));
1969 }
1970
1971 #[tokio::test]
1972 async fn test_execute_graph_parallel_groups() {
1973 let config = ExecutorConfig {
1975 capture_output: true,
1976 max_parallel: 2,
1977 ..Default::default()
1978 };
1979 let executor = TaskExecutor::new(config);
1980 let mut graph = TaskGraph::new();
1981
1982 let t1 = Task {
1983 command: "echo".into(),
1984 args: vec!["A".into()],
1985 ..Default::default()
1986 };
1987 let t2 = Task {
1988 command: "echo".into(),
1989 args: vec!["B".into()],
1990 ..Default::default()
1991 };
1992
1993 graph.add_task("t1", t1).unwrap();
1994 graph.add_task("t2", t2).unwrap();
1995 let results = executor.execute_graph(&graph).await.unwrap();
1996 assert_eq!(results.len(), 2);
1997 let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1998 assert!(joined.contains("A") && joined.contains("B"));
1999 }
2000
2001 #[tokio::test]
2002 async fn test_execute_graph_respects_dependency_levels() {
2003 let tmp = TempDir::new().unwrap();
2004 let root = tmp.path();
2005
2006 let config = ExecutorConfig {
2007 capture_output: true,
2008 max_parallel: 2,
2009 project_root: root.to_path_buf(),
2010 ..Default::default()
2011 };
2012 let executor = TaskExecutor::new(config);
2013
2014 let mut tasks = Tasks::new();
2015 tasks.tasks.insert(
2016 "dep".into(),
2017 TaskDefinition::Single(Box::new(Task {
2018 command: "sh".into(),
2019 args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
2020 ..Default::default()
2021 })),
2022 );
2023 tasks.tasks.insert(
2024 "consumer".into(),
2025 TaskDefinition::Single(Box::new(Task {
2026 command: "sh".into(),
2027 args: vec!["-c".into(), "cat marker.txt".into()],
2028 depends_on: vec!["dep".into()],
2029 ..Default::default()
2030 })),
2031 );
2032
2033 let mut graph = TaskGraph::new();
2034 graph.build_for_task("consumer", &tasks).unwrap();
2035
2036 let results = executor.execute_graph(&graph).await.unwrap();
2037 assert_eq!(results.len(), 2);
2038
2039 let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
2040 assert!(consumer.success);
2041 assert!(consumer.stdout.contains("ok"));
2042 }
2043}