1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19 BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20 NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21 PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22 YarnModernLockfileParser, detect_from_command, detect_package_managers,
23 materializer::{
24 Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25 },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
/// Outcome of running a single task.
#[derive(Clone, Debug)]
pub struct TaskResult {
    /// Name the task was executed under.
    pub name: String,
    /// Exit code reported for the task process, if any.
    pub exit_code: Option<i32>,
    /// Standard output collected from the task (empty when output was not captured).
    pub stdout: String,
    /// Standard error collected from the task (empty when output was not captured).
    pub stderr: String,
    /// Whether the task finished successfully.
    pub success: bool,
}
43
/// Line budget passed to `summarize_task_failure` when a failing task is reported.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50 pub capture_output: bool,
52 pub max_parallel: usize,
54 pub environment: Environment,
56 pub working_dir: Option<PathBuf>,
58 pub project_root: PathBuf,
60 pub cue_module_root: Option<PathBuf>,
62 pub materialize_outputs: Option<PathBuf>,
64 pub cache_dir: Option<PathBuf>,
66 pub show_cache_path: bool,
68 pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
70 pub backend_config: Option<BackendConfig>,
72 pub cli_backend: Option<String>,
74}
75
76impl Default for ExecutorConfig {
77 fn default() -> Self {
78 Self {
79 capture_output: false,
80 max_parallel: 0,
81 environment: Environment::new(),
82 working_dir: None,
83 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
84 cue_module_root: None,
85 materialize_outputs: None,
86 cache_dir: None,
87 show_cache_path: false,
88 workspaces: None,
89 backend_config: None,
90 cli_backend: None,
91 }
92 }
93}
94
95pub struct TaskExecutor {
97 config: ExecutorConfig,
98 backend: Arc<dyn TaskBackend>,
99}
100impl TaskExecutor {
101 pub fn new(config: ExecutorConfig) -> Self {
103 Self::with_dagger_factory(config, None)
104 }
105
106 pub fn with_dagger_factory(
110 config: ExecutorConfig,
111 dagger_factory: Option<BackendFactory>,
112 ) -> Self {
113 let backend = create_backend_with_factory(
114 config.backend_config.as_ref(),
115 config.project_root.clone(),
116 config.cli_backend.as_deref(),
117 dagger_factory,
118 );
119 Self { config, backend }
120 }
121
122 fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
124 Self { config, backend }
125 }
126
127 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
129 if self.backend.name() == "dagger" {
138 return self
139 .backend
140 .execute(
141 name,
142 task,
143 &self.config.environment,
144 &self.config.project_root,
145 self.config.capture_output,
146 )
147 .await;
148 }
149
150 return self.execute_task_non_hermetic(name, task).await;
152
153 #[allow(unreachable_code)]
155 {
156 let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
158 let mut workspace_input_patterns = Vec::new();
159 let mut workspace_lockfile_hashes = BTreeMap::new();
160
161 for workspace_name in &task.workspaces {
162 if let Some(global_workspaces) = &self.config.workspaces {
163 if let Some(ws_config) = global_workspaces.get(workspace_name) {
164 if ws_config.enabled {
165 let (ws, entries, paths, hash) = self
166 .resolve_workspace(name, task, workspace_name, ws_config)
167 .await?;
168 workspace_ctxs.push((ws, entries));
169 workspace_input_patterns.extend(paths);
170 if let Some(h) = hash {
171 workspace_lockfile_hashes.insert(workspace_name.clone(), h);
172 }
173 }
174 } else {
175 tracing::warn!(
176 task = %name,
177 workspace = %workspace_name,
178 "Workspace not found in global configuration"
179 );
180 }
181 }
182 }
183
184 let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
189
190 let project_prefix = primary_workspace_root
191 .as_ref()
192 .and_then(|root| self.config.project_root.strip_prefix(root).ok())
193 .map(|p| p.to_path_buf());
194 let input_root = primary_workspace_root
195 .clone()
196 .unwrap_or_else(|| self.config.project_root.clone());
197
198 let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
199 let resolved_inputs = {
200 let _g = span_inputs.enter();
201 let resolver = InputResolver::new(&input_root);
202 let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
203 all_inputs.extend(workspace_input_patterns.iter().cloned());
204 resolver.resolve(&all_inputs)?
205 };
206 if task_trace_enabled() {
207 tracing::info!(
208 task = %name,
209 input_root = %input_root.display(),
210 project_root = %self.config.project_root.display(),
211 inputs_count = resolved_inputs.files.len(),
212 workspace_inputs = workspace_input_patterns.len(),
213 "Resolved task inputs"
214 );
215 }
216
217 let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
219 let env_summary: BTreeMap<String, String> = self
221 .config
222 .environment
223 .vars
224 .iter()
225 .map(|(k, v)| (k.clone(), v.clone()))
226 .collect();
227 let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
228 let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
229 let shell_json = serde_json::to_value(&task.shell).ok();
230 let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
231 None
232 } else {
233 Some(workspace_lockfile_hashes)
234 };
235
236 let envelope = task_cache::CacheKeyEnvelope {
237 inputs: inputs_summary.clone(),
238 command: task.command.clone(),
239 args: task.args.clone(),
240 shell: shell_json,
241 env: env_summary.clone(),
242 cuenv_version: cuenv_version.clone(),
243 platform: platform.clone(),
244 workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
245 workspace_package_hashes: None,
247 };
248 let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
249
250 let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
252 let cache_hit = {
253 let _g = span_cache.enter();
254 task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
255 };
256
257 if let Some(hit) = cache_hit {
258 tracing::info!(
259 task = %name,
260 key = %cache_key,
261 path = %hit.path.display(),
262 "Task {} cache hit: {}. Skipping execution.",
263 name,
264 cache_key
265 );
266 if self.config.show_cache_path {
267 tracing::info!(cache_path = %hit.path.display(), "Cache path");
268 }
269 if let Some(dest) = &self.config.materialize_outputs {
270 let count = task_cache::materialize_outputs(
271 &cache_key,
272 dest,
273 self.config.cache_dir.as_deref(),
274 )?;
275 tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
276 }
277 let stdout_path = hit.path.join("logs").join("stdout.log");
282 let stderr_path = hit.path.join("logs").join("stderr.log");
283 let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
284 let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
285 if !(stdout.is_empty() && stderr.is_empty()) {
289 if !self.config.capture_output {
290 cuenv_events::emit_task_cache_hit!(name, cache_key);
291 if !stdout.is_empty() {
293 cuenv_events::emit_task_output!(name, "stdout", stdout);
294 }
295 if !stderr.is_empty() {
296 cuenv_events::emit_task_output!(name, "stderr", stderr);
297 }
298 }
299 return Ok(TaskResult {
300 name: name.to_string(),
301 exit_code: Some(0),
302 stdout,
303 stderr,
304 success: true,
305 });
306 } else {
307 tracing::info!(
308 task = %name,
309 key = %cache_key,
310 "Cache entry lacks logs; executing to backfill logs"
311 );
312 }
313 }
314
315 tracing::info!(
316 task = %name,
317 key = %cache_key,
318 "Task {} executing hermetically… key {}",
319 name,
320 cache_key
321 );
322
323 let hermetic_root = create_hermetic_dir(name, &cache_key)?;
324 if self.config.show_cache_path {
325 tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
326 }
327
328 let span_populate =
330 tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
331 {
332 let _g = span_populate.enter();
333 populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
334 }
335
336 for (ws, entries) in workspace_ctxs {
338 self.materialize_workspace(&ws, &entries, &hermetic_root)
339 .await?;
340 }
341
342 if let Some(ref inputs_from) = task.inputs_from {
344 for task_output in inputs_from {
345 let source_task = &task_output.task;
346 let source_cache_key = task_cache::lookup_latest(
347 &self.config.project_root,
348 source_task,
349 self.config.cache_dir.as_deref(),
350 )
351 .ok_or_else(|| {
352 Error::configuration(format!(
353 "Task '{}' depends on outputs from '{}' but no cached result found. \
354 Ensure '{}' runs before this task (add it to dependsOn).",
355 name, source_task, source_task
356 ))
357 })?;
358
359 let materialized = task_cache::materialize_outputs(
361 &source_cache_key,
362 &hermetic_root,
363 self.config.cache_dir.as_deref(),
364 )?;
365 tracing::info!(
366 task = %name,
367 source_task = %source_task,
368 materialized = materialized,
369 "Materialized outputs from dependent task"
370 );
371 }
372 }
373
374 let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
376
377 let mut cmd = if let Some(script) = &task.script {
379 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
381 (
382 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
383 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
384 )
385 } else {
386 ("bash".to_string(), "-c".to_string())
388 };
389
390 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
391 let mut cmd = Command::new(&resolved_shell);
392 cmd.arg(&shell_flag);
393 cmd.arg(script);
394 cmd
395 } else {
396 let resolved_command = self.config.environment.resolve_command(&task.command);
402
403 if let Some(shell) = &task.shell {
404 if shell.command.is_some() && shell.flag.is_some() {
405 let shell_command = shell.command.as_ref().unwrap();
406 let shell_flag = shell.flag.as_ref().unwrap();
407 let resolved_shell = self.config.environment.resolve_command(shell_command);
409 let mut cmd = Command::new(&resolved_shell);
410 cmd.arg(shell_flag);
411 if task.args.is_empty() {
412 cmd.arg(&resolved_command);
413 } else {
414 let full_command = if task.command.is_empty() {
415 task.args.join(" ")
416 } else {
417 format!("{} {}", resolved_command, task.args.join(" "))
418 };
419 cmd.arg(full_command);
420 }
421 cmd
422 } else {
423 let mut cmd = Command::new(&resolved_command);
424 for arg in &task.args {
425 cmd.arg(arg);
426 }
427 cmd
428 }
429 } else {
430 let mut cmd = Command::new(&resolved_command);
431 for arg in &task.args {
432 cmd.arg(arg);
433 }
434 cmd
435 }
436 };
437
438 let workdir = if let Some(dir) = &self.config.working_dir {
439 dir.clone()
440 } else if let Some(prefix) = project_prefix.as_ref() {
441 hermetic_root.join(prefix)
442 } else {
443 hermetic_root.clone()
444 };
445 std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
446 source: e,
447 path: Some(workdir.clone().into()),
448 operation: "create_dir_all".into(),
449 })?;
450 cmd.current_dir(&workdir);
451 let env_vars = self.config.environment.merge_with_system();
453 if task_trace_enabled() {
454 tracing::info!(
455 task = %name,
456 hermetic_root = %hermetic_root.display(),
457 workdir = %workdir.display(),
458 command = %task.command,
459 args = ?task.args,
460 env_count = env_vars.len(),
461 "Launching task command"
462 );
463 }
464 for (k, v) in env_vars {
465 cmd.env(k, v);
466 }
467
468 cmd.stdout(Stdio::piped());
471 cmd.stderr(Stdio::piped());
472
473 let stream_logs = !self.config.capture_output;
474 if stream_logs {
475 let cmd_str = if task.command.is_empty() {
476 task.args.join(" ")
477 } else {
478 format!("{} {}", task.command, task.args.join(" "))
479 };
480 cuenv_events::emit_task_started!(name, cmd_str, true);
481 }
482
483 let start = std::time::Instant::now();
484 let mut child = cmd.spawn().map_err(|e| {
485 Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
486 })?;
487
488 let stdout_handle = child.stdout.take();
489 let stderr_handle = child.stderr.take();
490
491 let stdout_task = async move {
492 if let Some(mut stdout) = stdout_handle {
493 let mut output = Vec::new();
494 let mut buf = [0u8; 4096];
495 use std::io::Write;
496 use tokio::io::AsyncReadExt;
497
498 loop {
499 match stdout.read(&mut buf).await {
500 Ok(0) => break, Ok(n) => {
502 let chunk = &buf[0..n];
503 if stream_logs {
504 let mut handle = std::io::stdout().lock();
505 let _ = handle.write_all(chunk);
506 let _ = handle.flush();
507 }
508 output.extend_from_slice(chunk);
509 }
510 Err(_) => break,
511 }
512 }
513 String::from_utf8_lossy(&output).to_string()
514 } else {
515 String::new()
516 }
517 };
518
519 let stderr_task = async move {
520 if let Some(mut stderr) = stderr_handle {
521 let mut output = Vec::new();
522 let mut buf = [0u8; 4096];
523 use std::io::Write;
524 use tokio::io::AsyncReadExt;
525
526 loop {
527 match stderr.read(&mut buf).await {
528 Ok(0) => break, Ok(n) => {
530 let chunk = &buf[0..n];
531 if stream_logs {
532 let mut handle = std::io::stderr().lock();
533 let _ = handle.write_all(chunk);
534 let _ = handle.flush();
535 }
536 output.extend_from_slice(chunk);
537 }
538 Err(_) => break,
539 }
540 }
541 String::from_utf8_lossy(&output).to_string()
542 } else {
543 String::new()
544 }
545 };
546
547 let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
548
549 let status = child.wait().await.map_err(|e| {
550 Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
551 })?;
552 let duration = start.elapsed();
553
554 let exit_code = status.code().unwrap_or(1);
555 let success = status.success();
556 if !success {
557 tracing::warn!(task = %name, exit = exit_code, "Task failed");
558 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
559 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
560 } else {
561 tracing::info!(task = %name, "Task completed successfully");
562 }
563
564 let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
566 task.outputs
567 .iter()
568 .map(|o| prefix.join(o).to_string_lossy().to_string())
569 .collect()
570 } else {
571 task.outputs.clone()
572 };
573 let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
574 let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
575 let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
576
577 let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
579 if outputs_stage.exists() {
580 let _ = std::fs::remove_dir_all(&outputs_stage);
581 }
582 std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
583 source: e,
584 path: Some(outputs_stage.clone().into()),
585 operation: "create_dir_all".into(),
586 })?;
587
588 for rel in &outputs {
589 let rel_for_project = project_prefix
590 .as_ref()
591 .and_then(|prefix| rel.strip_prefix(prefix).ok())
592 .unwrap_or(rel)
593 .to_path_buf();
594 let src = hermetic_root.join(rel);
595 #[allow(clippy::collapsible_if)]
598 if let Ok(meta) = std::fs::metadata(&src) {
599 if meta.is_file() {
600 let dst = outputs_stage.join(&rel_for_project);
601 if let Some(parent) = dst.parent() {
602 std::fs::create_dir_all(parent).map_err(|e| Error::Io {
603 source: e,
604 path: Some(parent.into()),
605 operation: "create_dir_all".into(),
606 })?;
607 }
608 std::fs::copy(&src, &dst).map_err(|e| Error::Io {
609 source: e,
610 path: Some(dst.into()),
611 operation: "copy".into(),
612 })?;
613 let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
614 output_index.push(task_cache::OutputIndexEntry {
615 rel_path: rel_for_project.to_string_lossy().to_string(),
616 size: meta.len(),
617 sha256: sha,
618 });
619 }
620 }
621 }
622
623 let mut warned = false;
625 for entry in walkdir::WalkDir::new(&hermetic_root)
626 .into_iter()
627 .filter_map(|e| e.ok())
628 {
629 let p = entry.path();
630 if p.is_dir() {
631 continue;
632 }
633 let rel = match p.strip_prefix(&hermetic_root) {
634 Ok(r) => r.to_path_buf(),
635 Err(_) => continue,
636 };
637 let rel_str = rel.to_string_lossy().to_string();
638 let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
639 let initial = initial_hashes.get(&rel_str);
640 let changed = match initial {
641 None => true,
642 Some(prev) => prev != &sha,
643 };
644 if changed && !outputs_set.contains(&rel) {
645 if !warned {
646 tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
647 warned = true;
648 }
649 tracing::debug!(path = %rel_str, "Undeclared write");
650 }
651 }
652
653 if success {
655 let meta = task_cache::TaskResultMeta {
656 task_name: name.to_string(),
657 command: task.command.clone(),
658 args: task.args.clone(),
659 env_summary,
660 inputs_summary: inputs_summary.clone(),
661 created_at: Utc::now(),
662 cuenv_version,
663 platform,
664 duration_ms: duration.as_millis(),
665 exit_code,
666 cache_key_envelope: envelope_json.clone(),
667 output_index,
668 };
669 let logs = task_cache::TaskLogs {
670 stdout: Some(stdout.clone()),
673 stderr: Some(stderr.clone()),
674 };
675 let cache_span = tracing::info_span!("cache.save", key = %cache_key);
676 {
677 let _g = cache_span.enter();
678 task_cache::save_result(
679 &cache_key,
680 &meta,
681 &outputs_stage,
682 &hermetic_root,
683 logs,
684 self.config.cache_dir.as_deref(),
685 )?;
686 }
687
688 task_cache::record_latest(
690 &self.config.project_root,
691 name,
692 &cache_key,
693 self.config.cache_dir.as_deref(),
694 )?;
695
696 } else {
698 }
700
701 Ok(TaskResult {
702 name: name.to_string(),
703 exit_code: Some(exit_code),
704 stdout,
705 stderr,
706 success,
707 })
708 }
709 }
710
711 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
715 if task.is_task_ref() && task.project_root.is_none() {
717 return Err(Error::configuration(format!(
718 "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
719 This usually means:\n\
720 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
721 - The referenced task '{}' doesn't exist in that project\n\
722 - There was an error loading the referenced project's env.cue\n\
723 Run with RUST_LOG=debug for more details.",
724 name,
725 task.task_ref.as_deref().unwrap_or("unknown"),
726 task.task_ref
727 .as_deref()
728 .and_then(|r| r.split(':').next_back())
729 .unwrap_or("unknown")
730 )));
731 }
732
733 let workdir = if let Some(ref dir) = task.directory {
740 self.config
742 .cue_module_root
743 .as_ref()
744 .unwrap_or(&self.config.project_root)
745 .join(dir)
746 } else if let Some(ref project_root) = task.project_root {
747 project_root.clone()
749 } else if let Some(ref source) = task.source {
750 if let Some(dir) = source.directory() {
752 self.config
753 .cue_module_root
754 .as_ref()
755 .unwrap_or(&self.config.project_root)
756 .join(dir)
757 } else {
758 self.config
762 .cue_module_root
763 .clone()
764 .unwrap_or_else(|| self.config.project_root.clone())
765 }
766 } else if !task.hermetic && !task.workspaces.is_empty() {
767 let workspace_name = &task.workspaces[0];
769 let manager = match workspace_name.as_str() {
770 "bun" => PackageManager::Bun,
771 "npm" => PackageManager::Npm,
772 "pnpm" => PackageManager::Pnpm,
773 "yarn" => PackageManager::YarnModern,
774 "cargo" => PackageManager::Cargo,
775 _ => PackageManager::Npm, };
777 find_workspace_root(manager, &self.config.project_root)
778 } else {
779 self.config.project_root.clone()
780 };
781
782 tracing::info!(
783 task = %name,
784 workdir = %workdir.display(),
785 hermetic = false,
786 "Executing non-hermetic task"
787 );
788
789 let cmd_str = if let Some(script) = &task.script {
791 format!("[script: {} bytes]", script.len())
792 } else if task.command.is_empty() {
793 task.args.join(" ")
794 } else {
795 format!("{} {}", task.command, task.args.join(" "))
796 };
797 if !self.config.capture_output {
798 cuenv_events::emit_task_started!(name, cmd_str, false);
799 }
800
801 let mut cmd = if let Some(script) = &task.script {
803 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
805 (
806 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
807 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
808 )
809 } else {
810 ("bash".to_string(), "-c".to_string())
812 };
813
814 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
815 let mut cmd = Command::new(&resolved_shell);
816 cmd.arg(&shell_flag);
817 cmd.arg(script);
818 cmd
819 } else {
820 let resolved_command = self.config.environment.resolve_command(&task.command);
822
823 if let Some(shell) = &task.shell {
824 if shell.command.is_some() && shell.flag.is_some() {
825 let shell_command = shell.command.as_ref().unwrap();
826 let shell_flag = shell.flag.as_ref().unwrap();
827 let resolved_shell = self.config.environment.resolve_command(shell_command);
828 let mut cmd = Command::new(&resolved_shell);
829 cmd.arg(shell_flag);
830 if task.args.is_empty() {
831 cmd.arg(&resolved_command);
832 } else {
833 let full_command = if task.command.is_empty() {
834 task.args.join(" ")
835 } else {
836 format!("{} {}", resolved_command, task.args.join(" "))
837 };
838 cmd.arg(full_command);
839 }
840 cmd
841 } else {
842 let mut cmd = Command::new(&resolved_command);
843 for arg in &task.args {
844 cmd.arg(arg);
845 }
846 cmd
847 }
848 } else {
849 let mut cmd = Command::new(&resolved_command);
850 for arg in &task.args {
851 cmd.arg(arg);
852 }
853 cmd
854 }
855 };
856
857 cmd.current_dir(&workdir);
859 let env_vars = self.config.environment.merge_with_system();
860 for (k, v) in &env_vars {
861 cmd.env(k, v);
862 }
863
864 if self.config.capture_output {
867 let output = cmd
868 .stdout(Stdio::piped())
869 .stderr(Stdio::piped())
870 .output()
871 .await
872 .map_err(|e| Error::Io {
873 source: e,
874 path: None,
875 operation: format!("spawn task {}", name),
876 })?;
877
878 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
879 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
880 let exit_code = output.status.code().unwrap_or(-1);
881 let success = output.status.success();
882
883 if !success {
884 tracing::warn!(task = %name, exit = exit_code, "Task failed");
885 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
886 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
887 }
888
889 Ok(TaskResult {
890 name: name.to_string(),
891 exit_code: Some(exit_code),
892 stdout,
893 stderr,
894 success,
895 })
896 } else {
897 let status = cmd
899 .stdout(Stdio::inherit())
900 .stderr(Stdio::inherit())
901 .status()
902 .await
903 .map_err(|e| Error::Io {
904 source: e,
905 path: None,
906 operation: format!("spawn task {}", name),
907 })?;
908
909 let exit_code = status.code().unwrap_or(-1);
910 let success = status.success();
911
912 if !success {
913 tracing::warn!(task = %name, exit = exit_code, "Task failed");
914 }
915
916 Ok(TaskResult {
917 name: name.to_string(),
918 exit_code: Some(exit_code),
919 stdout: String::new(), stderr: String::new(),
921 success,
922 })
923 }
924 }
925
926 #[async_recursion]
928 pub async fn execute_definition(
929 &self,
930 name: &str,
931 definition: &TaskDefinition,
932 all_tasks: &Tasks,
933 ) -> Result<Vec<TaskResult>> {
934 match definition {
935 TaskDefinition::Single(task) => {
936 let result = self.execute_task(name, task.as_ref()).await?;
937 Ok(vec![result])
938 }
939 TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
940 }
941 }
942
943 async fn execute_group(
944 &self,
945 prefix: &str,
946 group: &TaskGroup,
947 all_tasks: &Tasks,
948 ) -> Result<Vec<TaskResult>> {
949 match group {
950 TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
951 TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
952 }
953 }
954
955 async fn execute_sequential(
956 &self,
957 prefix: &str,
958 tasks: &[TaskDefinition],
959 all_tasks: &Tasks,
960 ) -> Result<Vec<TaskResult>> {
961 if !self.config.capture_output {
962 cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
963 }
964 let mut results = Vec::new();
965 for (i, task_def) in tasks.iter().enumerate() {
966 let task_name = format!("{}[{}]", prefix, i);
967 let task_results = self
968 .execute_definition(&task_name, task_def, all_tasks)
969 .await?;
970 for result in &task_results {
971 if !result.success {
972 let message = format!(
973 "Sequential task group '{prefix}' halted.\n\n{}",
974 summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
975 );
976 return Err(Error::configuration(message));
977 }
978 }
979 results.extend(task_results);
980 }
981 Ok(results)
982 }
983
984 async fn execute_parallel(
985 &self,
986 prefix: &str,
987 group: &ParallelGroup,
988 all_tasks: &Tasks,
989 ) -> Result<Vec<TaskResult>> {
990 if let Some(default_task) = group.tasks.get("default") {
992 if !self.config.capture_output {
993 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
994 }
995 let task_name = format!("{}.default", prefix);
998 return self
999 .execute_definition(&task_name, default_task, all_tasks)
1000 .await;
1001 }
1002
1003 if !self.config.capture_output {
1004 cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
1005 }
1006 let mut join_set = JoinSet::new();
1007 let all_tasks = Arc::new(all_tasks.clone());
1008 let mut all_results = Vec::new();
1009 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
1010 if let Some(failed) = results.iter().find(|r| !r.success) {
1011 let message = format!(
1012 "Parallel task group '{prefix}' halted.\n\n{}",
1013 summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
1014 );
1015 return Err(Error::configuration(message));
1016 }
1017 all_results.extend(results);
1018 Ok(())
1019 };
1020 for (name, task_def) in &group.tasks {
1021 let task_name = format!("{}.{}", prefix, name);
1022 let task_def = task_def.clone();
1023 let all_tasks = Arc::clone(&all_tasks);
1024 let executor = self.clone_with_config();
1025 join_set.spawn(async move {
1026 executor
1027 .execute_definition(&task_name, &task_def, &all_tasks)
1028 .await
1029 });
1030 if self.config.max_parallel > 0
1031 && join_set.len() >= self.config.max_parallel
1032 && let Some(result) = join_set.join_next().await
1033 {
1034 match result {
1035 Ok(Ok(results)) => merge_results(results)?,
1036 Ok(Err(e)) => return Err(e),
1037 Err(e) => {
1038 return Err(Error::configuration(format!(
1039 "Task execution panicked: {}",
1040 e
1041 )));
1042 }
1043 }
1044 }
1045 }
1046 while let Some(result) = join_set.join_next().await {
1047 match result {
1048 Ok(Ok(results)) => merge_results(results)?,
1049 Ok(Err(e)) => return Err(e),
1050 Err(e) => {
1051 return Err(Error::configuration(format!(
1052 "Task execution panicked: {}",
1053 e
1054 )));
1055 }
1056 }
1057 }
1058 Ok(all_results)
1059 }
1060
1061 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
1062 let parallel_groups = graph.get_parallel_groups()?;
1063 let mut all_results = Vec::new();
1064
1065 for mut group in parallel_groups {
1073 let mut join_set = JoinSet::new();
1074
1075 while !group.is_empty() || !join_set.is_empty() {
1076 while let Some(node) = group.pop() {
1078 let task = node.task.clone();
1079 let name = node.name.clone();
1080 let executor = self.clone_with_config();
1081 join_set.spawn(async move { executor.execute_task(&name, &task).await });
1082
1083 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
1084 break;
1085 }
1086 }
1087
1088 if let Some(result) = join_set.join_next().await {
1089 match result {
1090 Ok(Ok(task_result)) => {
1091 if !task_result.success {
1092 join_set.abort_all();
1093 let message = format!(
1094 "Task graph execution halted.\n\n{}",
1095 summarize_task_failure(
1096 &task_result,
1097 TASK_FAILURE_SNIPPET_LINES,
1098 )
1099 );
1100 return Err(Error::configuration(message));
1101 }
1102 all_results.push(task_result);
1103 }
1104 Ok(Err(e)) => {
1105 join_set.abort_all();
1106 return Err(e);
1107 }
1108 Err(e) => {
1109 join_set.abort_all();
1110 return Err(Error::configuration(format!(
1111 "Task execution panicked: {}",
1112 e
1113 )));
1114 }
1115 }
1116 }
1117 }
1118 }
1119
1120 Ok(all_results)
1121 }
1122
1123 async fn resolve_workspace(
1124 &self,
1125 _task_name: &str,
1126 task: &Task,
1127 workspace_name: &str,
1128 config: &WorkspaceConfig,
1129 ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1130 let root = self.config.project_root.clone();
1131 let task_label = _task_name.to_string();
1132 let command = task.command.clone();
1133 let config_pm = config.package_manager.clone();
1134 let config_root = config.root.clone();
1135 let mut packages = Vec::new();
1143
1144 let lockfile_override: Option<String> = None; let mut traverse_workspace_deps = true;
1151
1152 let workspace_name_owned = workspace_name.to_string();
1154 tokio::task::spawn_blocking(move || {
1155 let override_for_detection = lockfile_override.as_ref().map(|lock| {
1156 let candidate = PathBuf::from(lock);
1157 if candidate.is_absolute() {
1158 candidate
1159 } else {
1160 root.join(lock)
1161 }
1162 });
1163
1164 let manager = if let Some(pm_str) = config_pm {
1167 match pm_str.as_str() {
1168 "npm" => PackageManager::Npm,
1169 "bun" => PackageManager::Bun,
1170 "pnpm" => PackageManager::Pnpm,
1171 "yarn" => PackageManager::YarnModern,
1172 "yarn-classic" => PackageManager::YarnClassic,
1173 "cargo" => PackageManager::Cargo,
1174 _ => {
1175 return Err(Error::configuration(format!(
1176 "Unknown package manager: {}",
1177 pm_str
1178 )));
1179 }
1180 }
1181 } else {
1182 match workspace_name_owned.as_str() {
1184 "npm" => PackageManager::Npm,
1185 "bun" => PackageManager::Bun,
1186 "pnpm" => PackageManager::Pnpm,
1187 "yarn" => PackageManager::YarnModern,
1188 "cargo" => PackageManager::Cargo,
1189 _ => {
1190 let hint = detect_from_command(&command);
1193 let detected = match detect_package_managers(&root) {
1194 Ok(list) => list,
1195 Err(e) => {
1196 if override_for_detection.is_some() {
1197 Vec::new()
1198 } else {
1199 return Err(Error::configuration(format!(
1200 "Failed to detect package managers: {}",
1201 e
1202 )));
1203 }
1204 }
1205 };
1206
1207 if let Some(h) = hint {
1208 if detected.contains(&h) {
1209 h
1210 } else if !detected.is_empty() {
1211 detected[0]
1213 } else if let Some(ref override_path) = override_for_detection {
1214 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1215 Error::configuration(
1216 "Unable to infer package manager from lockfile override",
1217 )
1218 })?
1219 } else {
1220 return Err(Error::configuration(
1221 format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1222 ));
1223 }
1224 } else if !detected.is_empty() {
1225 detected[0]
1226 } else if let Some(ref override_path) = override_for_detection {
1227 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1228 Error::configuration(
1229 "Unable to infer package manager from lockfile override",
1230 )
1231 })?
1232 } else {
1233 return Err(Error::configuration(
1234 "Could not detect package manager for workspace resolution",
1235 ));
1236 }
1237 }
1238 }
1239 };
1240
1241 let workspace_root = if let Some(root_override) = &config_root {
1245 root.join(root_override)
1246 } else {
1247 find_workspace_root(manager, &root)
1248 };
1249
1250 if task_trace_enabled() {
1251 tracing::info!(
1252 task = %task_label,
1253 manager = %manager,
1254 project_root = %root.display(),
1255 workspace_root = %workspace_root.display(),
1256 "Resolved workspace root for package manager"
1257 );
1258 }
1259
1260 let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1261 let candidate = PathBuf::from(lock);
1262 if candidate.is_absolute() {
1263 candidate
1264 } else {
1265 workspace_root.join(lock)
1266 }
1267 });
1268
1269 let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1271 PackageManager::Npm
1272 | PackageManager::Bun
1273 | PackageManager::YarnClassic
1274 | PackageManager::YarnModern
1275 | PackageManager::Deno => Box::new(PackageJsonDiscovery),
1276 PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1277 PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1278 };
1279
1280 let workspace = discovery.discover(&workspace_root).map_err(|e| {
1281 Error::configuration(format!("Failed to discover workspace: {}", e))
1282 })?;
1283
1284 let lockfile_path = if let Some(path) = lockfile_override_path {
1286 if !path.exists() {
1287 return Err(Error::configuration(format!(
1288 "Workspace lockfile override does not exist: {}",
1289 path.display()
1290 )));
1291 }
1292 path
1293 } else {
1294 workspace.lockfile.clone().ok_or_else(|| {
1295 Error::configuration("Workspace resolution requires a lockfile")
1296 })?
1297 };
1298
1299 let parser: Box<dyn LockfileParser> = match manager {
1300 PackageManager::Npm => Box::new(NpmLockfileParser),
1301 PackageManager::Bun => Box::new(BunLockfileParser),
1302 PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1303 PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1304 PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1305 PackageManager::Cargo => Box::new(CargoLockfileParser),
1306 PackageManager::Deno => Box::new(NpmLockfileParser), };
1308
1309 let entries = parser
1310 .parse(&lockfile_path)
1311 .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1312 if task_trace_enabled() {
1313 tracing::info!(
1314 task = %task_label,
1315 lockfile = %lockfile_path.display(),
1316 members = entries.len(),
1317 "Parsed workspace lockfile"
1318 );
1319 }
1320
1321 let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1323
1324 if packages.is_empty() {
1328 let current_member = workspace
1329 .members
1330 .iter()
1331 .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1332 if let Some(member) = current_member {
1333 let inferred = vec![member.name.clone()];
1334 if task_trace_enabled() {
1335 tracing::info!(
1336 task = %task_label,
1337 inferred_packages = ?inferred,
1338 "Inferred workspace packages from current project"
1339 );
1340 }
1341 packages = inferred;
1342 traverse_workspace_deps = true;
1343 }
1344 }
1345
1346 let mut member_paths = Vec::new();
1348
1349 member_paths.push(manager.workspace_config_name().to_string());
1351 if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1352 member_paths.push(rel.to_string_lossy().to_string());
1353 } else {
1354 member_paths.push(lockfile_path.to_string_lossy().to_string());
1355 }
1356
1357 if packages.is_empty() {
1358 for member in &workspace.members {
1359 let manifest_rel = member
1360 .path
1361 .join(manager.workspace_config_name());
1362 member_paths.push(manifest_rel.to_string_lossy().to_string());
1363 }
1364 } else {
1365 let mut to_visit: Vec<String> = packages.clone();
1366 let mut visited = HashSet::new();
1367
1368 while let Some(pkg_name) = to_visit.pop() {
1369 if visited.contains(&pkg_name) {
1370 continue;
1371 }
1372 visited.insert(pkg_name.clone());
1373
1374 if let Some(member) = workspace.find_member(&pkg_name) {
1375 let manifest_rel = member
1376 .path
1377 .join(manager.workspace_config_name());
1378 member_paths.push(manifest_rel.to_string_lossy().to_string());
1379
1380 if traverse_workspace_deps {
1382 let mut dependency_candidates: HashSet<String> = HashSet::new();
1383
1384 if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1385 for dep in &entry.dependencies {
1386 if entries
1387 .iter()
1388 .any(|e| e.name == dep.name && e.is_workspace_member)
1389 {
1390 dependency_candidates.insert(dep.name.clone());
1391 }
1392 }
1393 }
1394
1395 for dep_name in &member.dependencies {
1396 if workspace.find_member(dep_name).is_some() {
1397 dependency_candidates.insert(dep_name.clone());
1398 }
1399 }
1400
1401 for dep_name in dependency_candidates {
1402 to_visit.push(dep_name);
1403 }
1404 }
1405 }
1406 }
1407 }
1408
1409 if task_trace_enabled() {
1410 tracing::info!(
1411 task = %task_label,
1412 members = ?member_paths,
1413 "Workspace input member paths selected"
1414 );
1415 }
1416
1417 Ok((workspace, entries, member_paths, Some(hash)))
1418 })
1419 .await
1420 .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1421 }
1422
1423 async fn materialize_workspace(
1424 &self,
1425 workspace: &Workspace,
1426 entries: &[LockfileEntry],
1427 target_dir: &Path,
1428 ) -> Result<()> {
1429 let materializer: Box<dyn Materializer> = match workspace.manager {
1431 PackageManager::Npm
1432 | PackageManager::Bun
1433 | PackageManager::Pnpm
1434 | PackageManager::YarnClassic
1435 | PackageManager::YarnModern
1436 | PackageManager::Deno => Box::new(NodeModulesMaterializer),
1437 PackageManager::Cargo => Box::new(CargoMaterializer),
1438 };
1439
1440 materializer
1441 .materialize(workspace, entries, target_dir)
1442 .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1443 }
1444
1445 fn clone_with_config(&self) -> Self {
1446 Self::with_shared_backend(self.config.clone(), self.backend.clone())
1448 }
1449}
1450
1451fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1452 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1453
1454 loop {
1455 let is_root = match manager {
1456 PackageManager::Npm
1457 | PackageManager::Bun
1458 | PackageManager::YarnClassic
1459 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
1460 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1461 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
1462 PackageManager::Deno => deno_json_has_workspace(¤t),
1463 };
1464
1465 if is_root {
1466 return current;
1467 }
1468
1469 if let Some(parent) = current.parent() {
1470 current = parent.to_path_buf();
1471 } else {
1472 return start.to_path_buf();
1473 }
1474 }
1475}
1476
1477fn package_json_has_workspaces(dir: &Path) -> bool {
1478 let path = dir.join("package.json");
1479 let content = std::fs::read_to_string(&path);
1480 let Ok(json) = content.and_then(|s| {
1481 serde_json::from_str::<serde_json::Value>(&s)
1482 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1483 }) else {
1484 return false;
1485 };
1486
1487 match json.get("workspaces") {
1488 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1489 Some(serde_json::Value::Object(map)) => map
1490 .get("packages")
1491 .and_then(|packages| packages.as_array())
1492 .map(|arr| !arr.is_empty())
1493 .unwrap_or(false),
1494 _ => false,
1495 }
1496}
1497
/// Reports whether `dir/Cargo.toml` declares a Cargo workspace.
///
/// The manifest is scanned line by line for a TOML table header that is either
/// `[workspace]` or a sub-table such as `[workspace.dependencies]`. Trailing
/// `#` comments and whitespace around or inside the brackets are ignored.
/// Unlike a raw substring search, a commented-out header or the text
/// `[workspace]` inside a string value does not count.
///
/// Returns `false` when the file is missing or cannot be read as UTF-8.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // Table headers never contain `#`, so cutting at the first one only
        // removes a trailing comment (or the whole of a comment line).
        let candidate = line.split('#').next().unwrap_or("").trim();

        candidate
            .strip_prefix('[')
            .and_then(|rest| rest.strip_suffix(']'))
            .map(str::trim)
            .is_some_and(|table| table == "workspace" || table.starts_with("workspace."))
    })
}
1506
1507fn deno_json_has_workspace(dir: &Path) -> bool {
1508 let path = dir.join("deno.json");
1509 let content = std::fs::read_to_string(&path);
1510 let Ok(json) = content.and_then(|s| {
1511 serde_json::from_str::<serde_json::Value>(&s)
1512 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1513 }) else {
1514 return false;
1515 };
1516
1517 match json.get("workspace") {
1519 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1520 Some(serde_json::Value::Object(_)) => true,
1521 _ => false,
1522 }
1523}
1524
/// Reports whether verbose task tracing was requested through the
/// `CUENV_TRACE_TASKS` environment variable.
///
/// The value is trimmed and compared case-insensitively against `1`, `true`,
/// `yes` and `on`; an unset or non-UTF-8 variable counts as disabled. The
/// answer is computed on the first call and cached, so later changes to the
/// variable have no effect.
fn task_trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();

    *ENABLED.get_or_init(|| {
        let raw = std::env::var("CUENV_TRACE_TASKS").unwrap_or_default();
        let flag = raw.trim().to_ascii_lowercase();
        ["1", "true", "yes", "on"].contains(&flag.as_str())
    })
}
1538
1539pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1542 let exit_code = result
1543 .exit_code
1544 .map(|c| c.to_string())
1545 .unwrap_or_else(|| "unknown".to_string());
1546
1547 let mut sections = Vec::new();
1548 sections.push(format!(
1549 "Task '{}' failed with exit code {}.",
1550 result.name, exit_code
1551 ));
1552
1553 let output = format_failure_streams(result, max_output_lines);
1554 if output.is_empty() {
1555 sections.push(
1556 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1557 .to_string(),
1558 );
1559 } else {
1560 sections.push(output);
1561 }
1562
1563 sections.join("\n\n")
1564}
1565
1566fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1567 let mut streams = Vec::new();
1568
1569 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1570 streams.push(stdout);
1571 }
1572
1573 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1574 streams.push(stderr);
1575 }
1576
1577 streams.join("\n\n")
1578}
1579
/// Produces a labelled snippet holding the last `max_output_lines` lines of
/// `content`.
///
/// Trailing whitespace is dropped first; if nothing remains, `None` is
/// returned. The header is `"<label>:"` when every line fits, or
/// `"<label> (last N of M lines):"` when earlier lines were cut off.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let all_lines: Vec<&str> = trimmed.lines().collect();
    let total = all_lines.len();
    // Number of leading lines dropped; never exceeds `total`.
    let skipped = total.saturating_sub(max_output_lines);
    let (_, tail) = all_lines.split_at(skipped);

    let mut summary = if skipped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };
    summary.push('\n');
    summary.push_str(&tail.join("\n"));

    Some(summary)
}
1599
1600fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1601 match path.file_name().and_then(|n| n.to_str())? {
1602 "package-lock.json" => Some(PackageManager::Npm),
1603 "bun.lock" => Some(PackageManager::Bun),
1604 "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1605 "yarn.lock" => Some(PackageManager::YarnModern),
1606 "Cargo.lock" => Some(PackageManager::Cargo),
1607 _ => None,
1608 }
1609}
1610
1611fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1612 let sanitized_task = task_name
1615 .chars()
1616 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1617 .collect::<String>();
1618
1619 let base = std::env::temp_dir().join(format!(
1620 "cuenv-work-{}-{}",
1621 sanitized_task,
1622 &key[..12.min(key.len())]
1623 ));
1624
1625 if base.exists()
1628 && let Err(e) = std::fs::remove_dir_all(&base)
1629 {
1630 let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1633 let fallback = std::env::temp_dir().join(format!(
1634 "cuenv-work-{}-{}-{}",
1635 sanitized_task,
1636 &key[..12.min(key.len())],
1637 ts
1638 ));
1639 tracing::warn!(
1640 previous = %base.display(),
1641 fallback = %fallback.display(),
1642 error = %e,
1643 "Failed to clean previous hermetic workdir; using fresh fallback directory"
1644 );
1645 std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1646 source: e,
1647 path: Some(fallback.clone().into()),
1648 operation: "create_dir_all".into(),
1649 })?;
1650 return Ok(fallback);
1651 }
1652
1653 std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1654 source: e,
1655 path: Some(base.clone().into()),
1656 operation: "create_dir_all".into(),
1657 })?;
1658 Ok(base)
1659}
1660
1661pub async fn execute_command(
1663 command: &str,
1664 args: &[String],
1665 environment: &Environment,
1666) -> Result<i32> {
1667 tracing::info!("Executing command: {} {:?}", command, args);
1668 let mut cmd = Command::new(command);
1669 cmd.args(args);
1670 let env_vars = environment.merge_with_system();
1671 for (key, value) in env_vars {
1672 cmd.env(key, value);
1673 }
1674 cmd.stdout(Stdio::inherit());
1675 cmd.stderr(Stdio::inherit());
1676 cmd.stdin(Stdio::inherit());
1677 let status = cmd.status().await.map_err(|e| {
1678 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1679 })?;
1680 Ok(status.code().unwrap_or(1))
1681}
1682
// Tests for the task executor. Most of them spawn real processes (`echo`,
// `printenv`, `sh`, `false`), so they expect those commands to be on PATH.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::Input;
    use std::fs;
    use tempfile::TempDir;

    // The default configuration does not capture output, has `max_parallel`
    // set to 0, and carries an empty environment.
    #[tokio::test]
    async fn test_executor_config_default() {
        let config = ExecutorConfig::default();
        assert!(!config.capture_output);
        assert_eq!(config.max_parallel, 0);
        assert!(config.environment.is_empty());
    }

    // `TaskResult` is a plain data holder: fields read back as they were set.
    #[tokio::test]
    async fn test_task_result() {
        let result = TaskResult {
            name: "test".to_string(),
            exit_code: Some(0),
            stdout: "output".to_string(),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(result.name, "test");
        assert_eq!(result.exit_code, Some(0));
        assert!(result.success);
        assert_eq!(result.stdout, "output");
    }

    // A single `echo` task succeeds and its stdout is captured.
    #[tokio::test]
    async fn test_execute_simple_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            description: Some("Hello task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello"));
    }

    // Variables set on the executor's environment are visible to the task.
    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["TEST_VAR".to_string()],
            description: Some("Print env task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("test_value"));
    }

    // Builds a Bun workspace in a temp dir with the project nested at
    // `apps/site`, then checks that a task run from that project can see the
    // sibling member `packages/content-technologies` two levels up.
    // NOTE(review): presumably this exercises workspace-root discovery plus
    // population of the task's working directory — confirm against
    // `execute_task`.
    #[tokio::test]
    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        // Workspace root manifest declaring the member globs.
        fs::write(
            root.join("package.json"),
            r#"{
  "name": "root-app",
  "version": "0.0.0",
  "workspaces": ["packages/*", "apps/*"],
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#,
        )
        .unwrap();
        // Lockfile listing the root and both members; no external packages.
        fs::write(
            root.join("bun.lock"),
            r#"{
  "lockfileVersion": 1,
  "workspaces": {
    "": {
      "name": "root-app",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    },
    "packages/content-technologies": {
      "name": "@rawkodeacademy/content-technologies",
      "version": "0.0.1"
    },
    "apps/site": {
      "version": "0.0.0",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    }
  },
  "packages": {}
}"#,
        )
        .unwrap();

        // Member: the library package.
        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
        fs::write(
            root.join("packages/content-technologies/package.json"),
            r#"{
  "name": "@rawkodeacademy/content-technologies",
  "version": "0.0.1"
}"#,
        )
        .unwrap();

        // Member: the app, which depends on the library package.
        fs::create_dir_all(root.join("apps/site")).unwrap();
        fs::write(
            root.join("apps/site/package.json"),
            r#"{
  "name": "site",
  "version": "0.0.0",
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#,
        )
        .unwrap();

        let mut workspaces = HashMap::new();
        workspaces.insert(
            "bun".to_string(),
            WorkspaceConfig {
                enabled: true,
                package_manager: Some("bun".to_string()),
                root: None,
                hooks: None,
            },
        );

        // The project root is the nested app, not the workspace root.
        let config = ExecutorConfig {
            capture_output: true,
            project_root: root.join("apps/site"),
            workspaces: Some(workspaces),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        // List directories starting two levels above the task's working
        // directory so the assertion can look for the sibling member.
        let task = Task {
            command: "sh".to_string(),
            args: vec![
                "-c".to_string(),
                "find ../.. -maxdepth 4 -type d | sort".to_string(),
            ],
            inputs: vec![Input::Path("package.json".to_string())],
            workspaces: vec!["bun".to_string()],
            ..Default::default()
        };

        let result = executor.execute_task("install", &task).await.unwrap();
        assert!(
            result.success,
            "command failed stdout='{}' stderr='{}'",
            result.stdout, result.stderr
        );
        assert!(
            result
                .stdout
                .split_whitespace()
                .any(|line| line.ends_with("packages/content-technologies")),
            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
            result.stdout,
            result.stderr
        );
    }

    // A non-zero exit is reported as an unsuccessful result, not as an `Err`.
    #[tokio::test]
    async fn test_execute_failing_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "false".to_string(),
            description: Some("Failing task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
    }

    // A sequential group yields one result per task, in declaration order.
    #[tokio::test]
    async fn test_execute_sequential_group() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task1 = Task {
            command: "echo".to_string(),
            args: vec!["first".to_string()],
            description: Some("First task".to_string()),
            ..Default::default()
        };
        let task2 = Task {
            command: "echo".to_string(),
            args: vec!["second".to_string()],
            description: Some("Second task".to_string()),
            ..Default::default()
        };
        let group = TaskGroup::Sequential(vec![
            TaskDefinition::Single(Box::new(task1)),
            TaskDefinition::Single(Box::new(task2)),
        ]);
        let all_tasks = Tasks::new();
        let results = executor
            .execute_group("seq", &group, &all_tasks)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].stdout.contains("first"));
        assert!(results[1].stdout.contains("second"));
    }

    // An argument that looks like a shell command separator must reach `echo`
    // as literal text: stdout contains the string itself.
    #[tokio::test]
    async fn test_command_injection_prevention() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let malicious_task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
            description: Some("Malicious task test".to_string()),
            ..Default::default()
        };
        let result = executor
            .execute_task("malicious", &malicious_task)
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("hello ; rm -rf /"));
    }

    // Shell metacharacters (variable expansion, command substitution,
    // chaining, redirection, pipes) are echoed back verbatim.
    #[tokio::test]
    async fn test_special_characters_in_args() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let special_chars = vec![
            "$USER",
            "$(whoami)",
            "`whoami`",
            "&& echo hacked",
            "|| echo failed",
            "> /tmp/hack",
            "| cat",
        ];
        for special_arg in special_chars {
            let task = Task {
                command: "echo".to_string(),
                args: vec!["safe".to_string(), special_arg.to_string()],
                description: Some("Special character test".to_string()),
                ..Default::default()
            };
            let result = executor.execute_task("special", &task).await.unwrap();
            assert!(result.success);
            assert!(result.stdout.contains("safe"));
            // The argument appears unexpanded in the output.
            assert!(result.stdout.contains(special_arg));
        }
    }

    // Environment values containing shell syntax are passed through untouched.
    #[tokio::test]
    async fn test_environment_variable_safety() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["DANGEROUS_VAR".to_string()],
            description: Some("Environment variable safety test".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("env_test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("; rm -rf /"));
    }

    // Two tasks with no dependency between them both run and both report
    // output. The order of results is not asserted.
    #[tokio::test]
    async fn test_execute_graph_parallel_groups() {
        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let mut graph = TaskGraph::new();

        let t1 = Task {
            command: "echo".into(),
            args: vec!["A".into()],
            ..Default::default()
        };
        let t2 = Task {
            command: "echo".into(),
            args: vec!["B".into()],
            ..Default::default()
        };

        graph.add_task("t1", t1).unwrap();
        graph.add_task("t2", t2).unwrap();
        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);
        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
        assert!(joined.contains("A") && joined.contains("B"));
    }

    // `consumer` depends on `dep`, which writes `marker.txt` only after a
    // short sleep. `consumer` can read the file only if it starts after `dep`
    // has finished, even though `max_parallel` would allow both at once.
    // NOTE(review): assumes both tasks run in the same directory so the marker
    // file is visible to `consumer` — confirm against the executor's
    // working-directory handling.
    #[tokio::test]
    async fn test_execute_graph_respects_dependency_levels() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            project_root: root.to_path_buf(),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        let mut tasks = Tasks::new();
        tasks.tasks.insert(
            "dep".into(),
            TaskDefinition::Single(Box::new(Task {
                command: "sh".into(),
                args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
                ..Default::default()
            })),
        );
        tasks.tasks.insert(
            "consumer".into(),
            TaskDefinition::Single(Box::new(Task {
                command: "sh".into(),
                args: vec!["-c".into(), "cat marker.txt".into()],
                depends_on: vec!["dep".into()],
                ..Default::default()
            })),
        );

        // Building the graph for `consumer` yields two results below, so it
        // brings `dep` in as well.
        let mut graph = TaskGraph::new();
        graph.build_for_task("consumer", &tasks).unwrap();

        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);

        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
        assert!(consumer.success);
        assert!(consumer.stdout.contains("ok"));
    }
}