1use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19 BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20 NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21 PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22 YarnModernLockfileParser, detect_from_command, detect_package_managers,
23 materializer::{
24 Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25 },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
/// Outcome of running a single task.
#[derive(Debug, Clone)]
pub struct TaskResult {
    /// Name the task was executed under.
    pub name: String,
    /// Exit code reported for the task's process, when one is available.
    pub exit_code: Option<i32>,
    /// Captured standard output; left empty when the output was not captured.
    pub stdout: String,
    /// Captured standard error; left empty when the output was not captured.
    pub stderr: String,
    /// Whether the task finished successfully.
    pub success: bool,
}
43
/// Value handed to `summarize_task_failure` when a task group or task graph
/// halts because one of its tasks failed.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
/// Settings that control how a `TaskExecutor` runs tasks.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// When `true`, task output is collected into the returned `TaskResult`.
    /// When `false`, task/group events are emitted and output is not returned
    /// by the non-hermetic path (the child inherits stdout/stderr).
    pub capture_output: bool,
    /// Upper bound on concurrently spawned tasks in parallel groups and
    /// graphs; `0` disables the bound.
    pub max_parallel: usize,
    /// Environment used to resolve command names (`resolve_command`) and to
    /// build the child process environment (`merge_with_system`).
    pub environment: Environment,
    /// Optional working-directory override; in the code visible here it is
    /// only read by the hermetic path of `execute_task`.
    pub working_dir: Option<PathBuf>,
    /// Project root; passed to the backend and used as the working directory
    /// of non-hermetic tasks that declare no workspaces.
    pub project_root: PathBuf,
    /// Optional destination into which cached outputs are materialized when a
    /// cache hit is found.
    pub materialize_outputs: Option<PathBuf>,
    /// Optional cache directory forwarded to the `task_cache` functions.
    pub cache_dir: Option<PathBuf>,
    /// When `true`, cache and hermetic directory paths are logged.
    pub show_cache_path: bool,
    /// Workspace configurations keyed by workspace name, consulted for the
    /// names listed in a task's `workspaces`.
    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
    /// Backend configuration forwarded to `create_backend_with_factory`.
    pub backend_config: Option<BackendConfig>,
    /// Backend selection forwarded to `create_backend_with_factory`
    /// (presumably originating from the command line — confirm with callers).
    pub cli_backend: Option<String>,
}
73
74impl Default for ExecutorConfig {
75 fn default() -> Self {
76 Self {
77 capture_output: false,
78 max_parallel: 0,
79 environment: Environment::new(),
80 working_dir: None,
81 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
82 materialize_outputs: None,
83 cache_dir: None,
84 show_cache_path: false,
85 workspaces: None,
86 backend_config: None,
87 cli_backend: None,
88 }
89 }
90}
91
/// Runs tasks, task groups and task graphs according to an `ExecutorConfig`.
pub struct TaskExecutor {
    /// Settings applied to every task this executor runs.
    config: ExecutorConfig,
    /// Backend that tasks may be delegated to; reference-counted so it can be
    /// shared between executors.
    backend: Arc<dyn TaskBackend>,
}
97impl TaskExecutor {
/// Creates an executor for `config` without supplying a Dagger backend
/// factory.
pub fn new(config: ExecutorConfig) -> Self {
    Self::with_dagger_factory(config, None)
}
102
103 pub fn with_dagger_factory(
107 config: ExecutorConfig,
108 dagger_factory: Option<BackendFactory>,
109 ) -> Self {
110 let backend = create_backend_with_factory(
111 config.backend_config.as_ref(),
112 config.project_root.clone(),
113 config.cli_backend.as_deref(),
114 dagger_factory,
115 );
116 Self { config, backend }
117 }
118
/// Builds an executor from `config` that reuses an already constructed
/// `backend` instead of creating a new one.
fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
    Self { config, backend }
}
123
/// Executes a single task and returns its [`TaskResult`].
///
/// When the configured backend reports the name `"dagger"`, the task is
/// delegated to that backend. Every other task is run through
/// `execute_task_non_hermetic`.
///
/// The cache-aware hermetic implementation that follows the second `return`
/// is kept in the source but is currently unreachable.
///
/// # Errors
///
/// Propagates whatever error the backend or `execute_task_non_hermetic`
/// returns.
pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
    // Dagger-backed tasks are handled entirely by the backend.
    if self.backend.name() == "dagger" {
        return self
            .backend
            .execute(
                name,
                task,
                &self.config.environment,
                &self.config.project_root,
                self.config.capture_output,
            )
            .await;
    }

    // All remaining tasks take the non-hermetic path unconditionally.
    return self.execute_task_non_hermetic(name, task).await;

    // NOTE(review): nothing below this point can run because of the
    // unconditional `return` above; the attribute silences the resulting
    // compiler lint. The block is the hermetic, cached execution path.
    #[allow(unreachable_code)]
    {
        // --- 1. Resolve the workspaces the task declares -------------------
        let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
        let mut workspace_input_patterns = Vec::new();
        let mut workspace_lockfile_hashes = BTreeMap::new();

        for workspace_name in &task.workspaces {
            if let Some(global_workspaces) = &self.config.workspaces {
                if let Some(ws_config) = global_workspaces.get(workspace_name) {
                    // Disabled workspaces are skipped silently.
                    if ws_config.enabled {
                        let (ws, entries, paths, hash) = self
                            .resolve_workspace(name, task, workspace_name, ws_config)
                            .await?;
                        workspace_ctxs.push((ws, entries));
                        workspace_input_patterns.extend(paths);
                        if let Some(h) = hash {
                            workspace_lockfile_hashes.insert(workspace_name.clone(), h);
                        }
                    }
                } else {
                    tracing::warn!(
                        task = %name,
                        workspace = %workspace_name,
                        "Workspace not found in global configuration"
                    );
                }
            }
        }

        // The first resolved workspace (if any) determines the input root.
        let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());

        // Path of the project relative to the workspace root, when the
        // project lives inside that workspace.
        let project_prefix = primary_workspace_root
            .as_ref()
            .and_then(|root| self.config.project_root.strip_prefix(root).ok())
            .map(|p| p.to_path_buf());
        let input_root = primary_workspace_root
            .clone()
            .unwrap_or_else(|| self.config.project_root.clone());

        // --- 2. Resolve the task's inputs ---------------------------------
        let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
        let resolved_inputs = {
            let _g = span_inputs.enter();
            let resolver = InputResolver::new(&input_root);
            let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
            all_inputs.extend(workspace_input_patterns.iter().cloned());
            resolver.resolve(&all_inputs)?
        };
        if task_trace_enabled() {
            tracing::info!(
                task = %name,
                input_root = %input_root.display(),
                project_root = %self.config.project_root.display(),
                inputs_count = resolved_inputs.files.len(),
                workspace_inputs = workspace_input_patterns.len(),
                "Resolved task inputs"
            );
        }

        // --- 3. Compute the cache key -------------------------------------
        let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
        let env_summary: BTreeMap<String, String> = self
            .config
            .environment
            .vars
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
        let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
        let shell_json = serde_json::to_value(&task.shell).ok();
        let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
            None
        } else {
            Some(workspace_lockfile_hashes)
        };

        let envelope = task_cache::CacheKeyEnvelope {
            inputs: inputs_summary.clone(),
            command: task.command.clone(),
            args: task.args.clone(),
            shell: shell_json,
            env: env_summary.clone(),
            cuenv_version: cuenv_version.clone(),
            platform: platform.clone(),
            workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
            workspace_package_hashes: None,
        };
        let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;

        // --- 4. Cache lookup ----------------------------------------------
        let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
        let cache_hit = {
            let _g = span_cache.enter();
            task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
        };

        if let Some(hit) = cache_hit {
            tracing::info!(
                task = %name,
                key = %cache_key,
                path = %hit.path.display(),
                "Task {} cache hit: {}. Skipping execution.",
                name,
                cache_key
            );
            if self.config.show_cache_path {
                tracing::info!(cache_path = %hit.path.display(), "Cache path");
            }
            if let Some(dest) = &self.config.materialize_outputs {
                let count = task_cache::materialize_outputs(
                    &cache_key,
                    dest,
                    self.config.cache_dir.as_deref(),
                )?;
                tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
            }
            // Replay the logs stored with the cache entry; unreadable or
            // missing log files are treated as empty.
            let stdout_path = hit.path.join("logs").join("stdout.log");
            let stderr_path = hit.path.join("logs").join("stderr.log");
            let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
            let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
            if !(stdout.is_empty() && stderr.is_empty()) {
                if !self.config.capture_output {
                    cuenv_events::emit_task_cache_hit!(name, cache_key);
                    if !stdout.is_empty() {
                        cuenv_events::emit_task_output!(name, "stdout", stdout);
                    }
                    if !stderr.is_empty() {
                        cuenv_events::emit_task_output!(name, "stderr", stderr);
                    }
                }
                return Ok(TaskResult {
                    name: name.to_string(),
                    exit_code: Some(0),
                    stdout,
                    stderr,
                    success: true,
                });
            } else {
                // A hit with both logs empty falls through to a real run.
                tracing::info!(
                    task = %name,
                    key = %cache_key,
                    "Cache entry lacks logs; executing to backfill logs"
                );
            }
        }

        tracing::info!(
            task = %name,
            key = %cache_key,
            "Task {} executing hermetically… key {}",
            name,
            cache_key
        );

        // --- 5. Prepare the hermetic directory ----------------------------
        let hermetic_root = create_hermetic_dir(name, &cache_key)?;
        if self.config.show_cache_path {
            tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
        }

        let span_populate =
            tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
        {
            let _g = span_populate.enter();
            populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
        }

        for (ws, entries) in workspace_ctxs {
            self.materialize_workspace(&ws, &entries, &hermetic_root)
                .await?;
        }

        // Pull in outputs of other tasks this task declares as inputs; each
        // source task must already have a recorded cache entry.
        if let Some(ref inputs_from) = task.inputs_from {
            for task_output in inputs_from {
                let source_task = &task_output.task;
                let source_cache_key = task_cache::lookup_latest(
                    &self.config.project_root,
                    source_task,
                    self.config.cache_dir.as_deref(),
                )
                .ok_or_else(|| {
                    Error::configuration(format!(
                        "Task '{}' depends on outputs from '{}' but no cached result found. \
                         Ensure '{}' runs before this task (add it to dependsOn).",
                        name, source_task, source_task
                    ))
                })?;

                let materialized = task_cache::materialize_outputs(
                    &source_cache_key,
                    &hermetic_root,
                    self.config.cache_dir.as_deref(),
                )?;
                tracing::info!(
                    task = %name,
                    source_task = %source_task,
                    materialized = materialized,
                    "Materialized outputs from dependent task"
                );
            }
        }

        // Input hashes before execution, used later to spot undeclared writes.
        let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();

        // --- 6. Build the command -----------------------------------------
        let resolved_command = self.config.environment.resolve_command(&task.command);

        let mut cmd = if let Some(shell) = &task.shell {
            // A shell is only used when both its command and flag are set.
            if shell.command.is_some() && shell.flag.is_some() {
                let shell_command = shell.command.as_ref().unwrap();
                let shell_flag = shell.flag.as_ref().unwrap();
                let resolved_shell = self.config.environment.resolve_command(shell_command);
                let mut cmd = Command::new(&resolved_shell);
                cmd.arg(shell_flag);
                if task.args.is_empty() {
                    cmd.arg(&resolved_command);
                } else {
                    // The shell receives the whole command line as one argument.
                    let full_command = if task.command.is_empty() {
                        task.args.join(" ")
                    } else {
                        format!("{} {}", resolved_command, task.args.join(" "))
                    };
                    cmd.arg(full_command);
                }
                cmd
            } else {
                let mut cmd = Command::new(&resolved_command);
                for arg in &task.args {
                    cmd.arg(arg);
                }
                cmd
            }
        } else {
            let mut cmd = Command::new(&resolved_command);
            for arg in &task.args {
                cmd.arg(arg);
            }
            cmd
        };

        // Working directory: explicit override, else the project's location
        // inside the hermetic root, else the hermetic root itself.
        let workdir = if let Some(dir) = &self.config.working_dir {
            dir.clone()
        } else if let Some(prefix) = project_prefix.as_ref() {
            hermetic_root.join(prefix)
        } else {
            hermetic_root.clone()
        };
        std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
            source: e,
            path: Some(workdir.clone().into()),
            operation: "create_dir_all".into(),
        })?;
        cmd.current_dir(&workdir);
        let env_vars = self.config.environment.merge_with_system();
        if task_trace_enabled() {
            tracing::info!(
                task = %name,
                hermetic_root = %hermetic_root.display(),
                workdir = %workdir.display(),
                command = %task.command,
                args = ?task.args,
                env_count = env_vars.len(),
                "Launching task command"
            );
        }
        for (k, v) in env_vars {
            cmd.env(k, v);
        }

        // Output is always piped here so it can be stored with the cache
        // entry; it is additionally echoed when not capturing.
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());

        let stream_logs = !self.config.capture_output;
        if stream_logs {
            let cmd_str = if task.command.is_empty() {
                task.args.join(" ")
            } else {
                format!("{} {}", task.command, task.args.join(" "))
            };
            cuenv_events::emit_task_started!(name, cmd_str, true);
        }

        // --- 7. Run the command and collect its output --------------------
        let start = std::time::Instant::now();
        let mut child = cmd.spawn().map_err(|e| {
            Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
        })?;

        let stdout_handle = child.stdout.take();
        let stderr_handle = child.stderr.take();

        let stdout_task = async move {
            if let Some(mut stdout) = stdout_handle {
                let mut output = Vec::new();
                let mut buf = [0u8; 4096];
                use std::io::Write;
                use tokio::io::AsyncReadExt;

                loop {
                    match stdout.read(&mut buf).await {
                        // A zero-length read ends the loop.
                        Ok(0) => break,
                        Ok(n) => {
                            let chunk = &buf[0..n];
                            if stream_logs {
                                // Echo errors are deliberately ignored.
                                let mut handle = std::io::stdout().lock();
                                let _ = handle.write_all(chunk);
                                let _ = handle.flush();
                            }
                            output.extend_from_slice(chunk);
                        }
                        // Read errors end collection with what was gathered.
                        Err(_) => break,
                    }
                }
                String::from_utf8_lossy(&output).to_string()
            } else {
                String::new()
            }
        };

        let stderr_task = async move {
            if let Some(mut stderr) = stderr_handle {
                let mut output = Vec::new();
                let mut buf = [0u8; 4096];
                use std::io::Write;
                use tokio::io::AsyncReadExt;

                loop {
                    match stderr.read(&mut buf).await {
                        // A zero-length read ends the loop.
                        Ok(0) => break,
                        Ok(n) => {
                            let chunk = &buf[0..n];
                            if stream_logs {
                                // Echo errors are deliberately ignored.
                                let mut handle = std::io::stderr().lock();
                                let _ = handle.write_all(chunk);
                                let _ = handle.flush();
                            }
                            output.extend_from_slice(chunk);
                        }
                        // Read errors end collection with what was gathered.
                        Err(_) => break,
                    }
                }
                String::from_utf8_lossy(&output).to_string()
            } else {
                String::new()
            }
        };

        // Both pipes are read concurrently before waiting on the child.
        let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);

        let status = child.wait().await.map_err(|e| {
            Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
        })?;
        let duration = start.elapsed();

        // A status without an exit code is reported as 1.
        let exit_code = status.code().unwrap_or(1);
        let success = status.success();
        if !success {
            tracing::warn!(task = %name, exit = exit_code, "Task failed");
            tracing::error!(task = %name, "Task stdout:\n{}", stdout);
            tracing::error!(task = %name, "Task stderr:\n{}", stderr);
        } else {
            tracing::info!(task = %name, "Task completed successfully");
        }

        // --- 8. Collect declared outputs ----------------------------------
        // Output patterns are re-rooted under the project prefix because they
        // are matched relative to the hermetic root.
        let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
            task.outputs
                .iter()
                .map(|o| prefix.join(o).to_string_lossy().to_string())
                .collect()
        } else {
            task.outputs.clone()
        };
        let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
        let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
        let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();

        // Stage outputs in a fresh temp directory named after the cache key.
        let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
        if outputs_stage.exists() {
            let _ = std::fs::remove_dir_all(&outputs_stage);
        }
        std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
            source: e,
            path: Some(outputs_stage.clone().into()),
            operation: "create_dir_all".into(),
        })?;

        for rel in &outputs {
            // Staged paths are stored relative to the project, not the workspace.
            let rel_for_project = project_prefix
                .as_ref()
                .and_then(|prefix| rel.strip_prefix(prefix).ok())
                .unwrap_or(rel)
                .to_path_buf();
            let src = hermetic_root.join(rel);
            #[allow(clippy::collapsible_if)]
            if let Ok(meta) = std::fs::metadata(&src) {
                // Only regular files are staged and indexed.
                if meta.is_file() {
                    let dst = outputs_stage.join(&rel_for_project);
                    if let Some(parent) = dst.parent() {
                        std::fs::create_dir_all(parent).map_err(|e| Error::Io {
                            source: e,
                            path: Some(parent.into()),
                            operation: "create_dir_all".into(),
                        })?;
                    }
                    std::fs::copy(&src, &dst).map_err(|e| Error::Io {
                        source: e,
                        path: Some(dst.into()),
                        operation: "copy".into(),
                    })?;
                    let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
                    output_index.push(task_cache::OutputIndexEntry {
                        rel_path: rel_for_project.to_string_lossy().to_string(),
                        size: meta.len(),
                        sha256: sha,
                    });
                }
            }
        }

        // --- 9. Warn about writes to undeclared paths ---------------------
        let mut warned = false;
        for entry in walkdir::WalkDir::new(&hermetic_root)
            .into_iter()
            .filter_map(|e| e.ok())
        {
            let p = entry.path();
            if p.is_dir() {
                continue;
            }
            let rel = match p.strip_prefix(&hermetic_root) {
                Ok(r) => r.to_path_buf(),
                Err(_) => continue,
            };
            let rel_str = rel.to_string_lossy().to_string();
            let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
            // A file counts as changed when it is new or its hash differs
            // from the pre-execution input hash.
            let initial = initial_hashes.get(&rel_str);
            let changed = match initial {
                None => true,
                Some(prev) => prev != &sha,
            };
            if changed && !outputs_set.contains(&rel) {
                // The warning is emitted once; each path is logged at debug.
                if !warned {
                    tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
                    warned = true;
                }
                tracing::debug!(path = %rel_str, "Undeclared write");
            }
        }

        // --- 10. Persist successful results in the cache ------------------
        if success {
            let meta = task_cache::TaskResultMeta {
                task_name: name.to_string(),
                command: task.command.clone(),
                args: task.args.clone(),
                env_summary,
                inputs_summary: inputs_summary.clone(),
                created_at: Utc::now(),
                cuenv_version,
                platform,
                duration_ms: duration.as_millis(),
                exit_code,
                cache_key_envelope: envelope_json.clone(),
                output_index,
            };
            let logs = task_cache::TaskLogs {
                stdout: Some(stdout.clone()),
                stderr: Some(stderr.clone()),
            };
            let cache_span = tracing::info_span!("cache.save", key = %cache_key);
            {
                let _g = cache_span.enter();
                task_cache::save_result(
                    &cache_key,
                    &meta,
                    &outputs_stage,
                    &hermetic_root,
                    logs,
                    self.config.cache_dir.as_deref(),
                )?;
            }

            // Record this key as the latest result for the task so that
            // `lookup_latest` can find it.
            task_cache::record_latest(
                &self.config.project_root,
                name,
                &cache_key,
                self.config.cache_dir.as_deref(),
            )?;
        } else {
            // Failed runs are not written to the cache.
        }

        Ok(TaskResult {
            name: name.to_string(),
            exit_code: Some(exit_code),
            stdout,
            stderr,
            success,
        })
    }
}
687
688 async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
692 let workdir = if !task.hermetic && !task.workspaces.is_empty() {
696 let workspace_name = &task.workspaces[0];
698 let manager = match workspace_name.as_str() {
699 "bun" => PackageManager::Bun,
700 "npm" => PackageManager::Npm,
701 "pnpm" => PackageManager::Pnpm,
702 "yarn" => PackageManager::YarnModern,
703 "cargo" => PackageManager::Cargo,
704 _ => PackageManager::Npm, };
706 find_workspace_root(manager, &self.config.project_root)
707 } else {
708 self.config.project_root.clone()
709 };
710
711 tracing::info!(
712 task = %name,
713 workdir = %workdir.display(),
714 hermetic = false,
715 "Executing non-hermetic task"
716 );
717
718 let cmd_str = if task.command.is_empty() {
720 task.args.join(" ")
721 } else {
722 format!("{} {}", task.command, task.args.join(" "))
723 };
724 if !self.config.capture_output {
725 cuenv_events::emit_task_started!(name, cmd_str, false);
726 }
727
728 let resolved_command = self.config.environment.resolve_command(&task.command);
730
731 let mut cmd = if let Some(shell) = &task.shell {
733 if shell.command.is_some() && shell.flag.is_some() {
734 let shell_command = shell.command.as_ref().unwrap();
735 let shell_flag = shell.flag.as_ref().unwrap();
736 let resolved_shell = self.config.environment.resolve_command(shell_command);
737 let mut cmd = Command::new(&resolved_shell);
738 cmd.arg(shell_flag);
739 if task.args.is_empty() {
740 cmd.arg(&resolved_command);
741 } else {
742 let full_command = if task.command.is_empty() {
743 task.args.join(" ")
744 } else {
745 format!("{} {}", resolved_command, task.args.join(" "))
746 };
747 cmd.arg(full_command);
748 }
749 cmd
750 } else {
751 let mut cmd = Command::new(&resolved_command);
752 for arg in &task.args {
753 cmd.arg(arg);
754 }
755 cmd
756 }
757 } else {
758 let mut cmd = Command::new(&resolved_command);
759 for arg in &task.args {
760 cmd.arg(arg);
761 }
762 cmd
763 };
764
765 cmd.current_dir(&workdir);
767 let env_vars = self.config.environment.merge_with_system();
768 for (k, v) in &env_vars {
769 cmd.env(k, v);
770 }
771
772 if self.config.capture_output {
775 let output = cmd
776 .stdout(Stdio::piped())
777 .stderr(Stdio::piped())
778 .output()
779 .await
780 .map_err(|e| Error::Io {
781 source: e,
782 path: None,
783 operation: format!("spawn task {}", name),
784 })?;
785
786 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
787 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
788 let exit_code = output.status.code().unwrap_or(-1);
789 let success = output.status.success();
790
791 if !success {
792 tracing::warn!(task = %name, exit = exit_code, "Task failed");
793 tracing::error!(task = %name, "Task stdout:\n{}", stdout);
794 tracing::error!(task = %name, "Task stderr:\n{}", stderr);
795 }
796
797 Ok(TaskResult {
798 name: name.to_string(),
799 exit_code: Some(exit_code),
800 stdout,
801 stderr,
802 success,
803 })
804 } else {
805 let status = cmd
807 .stdout(Stdio::inherit())
808 .stderr(Stdio::inherit())
809 .status()
810 .await
811 .map_err(|e| Error::Io {
812 source: e,
813 path: None,
814 operation: format!("spawn task {}", name),
815 })?;
816
817 let exit_code = status.code().unwrap_or(-1);
818 let success = status.success();
819
820 if !success {
821 tracing::warn!(task = %name, exit = exit_code, "Task failed");
822 }
823
824 Ok(TaskResult {
825 name: name.to_string(),
826 exit_code: Some(exit_code),
827 stdout: String::new(), stderr: String::new(),
829 success,
830 })
831 }
832 }
833
834 #[async_recursion]
836 pub async fn execute_definition(
837 &self,
838 name: &str,
839 definition: &TaskDefinition,
840 all_tasks: &Tasks,
841 ) -> Result<Vec<TaskResult>> {
842 match definition {
843 TaskDefinition::Single(task) => {
844 let result = self.execute_task(name, task.as_ref()).await?;
845 Ok(vec![result])
846 }
847 TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
848 }
849 }
850
851 async fn execute_group(
852 &self,
853 prefix: &str,
854 group: &TaskGroup,
855 all_tasks: &Tasks,
856 ) -> Result<Vec<TaskResult>> {
857 match group {
858 TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
859 TaskGroup::Parallel(tasks) => self.execute_parallel(prefix, tasks, all_tasks).await,
860 }
861 }
862
863 async fn execute_sequential(
864 &self,
865 prefix: &str,
866 tasks: &[TaskDefinition],
867 all_tasks: &Tasks,
868 ) -> Result<Vec<TaskResult>> {
869 if !self.config.capture_output {
870 cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
871 }
872 let mut results = Vec::new();
873 for (i, task_def) in tasks.iter().enumerate() {
874 let task_name = format!("{}[{}]", prefix, i);
875 let task_results = self
876 .execute_definition(&task_name, task_def, all_tasks)
877 .await?;
878 for result in &task_results {
879 if !result.success {
880 let message = format!(
881 "Sequential task group '{prefix}' halted.\n\n{}",
882 summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
883 );
884 return Err(Error::configuration(message));
885 }
886 }
887 results.extend(task_results);
888 }
889 Ok(results)
890 }
891
892 async fn execute_parallel(
893 &self,
894 prefix: &str,
895 tasks: &HashMap<String, TaskDefinition>,
896 all_tasks: &Tasks,
897 ) -> Result<Vec<TaskResult>> {
898 if let Some(default_task) = tasks.get("default") {
900 if !self.config.capture_output {
901 cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
902 }
903 let task_name = format!("{}.default", prefix);
906 return self
907 .execute_definition(&task_name, default_task, all_tasks)
908 .await;
909 }
910
911 if !self.config.capture_output {
912 cuenv_events::emit_task_group_started!(prefix, false, tasks.len());
913 }
914 let mut join_set = JoinSet::new();
915 let all_tasks = Arc::new(all_tasks.clone());
916 let mut all_results = Vec::new();
917 let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
918 if let Some(failed) = results.iter().find(|r| !r.success) {
919 let message = format!(
920 "Parallel task group '{prefix}' halted.\n\n{}",
921 summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
922 );
923 return Err(Error::configuration(message));
924 }
925 all_results.extend(results);
926 Ok(())
927 };
928 for (name, task_def) in tasks {
929 let task_name = format!("{}.{}", prefix, name);
930 let task_def = task_def.clone();
931 let all_tasks = Arc::clone(&all_tasks);
932 let executor = self.clone_with_config();
933 join_set.spawn(async move {
934 executor
935 .execute_definition(&task_name, &task_def, &all_tasks)
936 .await
937 });
938 if self.config.max_parallel > 0
939 && join_set.len() >= self.config.max_parallel
940 && let Some(result) = join_set.join_next().await
941 {
942 match result {
943 Ok(Ok(results)) => merge_results(results)?,
944 Ok(Err(e)) => return Err(e),
945 Err(e) => {
946 return Err(Error::configuration(format!(
947 "Task execution panicked: {}",
948 e
949 )));
950 }
951 }
952 }
953 }
954 while let Some(result) = join_set.join_next().await {
955 match result {
956 Ok(Ok(results)) => merge_results(results)?,
957 Ok(Err(e)) => return Err(e),
958 Err(e) => {
959 return Err(Error::configuration(format!(
960 "Task execution panicked: {}",
961 e
962 )));
963 }
964 }
965 }
966 Ok(all_results)
967 }
968
969 pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
970 let parallel_groups = graph.get_parallel_groups()?;
971 let mut all_results = Vec::new();
972 let mut join_set = JoinSet::new();
973 let mut group_iter = parallel_groups.into_iter();
974 let mut current_group = group_iter.next();
975 while current_group.is_some() || !join_set.is_empty() {
976 if let Some(group) = current_group.as_mut() {
977 while let Some(node) = group.pop() {
978 let task = node.task.clone();
979 let name = node.name.clone();
980 let executor = self.clone_with_config();
981 join_set.spawn(async move { executor.execute_task(&name, &task).await });
982 if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
983 break;
984 }
985 }
986 if group.is_empty() {
987 current_group = group_iter.next();
988 }
989 }
990 if let Some(result) = join_set.join_next().await {
991 match result {
992 Ok(Ok(task_result)) => {
993 if !task_result.success {
994 let message = format!(
995 "Task graph execution halted.\n\n{}",
996 summarize_task_failure(&task_result, TASK_FAILURE_SNIPPET_LINES)
997 );
998 return Err(Error::configuration(message));
999 }
1000 all_results.push(task_result);
1001 }
1002 Ok(Err(e)) => return Err(e),
1003 Err(e) => {
1004 return Err(Error::configuration(format!(
1005 "Task execution panicked: {}",
1006 e
1007 )));
1008 }
1009 }
1010 }
1011 }
1012 Ok(all_results)
1013 }
1014
1015 async fn resolve_workspace(
1016 &self,
1017 _task_name: &str,
1018 task: &Task,
1019 workspace_name: &str,
1020 config: &WorkspaceConfig,
1021 ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1022 let root = self.config.project_root.clone();
1023 let task_label = _task_name.to_string();
1024 let command = task.command.clone();
1025 let config_pm = config.package_manager.clone();
1026 let config_root = config.root.clone();
1027 let mut packages = Vec::new();
1035
1036 let lockfile_override: Option<String> = None; let mut traverse_workspace_deps = true;
1043
1044 let workspace_name_owned = workspace_name.to_string();
1046 tokio::task::spawn_blocking(move || {
1047 let override_for_detection = lockfile_override.as_ref().map(|lock| {
1048 let candidate = PathBuf::from(lock);
1049 if candidate.is_absolute() {
1050 candidate
1051 } else {
1052 root.join(lock)
1053 }
1054 });
1055
1056 let manager = if let Some(pm_str) = config_pm {
1059 match pm_str.as_str() {
1060 "npm" => PackageManager::Npm,
1061 "bun" => PackageManager::Bun,
1062 "pnpm" => PackageManager::Pnpm,
1063 "yarn" => PackageManager::YarnModern,
1064 "yarn-classic" => PackageManager::YarnClassic,
1065 "cargo" => PackageManager::Cargo,
1066 _ => {
1067 return Err(Error::configuration(format!(
1068 "Unknown package manager: {}",
1069 pm_str
1070 )));
1071 }
1072 }
1073 } else {
1074 match workspace_name_owned.as_str() {
1076 "npm" => PackageManager::Npm,
1077 "bun" => PackageManager::Bun,
1078 "pnpm" => PackageManager::Pnpm,
1079 "yarn" => PackageManager::YarnModern,
1080 "cargo" => PackageManager::Cargo,
1081 _ => {
1082 let hint = detect_from_command(&command);
1085 let detected = match detect_package_managers(&root) {
1086 Ok(list) => list,
1087 Err(e) => {
1088 if override_for_detection.is_some() {
1089 Vec::new()
1090 } else {
1091 return Err(Error::configuration(format!(
1092 "Failed to detect package managers: {}",
1093 e
1094 )));
1095 }
1096 }
1097 };
1098
1099 if let Some(h) = hint {
1100 if detected.contains(&h) {
1101 h
1102 } else if !detected.is_empty() {
1103 detected[0]
1105 } else if let Some(ref override_path) = override_for_detection {
1106 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1107 Error::configuration(
1108 "Unable to infer package manager from lockfile override",
1109 )
1110 })?
1111 } else {
1112 return Err(Error::configuration(
1113 format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1114 ));
1115 }
1116 } else if !detected.is_empty() {
1117 detected[0]
1118 } else if let Some(ref override_path) = override_for_detection {
1119 infer_manager_from_lockfile(override_path).ok_or_else(|| {
1120 Error::configuration(
1121 "Unable to infer package manager from lockfile override",
1122 )
1123 })?
1124 } else {
1125 return Err(Error::configuration(
1126 "Could not detect package manager for workspace resolution",
1127 ));
1128 }
1129 }
1130 }
1131 };
1132
1133 let workspace_root = if let Some(root_override) = &config_root {
1137 root.join(root_override)
1138 } else {
1139 find_workspace_root(manager, &root)
1140 };
1141
1142 if task_trace_enabled() {
1143 tracing::info!(
1144 task = %task_label,
1145 manager = %manager,
1146 project_root = %root.display(),
1147 workspace_root = %workspace_root.display(),
1148 "Resolved workspace root for package manager"
1149 );
1150 }
1151
1152 let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1153 let candidate = PathBuf::from(lock);
1154 if candidate.is_absolute() {
1155 candidate
1156 } else {
1157 workspace_root.join(lock)
1158 }
1159 });
1160
1161 let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1163 PackageManager::Npm
1164 | PackageManager::Bun
1165 | PackageManager::YarnClassic
1166 | PackageManager::YarnModern => Box::new(PackageJsonDiscovery),
1167 PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1168 PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1169 };
1170
1171 let workspace = discovery.discover(&workspace_root).map_err(|e| {
1172 Error::configuration(format!("Failed to discover workspace: {}", e))
1173 })?;
1174
1175 let lockfile_path = if let Some(path) = lockfile_override_path {
1177 if !path.exists() {
1178 return Err(Error::configuration(format!(
1179 "Workspace lockfile override does not exist: {}",
1180 path.display()
1181 )));
1182 }
1183 path
1184 } else {
1185 workspace.lockfile.clone().ok_or_else(|| {
1186 Error::configuration("Workspace resolution requires a lockfile")
1187 })?
1188 };
1189
1190 let parser: Box<dyn LockfileParser> = match manager {
1191 PackageManager::Npm => Box::new(NpmLockfileParser),
1192 PackageManager::Bun => Box::new(BunLockfileParser),
1193 PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1194 PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1195 PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1196 PackageManager::Cargo => Box::new(CargoLockfileParser),
1197 };
1198
1199 let entries = parser
1200 .parse(&lockfile_path)
1201 .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1202 if task_trace_enabled() {
1203 tracing::info!(
1204 task = %task_label,
1205 lockfile = %lockfile_path.display(),
1206 members = entries.len(),
1207 "Parsed workspace lockfile"
1208 );
1209 }
1210
1211 let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1213
1214 if packages.is_empty() {
1218 let current_member = workspace
1219 .members
1220 .iter()
1221 .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1222 if let Some(member) = current_member {
1223 let inferred = vec![member.name.clone()];
1224 if task_trace_enabled() {
1225 tracing::info!(
1226 task = %task_label,
1227 inferred_packages = ?inferred,
1228 "Inferred workspace packages from current project"
1229 );
1230 }
1231 packages = inferred;
1232 traverse_workspace_deps = true;
1233 }
1234 }
1235
1236 let mut member_paths = Vec::new();
1238
1239 member_paths.push(manager.workspace_config_name().to_string());
1241 if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1242 member_paths.push(rel.to_string_lossy().to_string());
1243 } else {
1244 member_paths.push(lockfile_path.to_string_lossy().to_string());
1245 }
1246
1247 if packages.is_empty() {
1248 for member in &workspace.members {
1249 let manifest_rel = member
1250 .path
1251 .join(manager.workspace_config_name());
1252 member_paths.push(manifest_rel.to_string_lossy().to_string());
1253 }
1254 } else {
1255 let mut to_visit: Vec<String> = packages.clone();
1256 let mut visited = HashSet::new();
1257
1258 while let Some(pkg_name) = to_visit.pop() {
1259 if visited.contains(&pkg_name) {
1260 continue;
1261 }
1262 visited.insert(pkg_name.clone());
1263
1264 if let Some(member) = workspace.find_member(&pkg_name) {
1265 let manifest_rel = member
1266 .path
1267 .join(manager.workspace_config_name());
1268 member_paths.push(manifest_rel.to_string_lossy().to_string());
1269
1270 if traverse_workspace_deps {
1272 let mut dependency_candidates: HashSet<String> = HashSet::new();
1273
1274 if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1275 for dep in &entry.dependencies {
1276 if entries
1277 .iter()
1278 .any(|e| e.name == dep.name && e.is_workspace_member)
1279 {
1280 dependency_candidates.insert(dep.name.clone());
1281 }
1282 }
1283 }
1284
1285 for dep_name in &member.dependencies {
1286 if workspace.find_member(dep_name).is_some() {
1287 dependency_candidates.insert(dep_name.clone());
1288 }
1289 }
1290
1291 for dep_name in dependency_candidates {
1292 to_visit.push(dep_name);
1293 }
1294 }
1295 }
1296 }
1297 }
1298
1299 if task_trace_enabled() {
1300 tracing::info!(
1301 task = %task_label,
1302 members = ?member_paths,
1303 "Workspace input member paths selected"
1304 );
1305 }
1306
1307 Ok((workspace, entries, member_paths, Some(hash)))
1308 })
1309 .await
1310 .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1311 }
1312
1313 async fn materialize_workspace(
1314 &self,
1315 workspace: &Workspace,
1316 entries: &[LockfileEntry],
1317 target_dir: &Path,
1318 ) -> Result<()> {
1319 let materializer: Box<dyn Materializer> = match workspace.manager {
1321 PackageManager::Npm
1322 | PackageManager::Bun
1323 | PackageManager::Pnpm
1324 | PackageManager::YarnClassic
1325 | PackageManager::YarnModern => Box::new(NodeModulesMaterializer),
1326 PackageManager::Cargo => Box::new(CargoMaterializer),
1327 };
1328
1329 materializer
1330 .materialize(workspace, entries, target_dir)
1331 .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1332 }
1333
1334 fn clone_with_config(&self) -> Self {
1335 Self::with_shared_backend(self.config.clone(), self.backend.clone())
1337 }
1338}
1339
1340fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1341 let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1342
1343 loop {
1344 let is_root = match manager {
1345 PackageManager::Npm
1346 | PackageManager::Bun
1347 | PackageManager::YarnClassic
1348 | PackageManager::YarnModern => package_json_has_workspaces(¤t),
1349 PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1350 PackageManager::Cargo => cargo_toml_has_workspace(¤t),
1351 };
1352
1353 if is_root {
1354 return current;
1355 }
1356
1357 if let Some(parent) = current.parent() {
1358 current = parent.to_path_buf();
1359 } else {
1360 return start.to_path_buf();
1361 }
1362 }
1363}
1364
1365fn package_json_has_workspaces(dir: &Path) -> bool {
1366 let path = dir.join("package.json");
1367 let content = std::fs::read_to_string(&path);
1368 let Ok(json) = content.and_then(|s| {
1369 serde_json::from_str::<serde_json::Value>(&s)
1370 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1371 }) else {
1372 return false;
1373 };
1374
1375 match json.get("workspaces") {
1376 Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1377 Some(serde_json::Value::Object(map)) => map
1378 .get("packages")
1379 .and_then(|packages| packages.as_array())
1380 .map(|arr| !arr.is_empty())
1381 .unwrap_or(false),
1382 _ => false,
1383 }
1384}
1385
/// Reports whether `dir/Cargo.toml` declares a Cargo workspace.
///
/// The manifest is scanned line by line for a table header naming `workspace`:
/// either `[workspace]` itself or a sub-table such as `[workspace.dependencies]`.
/// Whitespace around the header and inside the brackets is tolerated. Mentions of
/// `[workspace]` in comments or string values do not count, and look-alike tables
/// such as `[workspaces]` are rejected.
///
/// A missing or unreadable `Cargo.toml` yields `false`.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // Only lines that open a table can declare the workspace.
        let Some(header) = line.trim_start().strip_prefix('[') else {
            return false;
        };
        let Some(rest) = header.trim_start().strip_prefix("workspace") else {
            return false;
        };
        // `]` closes `[workspace]`; `.` introduces a `[workspace.*]` sub-table.
        let rest = rest.trim_start();
        rest.starts_with(']') || rest.starts_with('.')
    })
}
1394
/// Reports whether verbose task tracing was requested via `CUENV_TRACE_TASKS`.
///
/// The variable is read once and the answer is cached for the rest of the
/// process. Tracing is on when the trimmed, case-insensitive value is one of
/// `1`, `true`, `yes` or `on`; an unset or unreadable variable counts as off.
fn task_trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();

    *ENABLED.get_or_init(|| {
        let raw = std::env::var("CUENV_TRACE_TASKS").unwrap_or_default();
        let flag = raw.trim().to_ascii_lowercase();
        ["1", "true", "yes", "on"].contains(&flag.as_str())
    })
}
1408
1409pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1412 let exit_code = result
1413 .exit_code
1414 .map(|c| c.to_string())
1415 .unwrap_or_else(|| "unknown".to_string());
1416
1417 let mut sections = Vec::new();
1418 sections.push(format!(
1419 "Task '{}' failed with exit code {}.",
1420 result.name, exit_code
1421 ));
1422
1423 let output = format_failure_streams(result, max_output_lines);
1424 if output.is_empty() {
1425 sections.push(
1426 "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1427 .to_string(),
1428 );
1429 } else {
1430 sections.push(output);
1431 }
1432
1433 sections.join("\n\n")
1434}
1435
1436fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1437 let mut streams = Vec::new();
1438
1439 if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1440 streams.push(stdout);
1441 }
1442
1443 if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1444 streams.push(stderr);
1445 }
1446
1447 streams.join("\n\n")
1448}
1449
/// Formats the last `max_output_lines` lines of one output stream.
///
/// Trailing whitespace is ignored; a stream that is empty after trimming yields
/// `None`. Otherwise the result is a header line followed by the retained lines.
/// The header is `"<label>:"` when everything fits, or
/// `"<label> (last N of M lines):"` when earlier lines were dropped.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    // Number of leading lines that do not fit into the snippet.
    let skipped = total.saturating_sub(max_output_lines);

    let header = match skipped {
        0 => format!("{label}:"),
        _ => format!("{label} (last {max_output_lines} of {total} lines):"),
    };
    let tail: Vec<&str> = trimmed.lines().skip(skipped).collect();

    Some(format!("{header}\n{}", tail.join("\n")))
}
1469
1470fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1471 match path.file_name().and_then(|n| n.to_str())? {
1472 "package-lock.json" => Some(PackageManager::Npm),
1473 "bun.lock" => Some(PackageManager::Bun),
1474 "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1475 "yarn.lock" => Some(PackageManager::YarnModern),
1476 "Cargo.lock" => Some(PackageManager::Cargo),
1477 _ => None,
1478 }
1479}
1480
1481fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1482 let sanitized_task = task_name
1485 .chars()
1486 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1487 .collect::<String>();
1488
1489 let base = std::env::temp_dir().join(format!(
1490 "cuenv-work-{}-{}",
1491 sanitized_task,
1492 &key[..12.min(key.len())]
1493 ));
1494
1495 if base.exists()
1498 && let Err(e) = std::fs::remove_dir_all(&base)
1499 {
1500 let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1503 let fallback = std::env::temp_dir().join(format!(
1504 "cuenv-work-{}-{}-{}",
1505 sanitized_task,
1506 &key[..12.min(key.len())],
1507 ts
1508 ));
1509 tracing::warn!(
1510 previous = %base.display(),
1511 fallback = %fallback.display(),
1512 error = %e,
1513 "Failed to clean previous hermetic workdir; using fresh fallback directory"
1514 );
1515 std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1516 source: e,
1517 path: Some(fallback.clone().into()),
1518 operation: "create_dir_all".into(),
1519 })?;
1520 return Ok(fallback);
1521 }
1522
1523 std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1524 source: e,
1525 path: Some(base.clone().into()),
1526 operation: "create_dir_all".into(),
1527 })?;
1528 Ok(base)
1529}
1530
1531pub async fn execute_command(
1533 command: &str,
1534 args: &[String],
1535 environment: &Environment,
1536) -> Result<i32> {
1537 tracing::info!("Executing command: {} {:?}", command, args);
1538 let mut cmd = Command::new(command);
1539 cmd.args(args);
1540 let env_vars = environment.merge_with_system();
1541 for (key, value) in env_vars {
1542 cmd.env(key, value);
1543 }
1544 cmd.stdout(Stdio::inherit());
1545 cmd.stderr(Stdio::inherit());
1546 cmd.stdin(Stdio::inherit());
1547 let status = cmd.status().await.map_err(|e| {
1548 Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1549 })?;
1550 Ok(status.code().unwrap_or(1))
1551}
1552
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::Input;
    use std::fs;
    use tempfile::TempDir;

    /// Executor configuration that captures task output and keeps every other default.
    fn capturing_config() -> ExecutorConfig {
        ExecutorConfig {
            capture_output: true,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_executor_config_default() {
        let defaults = ExecutorConfig::default();
        assert!(!defaults.capture_output);
        assert_eq!(defaults.max_parallel, 0);
        assert!(defaults.environment.is_empty());
    }

    #[tokio::test]
    async fn test_task_result() {
        let outcome = TaskResult {
            name: "test".to_string(),
            exit_code: Some(0),
            stdout: "output".to_string(),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(outcome.name, "test");
        assert_eq!(outcome.exit_code, Some(0));
        assert!(outcome.success);
        assert_eq!(outcome.stdout, "output");
    }

    #[tokio::test]
    async fn test_execute_simple_task() {
        let executor = TaskExecutor::new(capturing_config());
        let echo = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            description: Some("Hello task".to_string()),
            ..Default::default()
        };

        let outcome = executor.execute_task("test", &echo).await.unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.exit_code, Some(0));
        assert!(outcome.stdout.contains("hello"));
    }

    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = capturing_config();
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);

        let print_var = Task {
            command: "printenv".to_string(),
            args: vec!["TEST_VAR".to_string()],
            description: Some("Print env task".to_string()),
            ..Default::default()
        };

        let outcome = executor.execute_task("test", &print_var).await.unwrap();
        assert!(outcome.success);
        assert!(outcome.stdout.contains("test_value"));
    }

    #[tokio::test]
    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
        // Workspace root manifest: declares the workspace globs and depends on a member.
        const ROOT_PACKAGE_JSON: &str = r#"{
  "name": "root-app",
  "version": "0.0.0",
  "workspaces": ["packages/*", "apps/*"],
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#;
        // Bun lockfile listing the root and both workspace members.
        const BUN_LOCK: &str = r#"{
  "lockfileVersion": 1,
  "workspaces": {
    "": {
      "name": "root-app",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    },
    "packages/content-technologies": {
      "name": "@rawkodeacademy/content-technologies",
      "version": "0.0.1"
    },
    "apps/site": {
      "version": "0.0.0",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    }
  },
  "packages": {}
}"#;
        const LIBRARY_PACKAGE_JSON: &str = r#"{
  "name": "@rawkodeacademy/content-technologies",
  "version": "0.0.1"
}"#;
        const SITE_PACKAGE_JSON: &str = r#"{
  "name": "site",
  "version": "0.0.0",
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#;

        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        fs::write(root.join("package.json"), ROOT_PACKAGE_JSON).unwrap();
        fs::write(root.join("bun.lock"), BUN_LOCK).unwrap();

        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
        fs::write(
            root.join("packages/content-technologies/package.json"),
            LIBRARY_PACKAGE_JSON,
        )
        .unwrap();

        fs::create_dir_all(root.join("apps/site")).unwrap();
        fs::write(root.join("apps/site/package.json"), SITE_PACKAGE_JSON).unwrap();

        let workspaces = HashMap::from([(
            "bun".to_string(),
            WorkspaceConfig {
                enabled: true,
                package_manager: Some("bun".to_string()),
                root: None,
            },
        )]);

        // The project lives two levels below the workspace root.
        let executor = TaskExecutor::new(ExecutorConfig {
            project_root: root.join("apps/site"),
            workspaces: Some(workspaces),
            ..capturing_config()
        });

        let list_dirs = Task {
            command: "sh".to_string(),
            args: vec![
                "-c".to_string(),
                "find ../.. -maxdepth 4 -type d | sort".to_string(),
            ],
            inputs: vec![Input::Path("package.json".to_string())],
            workspaces: vec!["bun".to_string()],
            ..Default::default()
        };

        let outcome = executor.execute_task("install", &list_dirs).await.unwrap();
        assert!(
            outcome.success,
            "command failed stdout='{}' stderr='{}'",
            outcome.stdout, outcome.stderr
        );

        let lists_sibling_member = outcome
            .stdout
            .split_whitespace()
            .any(|line| line.ends_with("packages/content-technologies"));
        assert!(
            lists_sibling_member,
            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
            outcome.stdout,
            outcome.stderr
        );
    }

    #[tokio::test]
    async fn test_execute_failing_task() {
        let executor = TaskExecutor::new(capturing_config());
        let failing = Task {
            command: "false".to_string(),
            description: Some("Failing task".to_string()),
            ..Default::default()
        };

        let outcome = executor.execute_task("test", &failing).await.unwrap();
        assert!(!outcome.success);
        assert_eq!(outcome.exit_code, Some(1));
    }

    #[tokio::test]
    async fn test_execute_sequential_group() {
        let executor = TaskExecutor::new(capturing_config());

        let first = Task {
            command: "echo".to_string(),
            args: vec!["first".to_string()],
            description: Some("First task".to_string()),
            ..Default::default()
        };
        let second = Task {
            command: "echo".to_string(),
            args: vec!["second".to_string()],
            description: Some("Second task".to_string()),
            ..Default::default()
        };
        let group = TaskGroup::Sequential(vec![
            TaskDefinition::Single(Box::new(first)),
            TaskDefinition::Single(Box::new(second)),
        ]);

        let all_tasks = Tasks::new();
        let outcomes = executor
            .execute_group("seq", &group, &all_tasks)
            .await
            .unwrap();

        assert_eq!(outcomes.len(), 2);
        assert!(outcomes[0].stdout.contains("first"));
        assert!(outcomes[1].stdout.contains("second"));
    }

    #[tokio::test]
    async fn test_command_injection_prevention() {
        let executor = TaskExecutor::new(capturing_config());
        // The shell metacharacters must reach `echo` as a plain argument.
        let malicious_task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
            description: Some("Malicious task test".to_string()),
            ..Default::default()
        };

        let outcome = executor
            .execute_task("malicious", &malicious_task)
            .await
            .unwrap();
        assert!(outcome.success);
        assert!(outcome.stdout.contains("hello ; rm -rf /"));
    }

    #[tokio::test]
    async fn test_special_characters_in_args() {
        let executor = TaskExecutor::new(capturing_config());
        let special_chars = [
            "$USER",
            "$(whoami)",
            "`whoami`",
            "&& echo hacked",
            "|| echo failed",
            "> /tmp/hack",
            "| cat",
        ];

        for special_arg in special_chars {
            let task = Task {
                command: "echo".to_string(),
                args: vec!["safe".to_string(), special_arg.to_string()],
                description: Some("Special character test".to_string()),
                ..Default::default()
            };

            let outcome = executor.execute_task("special", &task).await.unwrap();
            assert!(outcome.success);
            assert!(outcome.stdout.contains("safe"));
            // The argument is echoed back verbatim.
            assert!(outcome.stdout.contains(special_arg));
        }
    }

    #[tokio::test]
    async fn test_environment_variable_safety() {
        let mut config = capturing_config();
        config
            .environment
            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
        let executor = TaskExecutor::new(config);

        let print_var = Task {
            command: "printenv".to_string(),
            args: vec!["DANGEROUS_VAR".to_string()],
            description: Some("Environment variable safety test".to_string()),
            ..Default::default()
        };

        let outcome = executor
            .execute_task("env_test", &print_var)
            .await
            .unwrap();
        assert!(outcome.success);
        assert!(outcome.stdout.contains("; rm -rf /"));
    }

    #[tokio::test]
    async fn test_execute_graph_parallel_groups() {
        let executor = TaskExecutor::new(ExecutorConfig {
            max_parallel: 2,
            ..capturing_config()
        });

        let mut graph = TaskGraph::new();
        let echo_a = Task {
            command: "echo".into(),
            args: vec!["A".into()],
            ..Default::default()
        };
        let echo_b = Task {
            command: "echo".into(),
            args: vec!["B".into()],
            ..Default::default()
        };
        graph.add_task("t1", echo_a).unwrap();
        graph.add_task("t2", echo_b).unwrap();

        let outcomes = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(outcomes.len(), 2);

        // Both tasks must have contributed output, in whatever order they ran.
        let joined: String = outcomes.iter().map(|r| r.stdout.as_str()).collect();
        assert!(joined.contains("A") && joined.contains("B"));
    }
}