1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19 BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20 NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21 PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22 YarnModernLockfileParser, detect_from_command, detect_package_managers,
23 materializer::{
24 Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25 },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
34#[derive(Debug, Clone)]
36pub struct TaskResult {
37 pub name: String,
38 pub exit_code: Option<i32>,
39 pub stdout: String,
40 pub stderr: String,
41 pub success: bool,
42}
43
44pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50 pub capture_output: bool,
52 pub max_parallel: usize,
54 pub environment: Environment,
56 pub working_dir: Option<PathBuf>,
58 pub project_root: PathBuf,
60 pub cue_module_root: Option<PathBuf>,
62 pub materialize_outputs: Option<PathBuf>,
64 pub cache_dir: Option<PathBuf>,
66 pub show_cache_path: bool,
68 pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
70 pub backend_config: Option<BackendConfig>,
72 pub cli_backend: Option<String>,
74}
75
76impl Default for ExecutorConfig {
77 fn default() -> Self {
78 Self {
79 capture_output: false,
80 max_parallel: 0,
81 environment: Environment::new(),
82 working_dir: None,
83 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
84 cue_module_root: None,
85 materialize_outputs: None,
86 cache_dir: None,
87 show_cache_path: false,
88 workspaces: None,
89 backend_config: None,
90 cli_backend: None,
91 }
92 }
93}
94
95pub struct TaskExecutor {
97 config: ExecutorConfig,
98 backend: Arc<dyn TaskBackend>,
99}
100impl TaskExecutor {
101 pub fn new(config: ExecutorConfig) -> Self {
103 Self::with_dagger_factory(config, None)
104 }
105
106 pub fn with_dagger_factory(
110 config: ExecutorConfig,
111 dagger_factory: Option<BackendFactory>,
112 ) -> Self {
113 let backend = create_backend_with_factory(
114 config.backend_config.as_ref(),
115 config.project_root.clone(),
116 config.cli_backend.as_deref(),
117 dagger_factory,
118 );
119 Self { config, backend }
120 }
121
122 fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
124 Self { config, backend }
125 }
126
127 pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
129 if self.backend.name() == "dagger" {
138 return self
139 .backend
140 .execute(
141 name,
142 task,
143 &self.config.environment,
144 &self.config.project_root,
145 self.config.capture_output,
146 )
147 .await;
148 }
149
150 return self.execute_task_non_hermetic(name, task).await;
152
153 #[allow(unreachable_code)]
155 {
156 let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
158 let mut workspace_input_patterns = Vec::new();
159 let mut workspace_lockfile_hashes = BTreeMap::new();
160
161 for workspace_name in &task.workspaces {
162 if let Some(global_workspaces) = &self.config.workspaces {
163 if let Some(ws_config) = global_workspaces.get(workspace_name) {
164 if ws_config.enabled {
165 let (ws, entries, paths, hash) = self
166 .resolve_workspace(name, task, workspace_name, ws_config)
167 .await?;
168 workspace_ctxs.push((ws, entries));
169 workspace_input_patterns.extend(paths);
170 if let Some(h) = hash {
171 workspace_lockfile_hashes.insert(workspace_name.clone(), h);
172 }
173 }
174 } else {
175 tracing::warn!(
176 task = %name,
177 workspace = %workspace_name,
178 "Workspace not found in global configuration"
179 );
180 }
181 }
182 }
183
184 let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
189
190 let project_prefix = primary_workspace_root
191 .as_ref()
192 .and_then(|root| self.config.project_root.strip_prefix(root).ok())
193 .map(|p| p.to_path_buf());
194 let input_root = primary_workspace_root
195 .clone()
196 .unwrap_or_else(|| self.config.project_root.clone());
197
198 let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
199 let resolved_inputs = {
200 let _g = span_inputs.enter();
201 let resolver = InputResolver::new(&input_root);
202 let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
203 all_inputs.extend(workspace_input_patterns.iter().cloned());
204 resolver.resolve(&all_inputs)?
205 };
206 if task_trace_enabled() {
207 tracing::info!(
208 task = %name,
209 input_root = %input_root.display(),
210 project_root = %self.config.project_root.display(),
211 inputs_count = resolved_inputs.files.len(),
212 workspace_inputs = workspace_input_patterns.len(),
213 "Resolved task inputs"
214 );
215 }
216
217 let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
219 let env_summary: BTreeMap<String, String> = self
221 .config
222 .environment
223 .vars
224 .iter()
225 .map(|(k, v)| (k.clone(), v.clone()))
226 .collect();
227 let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
228 let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
229 let shell_json = serde_json::to_value(&task.shell).ok();
230 let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
231 None
232 } else {
233 Some(workspace_lockfile_hashes)
234 };
235
236 let envelope = task_cache::CacheKeyEnvelope {
237 inputs: inputs_summary.clone(),
238 command: task.command.clone(),
239 args: task.args.clone(),
240 shell: shell_json,
241 env: env_summary.clone(),
242 cuenv_version: cuenv_version.clone(),
243 platform: platform.clone(),
244 workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
245 workspace_package_hashes: None,
247 };
248 let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
249
250 let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
252 let cache_hit = {
253 let _g = span_cache.enter();
254 task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
255 };
256
257 if let Some(hit) = cache_hit {
258 tracing::info!(
259 task = %name,
260 key = %cache_key,
261 path = %hit.path.display(),
262 "Task {} cache hit: {}. Skipping execution.",
263 name,
264 cache_key
265 );
266 if self.config.show_cache_path {
267 tracing::info!(cache_path = %hit.path.display(), "Cache path");
268 }
269 if let Some(dest) = &self.config.materialize_outputs {
270 let count = task_cache::materialize_outputs(
271 &cache_key,
272 dest,
273 self.config.cache_dir.as_deref(),
274 )?;
275 tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
276 }
277 let stdout_path = hit.path.join("logs").join("stdout.log");
282 let stderr_path = hit.path.join("logs").join("stderr.log");
283 let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
284 let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
285 if !(stdout.is_empty() && stderr.is_empty()) {
289 if !self.config.capture_output {
290 cuenv_events::emit_task_cache_hit!(name, cache_key);
291 if !stdout.is_empty() {
293 cuenv_events::emit_task_output!(name, "stdout", stdout);
294 }
295 if !stderr.is_empty() {
296 cuenv_events::emit_task_output!(name, "stderr", stderr);
297 }
298 }
299 return Ok(TaskResult {
300 name: name.to_string(),
301 exit_code: Some(0),
302 stdout,
303 stderr,
304 success: true,
305 });
306 } else {
307 tracing::info!(
308 task = %name,
309 key = %cache_key,
310 "Cache entry lacks logs; executing to backfill logs"
311 );
312 }
313 }
314
315 tracing::info!(
316 task = %name,
317 key = %cache_key,
318 "Task {} executing hermetically… key {}",
319 name,
320 cache_key
321 );
322
323 let hermetic_root = create_hermetic_dir(name, &cache_key)?;
324 if self.config.show_cache_path {
325 tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
326 }
327
328 let span_populate =
330 tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
331 {
332 let _g = span_populate.enter();
333 populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
334 }
335
336 for (ws, entries) in workspace_ctxs {
338 self.materialize_workspace(&ws, &entries, &hermetic_root)
339 .await?;
340 }
341
342 if let Some(ref inputs_from) = task.inputs_from {
344 for task_output in inputs_from {
345 let source_task = &task_output.task;
346 let source_cache_key = task_cache::lookup_latest(
347 &self.config.project_root,
348 source_task,
349 self.config.cache_dir.as_deref(),
350 )
351 .ok_or_else(|| {
352 Error::configuration(format!(
353 "Task '{}' depends on outputs from '{}' but no cached result found. \
354 Ensure '{}' runs before this task (add it to dependsOn).",
355 name, source_task, source_task
356 ))
357 })?;
358
359 let materialized = task_cache::materialize_outputs(
361 &source_cache_key,
362 &hermetic_root,
363 self.config.cache_dir.as_deref(),
364 )?;
365 tracing::info!(
366 task = %name,
367 source_task = %source_task,
368 materialized = materialized,
369 "Materialized outputs from dependent task"
370 );
371 }
372 }
373
374 let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
376
377 let mut cmd = if let Some(script) = &task.script {
379 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
381 (
382 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
383 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
384 )
385 } else {
386 ("bash".to_string(), "-c".to_string())
388 };
389
390 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
391 let mut cmd = Command::new(&resolved_shell);
392 cmd.arg(&shell_flag);
393 cmd.arg(script);
394 cmd
395 } else {
396 let resolved_command = self.config.environment.resolve_command(&task.command);
402
403 if let Some(shell) = &task.shell {
404 if shell.command.is_some() && shell.flag.is_some() {
405 let shell_command = shell.command.as_ref().expect("checked is_some above");
406 let shell_flag = shell.flag.as_ref().expect("checked is_some above");
407 let resolved_shell = self.config.environment.resolve_command(shell_command);
409 let mut cmd = Command::new(&resolved_shell);
410 cmd.arg(shell_flag);
411 if task.args.is_empty() {
412 cmd.arg(&resolved_command);
413 } else {
414 let full_command = if task.command.is_empty() {
415 task.args.join(" ")
416 } else {
417 format!("{} {}", resolved_command, task.args.join(" "))
418 };
419 cmd.arg(full_command);
420 }
421 cmd
422 } else {
423 let mut cmd = Command::new(&resolved_command);
424 for arg in &task.args {
425 cmd.arg(arg);
426 }
427 cmd
428 }
429 } else {
430 let mut cmd = Command::new(&resolved_command);
431 for arg in &task.args {
432 cmd.arg(arg);
433 }
434 cmd
435 }
436 };
437
438 let workdir = if let Some(dir) = &self.config.working_dir {
439 dir.clone()
440 } else if let Some(prefix) = project_prefix.as_ref() {
441 hermetic_root.join(prefix)
442 } else {
443 hermetic_root.clone()
444 };
445 std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
446 source: e,
447 path: Some(workdir.clone().into()),
448 operation: "create_dir_all".into(),
449 })?;
450 cmd.current_dir(&workdir);
451 let env_vars = self.config.environment.merge_with_system();
453 if task_trace_enabled() {
454 tracing::info!(
455 task = %name,
456 hermetic_root = %hermetic_root.display(),
457 workdir = %workdir.display(),
458 command = %task.command,
459 args = ?task.args,
460 env_count = env_vars.len(),
461 "Launching task command"
462 );
463 }
464 for (k, v) in env_vars {
465 cmd.env(k, v);
466 }
467
468 cmd.stdout(Stdio::piped());
471 cmd.stderr(Stdio::piped());
472
473 let stream_logs = !self.config.capture_output;
474 if stream_logs {
475 let cmd_str = if task.command.is_empty() {
476 task.args.join(" ")
477 } else {
478 format!("{} {}", task.command, task.args.join(" "))
479 };
480 cuenv_events::emit_task_started!(name, cmd_str, true);
481 }
482
483 let start = std::time::Instant::now();
484 let mut child = cmd.spawn().map_err(|e| {
485 Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
486 })?;
487
488 let stdout_handle = child.stdout.take();
489 let stderr_handle = child.stderr.take();
490
491 let stdout_task = async move {
492 if let Some(mut stdout) = stdout_handle {
493 let mut output = Vec::new();
494 let mut buf = [0u8; 4096];
495 use std::io::Write;
496 use tokio::io::AsyncReadExt;
497
498 loop {
499 match stdout.read(&mut buf).await {
500 Ok(0) => break, Ok(n) => {
502 let chunk = &buf[0..n];
503 if stream_logs {
504 let mut handle = std::io::stdout().lock();
505 let _ = handle.write_all(chunk);
506 let _ = handle.flush();
507 }
508 output.extend_from_slice(chunk);
509 }
510 Err(_) => break,
511 }
512 }
513 String::from_utf8_lossy(&output).to_string()
514 } else {
515 String::new()
516 }
517 };
518
519 let stderr_task = async move {
520 if let Some(mut stderr) = stderr_handle {
521 let mut output = Vec::new();
522 let mut buf = [0u8; 4096];
523 use std::io::Write;
524 use tokio::io::AsyncReadExt;
525
526 loop {
527 match stderr.read(&mut buf).await {
528 Ok(0) => break, Ok(n) => {
530 let chunk = &buf[0..n];
531 if stream_logs {
532 let mut handle = std::io::stderr().lock();
533 let _ = handle.write_all(chunk);
534 let _ = handle.flush();
535 }
536 output.extend_from_slice(chunk);
537 }
538 Err(_) => break,
539 }
540 }
541 String::from_utf8_lossy(&output).to_string()
542 } else {
543 String::new()
544 }
545 };
546
547 let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
548
549 let status = child.wait().await.map_err(|e| {
550 Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
551 })?;
552 let duration = start.elapsed();
553
554 let exit_code = status.code().unwrap_or(1);
555 let success = status.success();
556 if !success {
557 tracing::warn!(task = %name, exit = exit_code, "Task failed");
558 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
559 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
560 } else {
561 tracing::info!(task = %name, "Task completed successfully");
562 }
563
564 let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
566 task.outputs
567 .iter()
568 .map(|o| prefix.join(o).to_string_lossy().to_string())
569 .collect()
570 } else {
571 task.outputs.clone()
572 };
573 let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
574 let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
575 let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
576
577 let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
579 if outputs_stage.exists() {
580 let _ = std::fs::remove_dir_all(&outputs_stage);
581 }
582 std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
583 source: e,
584 path: Some(outputs_stage.clone().into()),
585 operation: "create_dir_all".into(),
586 })?;
587
588 for rel in &outputs {
589 let rel_for_project = project_prefix
590 .as_ref()
591 .and_then(|prefix| rel.strip_prefix(prefix).ok())
592 .unwrap_or(rel)
593 .to_path_buf();
594 let src = hermetic_root.join(rel);
595 #[allow(clippy::collapsible_if)]
598 if let Ok(meta) = std::fs::metadata(&src) {
599 if meta.is_file() {
600 let dst = outputs_stage.join(&rel_for_project);
601 if let Some(parent) = dst.parent() {
602 std::fs::create_dir_all(parent).map_err(|e| Error::Io {
603 source: e,
604 path: Some(parent.into()),
605 operation: "create_dir_all".into(),
606 })?;
607 }
608 std::fs::copy(&src, &dst).map_err(|e| Error::Io {
609 source: e,
610 path: Some(dst.into()),
611 operation: "copy".into(),
612 })?;
613 let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
614 output_index.push(task_cache::OutputIndexEntry {
615 rel_path: rel_for_project.to_string_lossy().to_string(),
616 size: meta.len(),
617 sha256: sha,
618 });
619 }
620 }
621 }
622
623 let mut warned = false;
625 for entry in walkdir::WalkDir::new(&hermetic_root)
626 .into_iter()
627 .filter_map(|e| e.ok())
628 {
629 let p = entry.path();
630 if p.is_dir() {
631 continue;
632 }
633 let rel = match p.strip_prefix(&hermetic_root) {
634 Ok(r) => r.to_path_buf(),
635 Err(_) => continue,
636 };
637 let rel_str = rel.to_string_lossy().to_string();
638 let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
639 let initial = initial_hashes.get(&rel_str);
640 let changed = match initial {
641 None => true,
642 Some(prev) => prev != &sha,
643 };
644 if changed && !outputs_set.contains(&rel) {
645 if !warned {
646 tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
647 warned = true;
648 }
649 tracing::debug!(path = %rel_str, "Undeclared write");
650 }
651 }
652
653 if success {
655 let meta = task_cache::TaskResultMeta {
656 task_name: name.to_string(),
657 command: task.command.clone(),
658 args: task.args.clone(),
659 env_summary,
660 inputs_summary: inputs_summary.clone(),
661 created_at: Utc::now(),
662 cuenv_version,
663 platform,
664 duration_ms: duration.as_millis(),
665 exit_code,
666 cache_key_envelope: envelope_json.clone(),
667 output_index,
668 };
669 let logs = task_cache::TaskLogs {
670 stdout: Some(stdout.clone()),
673 stderr: Some(stderr.clone()),
674 };
675 let cache_span = tracing::info_span!("cache.save", key = %cache_key);
676 {
677 let _g = cache_span.enter();
678 task_cache::save_result(
679 &cache_key,
680 &meta,
681 &outputs_stage,
682 &hermetic_root,
683 logs,
684 self.config.cache_dir.as_deref(),
685 )?;
686 }
687
688 task_cache::record_latest(
690 &self.config.project_root,
691 name,
692 &cache_key,
693 self.config.cache_dir.as_deref(),
694 )?;
695
696 } else {
698 }
700
701 Ok(TaskResult {
702 name: name.to_string(),
703 exit_code: Some(exit_code),
704 stdout,
705 stderr,
706 success,
707 })
708 }
709 }
710
711 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
715 if task.is_task_ref() && task.project_root.is_none() {
717 return Err(Error::configuration(format!(
718 "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
719 This usually means:\n\
720 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
721 - The referenced task '{}' doesn't exist in that project\n\
722 - There was an error loading the referenced project's env.cue\n\
723 Run with RUST_LOG=debug for more details.",
724 name,
725 task.task_ref.as_deref().unwrap_or("unknown"),
726 task.task_ref
727 .as_deref()
728 .and_then(|r| r.split(':').next_back())
729 .unwrap_or("unknown")
730 )));
731 }
732
733 let workdir = if let Some(ref dir) = task.directory {
740 self.config
742 .cue_module_root
743 .as_ref()
744 .unwrap_or(&self.config.project_root)
745 .join(dir)
746 } else if let Some(ref project_root) = task.project_root {
747 project_root.clone()
749 } else if let Some(ref source) = task.source {
750 if let Some(dir) = source.directory() {
752 self.config
753 .cue_module_root
754 .as_ref()
755 .unwrap_or(&self.config.project_root)
756 .join(dir)
757 } else {
758 self.config
762 .cue_module_root
763 .clone()
764 .unwrap_or_else(|| self.config.project_root.clone())
765 }
766 } else if !task.hermetic && !task.workspaces.is_empty() {
767 let workspace_name = &task.workspaces[0];
769 let manager = match workspace_name.as_str() {
770 "bun" => PackageManager::Bun,
771 "npm" => PackageManager::Npm,
772 "pnpm" => PackageManager::Pnpm,
773 "yarn" => PackageManager::YarnModern,
774 "cargo" => PackageManager::Cargo,
775 _ => PackageManager::Npm, };
777 find_workspace_root(manager, &self.config.project_root)
778 } else {
779 self.config.project_root.clone()
780 };
781
782 tracing::info!(
783 task = %name,
784 workdir = %workdir.display(),
785 hermetic = false,
786 "Executing non-hermetic task"
787 );
788
789 let cmd_str = if let Some(script) = &task.script {
792 format!("[script: {} bytes]", script.len())
793 } else if task.command.is_empty() {
794 task.args.join(" ")
795 } else {
796 format!("{} {}", task.command, task.args.join(" "))
797 };
798
799 cuenv_events::emit_task_started!(name, cmd_str, false);
800
801 let mut cmd = if let Some(script) = &task.script {
803 let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
805 (
806 shell.command.clone().unwrap_or_else(|| "bash".to_string()),
807 shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
808 )
809 } else {
810 ("bash".to_string(), "-c".to_string())
812 };
813
814 let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
815 let mut cmd = Command::new(&resolved_shell);
816 cmd.arg(&shell_flag);
817 cmd.arg(script);
818 cmd
819 } else {
820 let resolved_command = self.config.environment.resolve_command(&task.command);
822
823 if let Some(shell) = &task.shell {
824 if shell.command.is_some() && shell.flag.is_some() {
825 let shell_command = shell.command.as_ref().expect("checked is_some above");
826 let shell_flag = shell.flag.as_ref().expect("checked is_some above");
827 let resolved_shell = self.config.environment.resolve_command(shell_command);
828 let mut cmd = Command::new(&resolved_shell);
829 cmd.arg(shell_flag);
830 if task.args.is_empty() {
831 cmd.arg(&resolved_command);
832 } else {
833 let full_command = if task.command.is_empty() {
834 task.args.join(" ")
835 } else {
836 format!("{} {}", resolved_command, task.args.join(" "))
837 };
838 cmd.arg(full_command);
839 }
840 cmd
841 } else {
842 let mut cmd = Command::new(&resolved_command);
843 for arg in &task.args {
844 cmd.arg(arg);
845 }
846 cmd
847 }
848 } else {
849 let mut cmd = Command::new(&resolved_command);
850 for arg in &task.args {
851 cmd.arg(arg);
852 }
853 cmd
854 }
855 };
856
857 cmd.current_dir(&workdir);
859 let env_vars = self.config.environment.merge_with_system();
860 for (k, v) in &env_vars {
861 cmd.env(k, v);
862 }
863
864 if self.config.capture_output {
867 use tokio::io::{AsyncBufReadExt, BufReader};
868
869 let start_time = std::time::Instant::now();
870
871 let mut child = cmd
873 .stdout(Stdio::piped())
874 .stderr(Stdio::piped())
875 .spawn()
876 .map_err(|e| Error::Io {
877 source: e,
878 path: None,
879 operation: format!("spawn task {}", name),
880 })?;
881
882 let stdout_handle = child.stdout.take();
884 let stderr_handle = child.stderr.take();
885
886 let mut stdout_lines = Vec::new();
888 let mut stderr_lines = Vec::new();
889
890 let name_for_stdout = name.to_string();
892 let stdout_task = tokio::spawn(async move {
893 let mut lines = Vec::new();
894 if let Some(stdout) = stdout_handle {
895 let mut reader = BufReader::new(stdout).lines();
896 while let Ok(Some(line)) = reader.next_line().await {
897 cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
898 lines.push(line);
899 }
900 }
901 lines
902 });
903
904 let name_for_stderr = name.to_string();
906 let stderr_task = tokio::spawn(async move {
907 let mut lines = Vec::new();
908 if let Some(stderr) = stderr_handle {
909 let mut reader = BufReader::new(stderr).lines();
910 while let Ok(Some(line)) = reader.next_line().await {
911 cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
912 lines.push(line);
913 }
914 }
915 lines
916 });
917
918 let status = child.wait().await.map_err(|e| Error::Io {
920 source: e,
921 path: None,
922 operation: format!("wait for task {}", name),
923 })?;
924
925 if let Ok(lines) = stdout_task.await {
927 stdout_lines = lines;
928 }
929 if let Ok(lines) = stderr_task.await {
930 stderr_lines = lines;
931 }
932
933 let duration_ms = start_time.elapsed().as_millis() as u64;
934 let stdout = stdout_lines.join("\n");
935 let stderr = stderr_lines.join("\n");
936 let exit_code = status.code().unwrap_or(-1);
937 let success = status.success();
938
939 cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
941
942 if !success {
943 tracing::warn!(task = %name, exit = exit_code, "Task failed");
944 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
945 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
946 }
947
948 Ok(TaskResult {
949 name: name.to_string(),
950 exit_code: Some(exit_code),
951 stdout,
952 stderr,
953 success,
954 })
955 } else {
956 let status = cmd
958 .stdout(Stdio::inherit())
959 .stderr(Stdio::inherit())
960 .status()
961 .await
962 .map_err(|e| Error::Io {
963 source: e,
964 path: None,
965 operation: format!("spawn task {}", name),
966 })?;
967
968 let exit_code = status.code().unwrap_or(-1);
969 let success = status.success();
970
971 if !success {
972 tracing::warn!(task = %name, exit = exit_code, "Task failed");
973 }
974
975 Ok(TaskResult {
976 name: name.to_string(),
977 exit_code: Some(exit_code),
978 stdout: String::new(), stderr: String::new(),
980 success,
981 })
982 }
983 }
984
985 #[async_recursion]
987 pub async fn execute_definition(
988 &self,
989 name: &str,
990 definition: &TaskDefinition,
991 all_tasks: &Tasks,
992 ) -> Result<Vec<TaskResult>> {
993 match definition {
994 TaskDefinition::Single(task) => {
995 let result = self.execute_task(name, task.as_ref()).await?;
996 Ok(vec![result])
997 }
998 TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
999 }
1000 }
1001
1002 async fn execute_group(
1003 &self,
1004 prefix: &str,
1005 group: &TaskGroup,
1006 all_tasks: &Tasks,
1007 ) -> Result<Vec<TaskResult>> {
1008 match group {
1009 TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
1010 TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
1011 }
1012 }
1013
1014 async fn execute_sequential(
1015 &self,
1016 prefix: &str,
1017 tasks: &[TaskDefinition],
1018 all_tasks: &Tasks,
1019 ) -> Result<Vec<TaskResult>> {
1020 if !self.config.capture_output {
1021 cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
1022 }
1023 let mut results = Vec::new();
1024 for (i, task_def) in tasks.iter().enumerate() {
1025 let task_name = format!("{}[{}]", prefix, i);
1026 let task_results = self
1027 .execute_definition(&task_name, task_def, all_tasks)
1028 .await?;
1029 for result in &task_results {
1030 if !result.success {
1031 let message = format!(
1032 "Sequential task group '{prefix}' halted.\n\n{}",
1033 summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
1034 );
1035 return Err(Error::configuration(message));
1036 }
1037 }
1038 results.extend(task_results);
1039 }
1040 Ok(results)
1041 }
1042
1043 async fn execute_parallel(
1044 &self,
1045 prefix: &str,
1046 group: &ParallelGroup,
1047 all_tasks: &Tasks,
1048 ) -> Result<Vec<TaskResult>> {
1049 if let Some(default_task) = group.tasks.get("default") {
1051 if !self.config.capture_output {
1052 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
1053 }
1054 let task_name = format!("{}.default", prefix);
1057 return self
1058 .execute_definition(&task_name, default_task, all_tasks)
1059 .await;
1060 }
1061
1062 if !self.config.capture_output {
1063 cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
1064 }
1065 let mut join_set = JoinSet::new();
1066 let all_tasks = Arc::new(all_tasks.clone());
1067 let mut all_results = Vec::new();
1068 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
1069 if let Some(failed) = results.iter().find(|r| !r.success) {
1070 let message = format!(
1071 "Parallel task group '{prefix}' halted.\n\n{}",
1072 summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
1073 );
1074 return Err(Error::configuration(message));
1075 }
1076 all_results.extend(results);
1077 Ok(())
1078 };
1079 for (name, task_def) in &group.tasks {
1080 let task_name = format!("{}.{}", prefix, name);
1081 let task_def = task_def.clone();
1082 let all_tasks = Arc::clone(&all_tasks);
1083 let executor = self.clone_with_config();
1084 join_set.spawn(async move {
1085 executor
1086 .execute_definition(&task_name, &task_def, &all_tasks)
1087 .await
1088 });
1089 if self.config.max_parallel > 0
1090 && join_set.len() >= self.config.max_parallel
1091 && let Some(result) = join_set.join_next().await
1092 {
1093 match result {
1094 Ok(Ok(results)) => merge_results(results)?,
1095 Ok(Err(e)) => return Err(e),
1096 Err(e) => {
1097 return Err(Error::configuration(format!(
1098 "Task execution panicked: {}",
1099 e
1100 )));
1101 }
1102 }
1103 }
1104 }
1105 while let Some(result) = join_set.join_next().await {
1106 match result {
1107 Ok(Ok(results)) => merge_results(results)?,
1108 Ok(Err(e)) => return Err(e),
1109 Err(e) => {
1110 return Err(Error::configuration(format!(
1111 "Task execution panicked: {}",
1112 e
1113 )));
1114 }
1115 }
1116 }
1117 Ok(all_results)
1118 }
1119
1120 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
1121 let parallel_groups = graph.get_parallel_groups()?;
1122 let mut all_results = Vec::new();
1123
1124 for mut group in parallel_groups {
1132 let mut join_set = JoinSet::new();
1133
1134 while !group.is_empty() || !join_set.is_empty() {
1135 while let Some(node) = group.pop() {
1137 let task = node.task.clone();
1138 let name = node.name.clone();
1139 let executor = self.clone_with_config();
1140 join_set.spawn(async move { executor.execute_task(&name, &task).await });
1141
1142 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
1143 break;
1144 }
1145 }
1146
1147 if let Some(result) = join_set.join_next().await {
1148 match result {
1149 Ok(Ok(task_result)) => {
1150 if !task_result.success {
1151 join_set.abort_all();
1152 let message = format!(
1153 "Task graph execution halted.\n\n{}",
1154 summarize_task_failure(
1155 &task_result,
1156 TASK_FAILURE_SNIPPET_LINES,
1157 )
1158 );
1159 return Err(Error::configuration(message));
1160 }
1161 all_results.push(task_result);
1162 }
1163 Ok(Err(e)) => {
1164 join_set.abort_all();
1165 return Err(e);
1166 }
1167 Err(e) => {
1168 join_set.abort_all();
1169 return Err(Error::configuration(format!(
1170 "Task execution panicked: {}",
1171 e
1172 )));
1173 }
1174 }
1175 }
1176 }
1177 }
1178
1179 Ok(all_results)
1180 }
1181
1182 async fn resolve_workspace(
1183 &self,
1184 _task_name: &str,
1185 task: &Task,
1186 workspace_name: &str,
1187 config: &WorkspaceConfig,
1188 ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1189 let root = self.config.project_root.clone();
1190 let task_label = _task_name.to_string();
1191 let command = task.command.clone();
1192 let config_pm = config.package_manager.clone();
1193 let config_root = config.root.clone();
1194 let mut packages = Vec::new();
1202
1203 let lockfile_override: Option<String> = None; let mut traverse_workspace_deps = true;
1210
1211 let workspace_name_owned = workspace_name.to_string();
1213 tokio::task::spawn_blocking(move || {
1214 let override_for_detection = lockfile_override.as_ref().map(|lock| {
1215 let candidate = PathBuf::from(lock);
1216 if candidate.is_absolute() {
1217 candidate
1218 } else {
1219 root.join(lock)
1220 }
1221 });
1222
1223 let manager = if let Some(pm_str) = config_pm {
1226 match pm_str.as_str() {
1227 "npm" => PackageManager::Npm,
1228 "bun" => PackageManager::Bun,
1229 "pnpm" => PackageManager::Pnpm,
1230 "yarn" => PackageManager::YarnModern,
1231 "yarn-classic" => PackageManager::YarnClassic,
1232 "cargo" => PackageManager::Cargo,
1233 _ => {
1234 return Err(Error::configuration(format!(
1235 "Unknown package manager: {}",
1236 pm_str
1237 )));
1238 }
1239 }
1240 } else {
1241 match workspace_name_owned.as_str() {
1243 "npm" => PackageManager::Npm,
1244 "bun" => PackageManager::Bun,
1245 "pnpm" => PackageManager::Pnpm,
1246 "yarn" => PackageManager::YarnModern,
1247 "cargo" => PackageManager::Cargo,
1248 _ => {
1249 let hint = detect_from_command(&command);
1252 let detected = match detect_package_managers(&root) {
1253 Ok(list) => list,
1254 Err(e) => {
1255 if override_for_detection.is_some() {
1256 Vec::new()
1257 } else {
1258 return Err(Error::configuration(format!(
1259 "Failed to detect package managers: {}",
1260 e
1261 )));
1262 }
1263 }
1264 };
1265
1266 if let Some(h) = hint {
1267 if detected.contains(&h) {
1268 h
1269 } else if !detected.is_empty() {
1270 detected[0]
1272 } else if let Some(ref override_path) = override_for_detection {
1273 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1274 Error::configuration(
1275 "Unable to infer package manager from lockfile override",
1276 )
1277 })?
1278 } else {
1279 return Err(Error::configuration(
1280 format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1281 ));
1282 }
1283 } else if !detected.is_empty() {
1284 detected[0]
1285 } else if let Some(ref override_path) = override_for_detection {
1286 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1287 Error::configuration(
1288 "Unable to infer package manager from lockfile override",
1289 )
1290 })?
1291 } else {
1292 return Err(Error::configuration(
1293 "Could not detect package manager for workspace resolution",
1294 ));
1295 }
1296 }
1297 }
1298 };
1299
1300 let workspace_root = if let Some(root_override) = &config_root {
1304 root.join(root_override)
1305 } else {
1306 find_workspace_root(manager, &root)
1307 };
1308
1309 if task_trace_enabled() {
1310 tracing::info!(
1311 task = %task_label,
1312 manager = %manager,
1313 project_root = %root.display(),
1314 workspace_root = %workspace_root.display(),
1315 "Resolved workspace root for package manager"
1316 );
1317 }
1318
1319 let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1320 let candidate = PathBuf::from(lock);
1321 if candidate.is_absolute() {
1322 candidate
1323 } else {
1324 workspace_root.join(lock)
1325 }
1326 });
1327
1328 let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1330 PackageManager::Npm
1331 | PackageManager::Bun
1332 | PackageManager::YarnClassic
1333 | PackageManager::YarnModern
1334 | PackageManager::Deno => Box::new(PackageJsonDiscovery),
1335 PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1336 PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1337 };
1338
1339 let workspace = discovery.discover(&workspace_root).map_err(|e| {
1340 Error::configuration(format!("Failed to discover workspace: {}", e))
1341 })?;
1342
1343 let lockfile_path = if let Some(path) = lockfile_override_path {
1345 if !path.exists() {
1346 return Err(Error::configuration(format!(
1347 "Workspace lockfile override does not exist: {}",
1348 path.display()
1349 )));
1350 }
1351 path
1352 } else {
1353 workspace.lockfile.clone().ok_or_else(|| {
1354 Error::configuration("Workspace resolution requires a lockfile")
1355 })?
1356 };
1357
1358 let parser: Box<dyn LockfileParser> = match manager {
1359 PackageManager::Npm => Box::new(NpmLockfileParser),
1360 PackageManager::Bun => Box::new(BunLockfileParser),
1361 PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1362 PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1363 PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1364 PackageManager::Cargo => Box::new(CargoLockfileParser),
1365 PackageManager::Deno => Box::new(NpmLockfileParser), };
1367
1368 let entries = parser
1369 .parse(&lockfile_path)
1370 .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1371 if task_trace_enabled() {
1372 tracing::info!(
1373 task = %task_label,
1374 lockfile = %lockfile_path.display(),
1375 members = entries.len(),
1376 "Parsed workspace lockfile"
1377 );
1378 }
1379
1380 let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1382
1383 if packages.is_empty() {
1387 let current_member = workspace
1388 .members
1389 .iter()
1390 .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1391 if let Some(member) = current_member {
1392 let inferred = vec![member.name.clone()];
1393 if task_trace_enabled() {
1394 tracing::info!(
1395 task = %task_label,
1396 inferred_packages = ?inferred,
1397 "Inferred workspace packages from current project"
1398 );
1399 }
1400 packages = inferred;
1401 traverse_workspace_deps = true;
1402 }
1403 }
1404
1405 let mut member_paths = Vec::new();
1407
1408 member_paths.push(manager.workspace_config_name().to_string());
1410 if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1411 member_paths.push(rel.to_string_lossy().to_string());
1412 } else {
1413 member_paths.push(lockfile_path.to_string_lossy().to_string());
1414 }
1415
1416 if packages.is_empty() {
1417 for member in &workspace.members {
1418 let manifest_rel = member
1419 .path
1420 .join(manager.workspace_config_name());
1421 member_paths.push(manifest_rel.to_string_lossy().to_string());
1422 }
1423 } else {
1424 let mut to_visit: Vec<String> = packages.clone();
1425 let mut visited = HashSet::new();
1426
1427 while let Some(pkg_name) = to_visit.pop() {
1428 if visited.contains(&pkg_name) {
1429 continue;
1430 }
1431 visited.insert(pkg_name.clone());
1432
1433 if let Some(member) = workspace.find_member(&pkg_name) {
1434 let manifest_rel = member
1435 .path
1436 .join(manager.workspace_config_name());
1437 member_paths.push(manifest_rel.to_string_lossy().to_string());
1438
1439 if traverse_workspace_deps {
1441 let mut dependency_candidates: HashSet<String> = HashSet::new();
1442
1443 if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1444 for dep in &entry.dependencies {
1445 if entries
1446 .iter()
1447 .any(|e| e.name == dep.name && e.is_workspace_member)
1448 {
1449 dependency_candidates.insert(dep.name.clone());
1450 }
1451 }
1452 }
1453
1454 for dep_name in &member.dependencies {
1455 if workspace.find_member(dep_name).is_some() {
1456 dependency_candidates.insert(dep_name.clone());
1457 }
1458 }
1459
1460 for dep_name in dependency_candidates {
1461 to_visit.push(dep_name);
1462 }
1463 }
1464 }
1465 }
1466 }
1467
1468 if task_trace_enabled() {
1469 tracing::info!(
1470 task = %task_label,
1471 members = ?member_paths,
1472 "Workspace input member paths selected"
1473 );
1474 }
1475
1476 Ok((workspace, entries, member_paths, Some(hash)))
1477 })
1478 .await
1479 .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1480 }
1481
1482 async fn materialize_workspace(
1483 &self,
1484 workspace: &Workspace,
1485 entries: &[LockfileEntry],
1486 target_dir: &Path,
1487 ) -> Result<()> {
1488 let materializer: Box<dyn Materializer> = match workspace.manager {
1490 PackageManager::Npm
1491 | PackageManager::Bun
1492 | PackageManager::Pnpm
1493 | PackageManager::YarnClassic
1494 | PackageManager::YarnModern
1495 | PackageManager::Deno => Box::new(NodeModulesMaterializer),
1496 PackageManager::Cargo => Box::new(CargoMaterializer),
1497 };
1498
1499 materializer
1500 .materialize(workspace, entries, target_dir)
1501 .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1502 }
1503
1504 fn clone_with_config(&self) -> Self {
1505 Self::with_shared_backend(self.config.clone(), self.backend.clone())
1507 }
1508}
1509
1510fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1511 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1512
1513 loop {
1514 let is_root = match manager {
1515 PackageManager::Npm
1516 | PackageManager::Bun
1517 | PackageManager::YarnClassic
1518 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
1519 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1520 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
1521 PackageManager::Deno => deno_json_has_workspace(¤t),
1522 };
1523
1524 if is_root {
1525 return current;
1526 }
1527
1528 if let Some(parent) = current.parent() {
1529 current = parent.to_path_buf();
1530 } else {
1531 return start.to_path_buf();
1532 }
1533 }
1534}
1535
1536fn package_json_has_workspaces(dir: &Path) -> bool {
1537 let path = dir.join("package.json");
1538 let content = std::fs::read_to_string(&path);
1539 let Ok(json) = content.and_then(|s| {
1540 serde_json::from_str::<serde_json::Value>(&s)
1541 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1542 }) else {
1543 return false;
1544 };
1545
1546 match json.get("workspaces") {
1547 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1548 Some(serde_json::Value::Object(map)) => map
1549 .get("packages")
1550 .and_then(|packages| packages.as_array())
1551 .map(|arr| !arr.is_empty())
1552 .unwrap_or(false),
1553 _ => false,
1554 }
1555}
1556
1557fn cargo_toml_has_workspace(dir: &Path) -> bool {
1558 let path = dir.join("Cargo.toml");
1559 let Ok(content) = std::fs::read_to_string(&path) else {
1560 return false;
1561 };
1562
1563 content.contains("[workspace]")
1564}
1565
1566fn deno_json_has_workspace(dir: &Path) -> bool {
1567 let path = dir.join("deno.json");
1568 let content = std::fs::read_to_string(&path);
1569 let Ok(json) = content.and_then(|s| {
1570 serde_json::from_str::<serde_json::Value>(&s)
1571 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1572 }) else {
1573 return false;
1574 };
1575
1576 match json.get("workspace") {
1578 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1579 Some(serde_json::Value::Object(_)) => true,
1580 _ => false,
1581 }
1582}
1583
1584fn task_trace_enabled() -> bool {
1585 static ENABLED: OnceLock<bool> = OnceLock::new();
1586 *ENABLED.get_or_init(|| {
1587 matches!(
1588 std::env::var("CUENV_TRACE_TASKS")
1589 .unwrap_or_default()
1590 .trim()
1591 .to_ascii_lowercase()
1592 .as_str(),
1593 "1" | "true" | "yes" | "on"
1594 )
1595 })
1596}
1597
1598pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1601 let exit_code = result
1602 .exit_code
1603 .map(|c| c.to_string())
1604 .unwrap_or_else(|| "unknown".to_string());
1605
1606 let mut sections = Vec::new();
1607 sections.push(format!(
1608 "Task '{}' failed with exit code {}.",
1609 result.name, exit_code
1610 ));
1611
1612 let output = format_failure_streams(result, max_output_lines);
1613 if output.is_empty() {
1614 sections.push(
1615 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1616 .to_string(),
1617 );
1618 } else {
1619 sections.push(output);
1620 }
1621
1622 sections.join("\n\n")
1623}
1624
1625fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1626 let mut streams = Vec::new();
1627
1628 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1629 streams.push(stdout);
1630 }
1631
1632 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1633 streams.push(stderr);
1634 }
1635
1636 streams.join("\n\n")
1637}
1638
1639fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
1640 let normalized = content.trim_end();
1641 if normalized.is_empty() {
1642 return None;
1643 }
1644
1645 let lines: Vec<&str> = normalized.lines().collect();
1646 let total = lines.len();
1647 let start = total.saturating_sub(max_output_lines);
1648 let snippet = lines[start..].join("\n");
1649
1650 let header = if total > max_output_lines {
1651 format!("{label} (last {max_output_lines} of {total} lines):")
1652 } else {
1653 format!("{label}:")
1654 };
1655
1656 Some(format!("{header}\n{snippet}"))
1657}
1658
1659fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1660 match path.file_name().and_then(|n| n.to_str())? {
1661 "package-lock.json" => Some(PackageManager::Npm),
1662 "bun.lock" => Some(PackageManager::Bun),
1663 "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1664 "yarn.lock" => Some(PackageManager::YarnModern),
1665 "Cargo.lock" => Some(PackageManager::Cargo),
1666 _ => None,
1667 }
1668}
1669
1670fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1671 let sanitized_task = task_name
1674 .chars()
1675 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1676 .collect::<String>();
1677
1678 let base = std::env::temp_dir().join(format!(
1679 "cuenv-work-{}-{}",
1680 sanitized_task,
1681 &key[..12.min(key.len())]
1682 ));
1683
1684 if base.exists()
1687 && let Err(e) = std::fs::remove_dir_all(&base)
1688 {
1689 let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1692 let fallback = std::env::temp_dir().join(format!(
1693 "cuenv-work-{}-{}-{}",
1694 sanitized_task,
1695 &key[..12.min(key.len())],
1696 ts
1697 ));
1698 tracing::warn!(
1699 previous = %base.display(),
1700 fallback = %fallback.display(),
1701 error = %e,
1702 "Failed to clean previous hermetic workdir; using fresh fallback directory"
1703 );
1704 std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1705 source: e,
1706 path: Some(fallback.clone().into()),
1707 operation: "create_dir_all".into(),
1708 })?;
1709 return Ok(fallback);
1710 }
1711
1712 std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1713 source: e,
1714 path: Some(base.clone().into()),
1715 operation: "create_dir_all".into(),
1716 })?;
1717 Ok(base)
1718}
1719
1720pub async fn execute_command(
1722 command: &str,
1723 args: &[String],
1724 environment: &Environment,
1725) -> Result<i32> {
1726 tracing::info!("Executing command: {} {:?}", command, args);
1727 let mut cmd = Command::new(command);
1728 cmd.args(args);
1729 let env_vars = environment.merge_with_system();
1730 for (key, value) in env_vars {
1731 cmd.env(key, value);
1732 }
1733 cmd.stdout(Stdio::inherit());
1734 cmd.stderr(Stdio::inherit());
1735 cmd.stdin(Stdio::inherit());
1736 let status = cmd.status().await.map_err(|e| {
1737 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1738 })?;
1739 Ok(status.code().unwrap_or(1))
1740}
1741
1742#[cfg(test)]
1743mod tests {
1744 use super::*;
1745 use crate::tasks::Input;
1746 use std::fs;
1747 use tempfile::TempDir;
1748
1749 #[tokio::test]
1750 async fn test_executor_config_default() {
1751 let config = ExecutorConfig::default();
1752 assert!(!config.capture_output);
1753 assert_eq!(config.max_parallel, 0);
1754 assert!(config.environment.is_empty());
1755 }
1756
1757 #[tokio::test]
1758 async fn test_task_result() {
1759 let result = TaskResult {
1760 name: "test".to_string(),
1761 exit_code: Some(0),
1762 stdout: "output".to_string(),
1763 stderr: String::new(),
1764 success: true,
1765 };
1766 assert_eq!(result.name, "test");
1767 assert_eq!(result.exit_code, Some(0));
1768 assert!(result.success);
1769 assert_eq!(result.stdout, "output");
1770 }
1771
1772 #[tokio::test]
1773 async fn test_execute_simple_task() {
1774 let config = ExecutorConfig {
1775 capture_output: true,
1776 ..Default::default()
1777 };
1778 let executor = TaskExecutor::new(config);
1779 let task = Task {
1780 command: "echo".to_string(),
1781 args: vec!["hello".to_string()],
1782 description: Some("Hello task".to_string()),
1783 ..Default::default()
1784 };
1785 let result = executor.execute_task("test", &task).await.unwrap();
1786 assert!(result.success);
1787 assert_eq!(result.exit_code, Some(0));
1788 assert!(result.stdout.contains("hello"));
1789 }
1790
1791 #[tokio::test]
1792 async fn test_execute_with_environment() {
1793 let mut config = ExecutorConfig {
1794 capture_output: true,
1795 ..Default::default()
1796 };
1797 config
1798 .environment
1799 .set("TEST_VAR".to_string(), "test_value".to_string());
1800 let executor = TaskExecutor::new(config);
1801 let task = Task {
1802 command: "printenv".to_string(),
1803 args: vec!["TEST_VAR".to_string()],
1804 description: Some("Print env task".to_string()),
1805 ..Default::default()
1806 };
1807 let result = executor.execute_task("test", &task).await.unwrap();
1808 assert!(result.success);
1809 assert!(result.stdout.contains("test_value"));
1810 }
1811
1812 #[tokio::test]
1813 async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
1814 let tmp = TempDir::new().unwrap();
1815 let root = tmp.path();
1816
1817 fs::write(
1819 root.join("package.json"),
1820 r#"{
1821 "name": "root-app",
1822 "version": "0.0.0",
1823 "workspaces": ["packages/*", "apps/*"],
1824 "dependencies": {
1825 "@rawkodeacademy/content-technologies": "workspace:*"
1826 }
1827}"#,
1828 )
1829 .unwrap();
1830 fs::write(
1833 root.join("bun.lock"),
1834 r#"{
1835 "lockfileVersion": 1,
1836 "workspaces": {
1837 "": {
1838 "name": "root-app",
1839 "dependencies": {
1840 "@rawkodeacademy/content-technologies": "workspace:*"
1841 }
1842 },
1843 "packages/content-technologies": {
1844 "name": "@rawkodeacademy/content-technologies",
1845 "version": "0.0.1"
1846 },
1847 "apps/site": {
1848 "version": "0.0.0",
1849 "dependencies": {
1850 "@rawkodeacademy/content-technologies": "workspace:*"
1851 }
1852 }
1853 },
1854 "packages": {}
1855}"#,
1856 )
1857 .unwrap();
1858
1859 fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
1861 fs::write(
1862 root.join("packages/content-technologies/package.json"),
1863 r#"{
1864 "name": "@rawkodeacademy/content-technologies",
1865 "version": "0.0.1"
1866}"#,
1867 )
1868 .unwrap();
1869
1870 fs::create_dir_all(root.join("apps/site")).unwrap();
1871 fs::write(
1872 root.join("apps/site/package.json"),
1873 r#"{
1874 "name": "site",
1875 "version": "0.0.0",
1876 "dependencies": {
1877 "@rawkodeacademy/content-technologies": "workspace:*"
1878 }
1879}"#,
1880 )
1881 .unwrap();
1882
1883 let mut workspaces = HashMap::new();
1884 workspaces.insert(
1885 "bun".to_string(),
1886 WorkspaceConfig {
1887 enabled: true,
1888 package_manager: Some("bun".to_string()),
1889 root: None,
1890 hooks: None,
1891 },
1892 );
1893
1894 let config = ExecutorConfig {
1895 capture_output: true,
1896 project_root: root.join("apps/site"),
1897 workspaces: Some(workspaces),
1898 ..Default::default()
1899 };
1900 let executor = TaskExecutor::new(config);
1901
1902 let task = Task {
1903 command: "sh".to_string(),
1904 args: vec![
1905 "-c".to_string(),
1906 "find ../.. -maxdepth 4 -type d | sort".to_string(),
1907 ],
1908 inputs: vec![Input::Path("package.json".to_string())],
1909 workspaces: vec!["bun".to_string()],
1910 ..Default::default()
1911 };
1912
1913 let result = executor.execute_task("install", &task).await.unwrap();
1914 assert!(
1915 result.success,
1916 "command failed stdout='{}' stderr='{}'",
1917 result.stdout, result.stderr
1918 );
1919 assert!(
1920 result
1921 .stdout
1922 .split_whitespace()
1923 .any(|line| line.ends_with("packages/content-technologies")),
1924 "should include workspace member from workspace root; stdout='{}' stderr='{}'",
1925 result.stdout,
1926 result.stderr
1927 );
1928 }
1929
1930 #[tokio::test]
1931 async fn test_execute_failing_task() {
1932 let config = ExecutorConfig {
1933 capture_output: true,
1934 ..Default::default()
1935 };
1936 let executor = TaskExecutor::new(config);
1937 let task = Task {
1938 command: "false".to_string(),
1939 description: Some("Failing task".to_string()),
1940 ..Default::default()
1941 };
1942 let result = executor.execute_task("test", &task).await.unwrap();
1943 assert!(!result.success);
1944 assert_eq!(result.exit_code, Some(1));
1945 }
1946
1947 #[tokio::test]
1948 async fn test_execute_sequential_group() {
1949 let config = ExecutorConfig {
1950 capture_output: true,
1951 ..Default::default()
1952 };
1953 let executor = TaskExecutor::new(config);
1954 let task1 = Task {
1955 command: "echo".to_string(),
1956 args: vec!["first".to_string()],
1957 description: Some("First task".to_string()),
1958 ..Default::default()
1959 };
1960 let task2 = Task {
1961 command: "echo".to_string(),
1962 args: vec!["second".to_string()],
1963 description: Some("Second task".to_string()),
1964 ..Default::default()
1965 };
1966 let group = TaskGroup::Sequential(vec![
1967 TaskDefinition::Single(Box::new(task1)),
1968 TaskDefinition::Single(Box::new(task2)),
1969 ]);
1970 let all_tasks = Tasks::new();
1971 let results = executor
1972 .execute_group("seq", &group, &all_tasks)
1973 .await
1974 .unwrap();
1975 assert_eq!(results.len(), 2);
1976 assert!(results[0].stdout.contains("first"));
1977 assert!(results[1].stdout.contains("second"));
1978 }
1979
1980 #[tokio::test]
1981 async fn test_command_injection_prevention() {
1982 let config = ExecutorConfig {
1983 capture_output: true,
1984 ..Default::default()
1985 };
1986 let executor = TaskExecutor::new(config);
1987 let malicious_task = Task {
1988 command: "echo".to_string(),
1989 args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1990 description: Some("Malicious task test".to_string()),
1991 ..Default::default()
1992 };
1993 let result = executor
1994 .execute_task("malicious", &malicious_task)
1995 .await
1996 .unwrap();
1997 assert!(result.success);
1998 assert!(result.stdout.contains("hello ; rm -rf /"));
1999 }
2000
2001 #[tokio::test]
2002 async fn test_special_characters_in_args() {
2003 let config = ExecutorConfig {
2004 capture_output: true,
2005 ..Default::default()
2006 };
2007 let executor = TaskExecutor::new(config);
2008 let special_chars = vec![
2009 "$USER",
2010 "$(whoami)",
2011 "`whoami`",
2012 "&& echo hacked",
2013 "|| echo failed",
2014 "> /tmp/hack",
2015 "| cat",
2016 ];
2017 for special_arg in special_chars {
2018 let task = Task {
2019 command: "echo".to_string(),
2020 args: vec!["safe".to_string(), special_arg.to_string()],
2021 description: Some("Special character test".to_string()),
2022 ..Default::default()
2023 };
2024 let result = executor.execute_task("special", &task).await.unwrap();
2025 assert!(result.success);
2026 assert!(result.stdout.contains("safe"));
2027 assert!(result.stdout.contains(special_arg));
2028 }
2029 }
2030
2031 #[tokio::test]
2032 async fn test_environment_variable_safety() {
2033 let mut config = ExecutorConfig {
2034 capture_output: true,
2035 ..Default::default()
2036 };
2037 config
2038 .environment
2039 .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
2040 let executor = TaskExecutor::new(config);
2041 let task = Task {
2042 command: "printenv".to_string(),
2043 args: vec!["DANGEROUS_VAR".to_string()],
2044 description: Some("Environment variable safety test".to_string()),
2045 ..Default::default()
2046 };
2047 let result = executor.execute_task("env_test", &task).await.unwrap();
2048 assert!(result.success);
2049 assert!(result.stdout.contains("; rm -rf /"));
2050 }
2051
2052 #[tokio::test]
2053 async fn test_execute_graph_parallel_groups() {
2054 let config = ExecutorConfig {
2056 capture_output: true,
2057 max_parallel: 2,
2058 ..Default::default()
2059 };
2060 let executor = TaskExecutor::new(config);
2061 let mut graph = TaskGraph::new();
2062
2063 let t1 = Task {
2064 command: "echo".into(),
2065 args: vec!["A".into()],
2066 ..Default::default()
2067 };
2068 let t2 = Task {
2069 command: "echo".into(),
2070 args: vec!["B".into()],
2071 ..Default::default()
2072 };
2073
2074 graph.add_task("t1", t1).unwrap();
2075 graph.add_task("t2", t2).unwrap();
2076 let results = executor.execute_graph(&graph).await.unwrap();
2077 assert_eq!(results.len(), 2);
2078 let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
2079 assert!(joined.contains("A") && joined.contains("B"));
2080 }
2081
2082 #[tokio::test]
2083 async fn test_execute_graph_respects_dependency_levels() {
2084 let tmp = TempDir::new().unwrap();
2085 let root = tmp.path();
2086
2087 let config = ExecutorConfig {
2088 capture_output: true,
2089 max_parallel: 2,
2090 project_root: root.to_path_buf(),
2091 ..Default::default()
2092 };
2093 let executor = TaskExecutor::new(config);
2094
2095 let mut tasks = Tasks::new();
2096 tasks.tasks.insert(
2097 "dep".into(),
2098 TaskDefinition::Single(Box::new(Task {
2099 command: "sh".into(),
2100 args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
2101 ..Default::default()
2102 })),
2103 );
2104 tasks.tasks.insert(
2105 "consumer".into(),
2106 TaskDefinition::Single(Box::new(Task {
2107 command: "sh".into(),
2108 args: vec!["-c".into(), "cat marker.txt".into()],
2109 depends_on: vec!["dep".into()],
2110 ..Default::default()
2111 })),
2112 );
2113
2114 let mut graph = TaskGraph::new();
2115 graph.build_for_task("consumer", &tasks).unwrap();
2116
2117 let results = executor.execute_graph(&graph).await.unwrap();
2118 assert_eq!(results.len(), 2);
2119
2120 let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
2121 assert!(consumer.success);
2122 assert!(consumer.stdout.contains("ok"));
2123 }
2124}