use super::{Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
use crate::cache::tasks as task_cache;
use crate::environment::Environment;
use crate::manifest::WorkspaceConfig;
use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
use crate::{Error, Result};
use async_recursion::async_recursion;
use chrono::Utc;
use cuenv_workspaces::{
    BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
    NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
    PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
    YarnModernLockfileParser, detect_from_command, detect_package_managers,
    materializer::{
        Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
    },
};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{Arc, OnceLock};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::task::JoinSet;

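/// Outcome of a single task execution, including captured output streams.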
#[derive(Debug, Clone)]
pub struct TaskResult {
    pub name: String,
    pub exit_code: Option<i32>,
    pub stdout: String,
    pub stderr: String,
    pub success: bool,
}

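/// Number of trailing stdout/stderr lines included in failure summaries.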
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;

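/// Configuration controlling how a [`TaskExecutor`] runs tasks.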
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Capture stdout/stderr instead of streaming them to the console.
    pub capture_output: bool,
    /// Maximum number of tasks to run concurrently (0 = unlimited).
    pub max_parallel: usize,
    /// Environment variables made available to each task.
    pub environment: Environment,
    /// Explicit working directory override for spawned commands.
    pub working_dir: Option<PathBuf>,
    /// Root of the project whose tasks are being executed.
    pub project_root: PathBuf,
    /// If set, cached outputs are materialized into this directory on a cache hit.
    pub materialize_outputs: Option<PathBuf>,
    /// Override for the task cache location.
    pub cache_dir: Option<PathBuf>,
    /// Log cache and hermetic directory paths when true.
    pub show_cache_path: bool,
    /// Workspace configurations keyed by workspace name.
    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
}
68
69impl Default for ExecutorConfig {
70 fn default() -> Self {
71 Self {
72 capture_output: false,
73 max_parallel: 0,
74 environment: Environment::new(),
75 working_dir: None,
76 project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
77 materialize_outputs: None,
78 cache_dir: None,
79 show_cache_path: false,
80 workspaces: None,
81 }
82 }
83}
84
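/// Executes tasks hermetically: inputs are hashed into a cache key, the
/// command runs in a temporary working directory populated with declared
/// inputs, and successful results are cached for reuse.
///
/// Illustrative sketch (not a doctest; assumes a `task: Task` built elsewhere):
/// ```ignore
/// let executor = TaskExecutor::new(ExecutorConfig::default());
/// let result = executor.execute_task("build", &task).await?;
/// assert!(result.success);
/// ```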
pub struct TaskExecutor {
    config: ExecutorConfig,
}

impl TaskExecutor {
    /// Creates a new executor with the given configuration.
    pub fn new(config: ExecutorConfig) -> Self {
        Self { config }
    }

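    /// Runs a single task end to end: resolves workspaces and inputs,
    /// computes a cache key, returns the cached result on a hit, and
    /// otherwise executes the command hermetically, collects declared
    /// outputs, and saves the result to the cache.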
    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
        let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
        let mut workspace_input_patterns = Vec::new();
        let mut workspace_lockfile_hashes = BTreeMap::new();

        for workspace_name in &task.workspaces {
            if let Some(global_workspaces) = &self.config.workspaces {
                if let Some(ws_config) = global_workspaces.get(workspace_name) {
                    if ws_config.enabled {
                        let (ws, entries, paths, hash) = self
                            .resolve_workspace(name, task, workspace_name, ws_config)
                            .await?;
                        workspace_ctxs.push((ws, entries));
                        workspace_input_patterns.extend(paths);
                        if let Some(h) = hash {
                            workspace_lockfile_hashes.insert(workspace_name.clone(), h);
                        }
                    }
                } else {
                    tracing::warn!(
                        task = %name,
                        workspace = %workspace_name,
                        "Workspace not found in global configuration"
                    );
                }
            }
        }

        // When the task belongs to a workspace, resolve inputs relative to the
        // workspace root so sibling members are visible; otherwise fall back
        // to the project root.
        let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());

        let project_prefix = primary_workspace_root
            .as_ref()
            .and_then(|root| self.config.project_root.strip_prefix(root).ok())
            .map(|p| p.to_path_buf());
        let input_root = primary_workspace_root
            .clone()
            .unwrap_or_else(|| self.config.project_root.clone());

        let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
        let resolved_inputs = {
            let _g = span_inputs.enter();
            let resolver = InputResolver::new(&input_root);
            let mut all_inputs: Vec<String> = Vec::new();

            if let Some(prefix) = project_prefix.as_ref() {
                for input in &task.inputs {
                    all_inputs.push(prefix.join(input).to_string_lossy().to_string());
                }
            } else {
                all_inputs.extend(task.inputs.iter().cloned());
            }

            all_inputs.extend(workspace_input_patterns.iter().cloned());
            resolver.resolve(&all_inputs)?
        };
        if task_trace_enabled() {
            tracing::info!(
                task = %name,
                input_root = %input_root.display(),
                project_root = %self.config.project_root.display(),
                inputs_count = resolved_inputs.files.len(),
                workspace_inputs = workspace_input_patterns.len(),
                "Resolved task inputs"
            );
        }

        let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
        let env_summary: BTreeMap<String, String> = self
            .config
            .environment
            .vars
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
        let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
        let shell_json = serde_json::to_value(&task.shell).ok();
        let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
            None
        } else {
            Some(workspace_lockfile_hashes)
        };

        let envelope = task_cache::CacheKeyEnvelope {
            inputs: inputs_summary.clone(),
            command: task.command.clone(),
            args: task.args.clone(),
            shell: shell_json,
            env: env_summary.clone(),
            cuenv_version: cuenv_version.clone(),
            platform: platform.clone(),
            workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
            workspace_package_hashes: None,
        };
        let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;

        let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
        let cache_hit = {
            let _g = span_cache.enter();
            task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
        };

        if let Some(hit) = cache_hit {
            tracing::info!(
                task = %name,
                key = %cache_key,
                path = %hit.path.display(),
                "Cache hit; skipping execution"
            );
            if self.config.show_cache_path {
                tracing::info!(cache_path = %hit.path.display(), "Cache path");
            }
            if let Some(dest) = &self.config.materialize_outputs {
                let count = task_cache::materialize_outputs(
                    &cache_key,
                    dest,
                    self.config.cache_dir.as_deref(),
                )?;
                tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
            }
            // Older cache entries may lack captured logs; if both streams are
            // empty, fall through and re-execute so the logs get backfilled.
            let stdout_path = hit.path.join("logs").join("stdout.log");
            let stderr_path = hit.path.join("logs").join("stderr.log");
            let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
            let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
            if !(stdout.is_empty() && stderr.is_empty()) {
                if !self.config.capture_output {
                    let cmd_str = if task.command.is_empty() {
                        task.args.join(" ")
                    } else {
                        format!("{} {}", task.command, task.args.join(" "))
                    };
                    eprintln!("> [{name}] {cmd_str} (cached)");
                }
                return Ok(TaskResult {
                    name: name.to_string(),
                    exit_code: Some(0),
                    stdout,
                    stderr,
                    success: true,
                });
            } else {
                tracing::info!(
                    task = %name,
                    key = %cache_key,
                    "Cache entry lacks logs; executing to backfill logs"
                );
            }
        }

        tracing::info!(
            task = %name,
            key = %cache_key,
            "Executing task hermetically"
        );

        let hermetic_root = create_hermetic_dir(name, &cache_key)?;
        if self.config.show_cache_path {
            tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
        }

        let span_populate =
            tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
        {
            let _g = span_populate.enter();
            populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
        }

        // Lay down workspace dependencies (e.g. node_modules or cargo deps)
        // inside the hermetic directory.
        for (ws, entries) in workspace_ctxs {
            self.materialize_workspace(&ws, &entries, &hermetic_root)
                .await?;
        }

        // Snapshot of input hashes, used later to detect undeclared writes.
        let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();

        let resolved_command = self.config.environment.resolve_command(&task.command);

        let mut cmd = if let Some(shell) = &task.shell {
            if let (Some(shell_command), Some(shell_flag)) = (&shell.command, &shell.flag) {
                let resolved_shell = self.config.environment.resolve_command(shell_command);
                let mut cmd = Command::new(&resolved_shell);
                cmd.arg(shell_flag);
                if task.args.is_empty() {
                    cmd.arg(&resolved_command);
                } else {
                    let full_command = if task.command.is_empty() {
                        task.args.join(" ")
                    } else {
                        format!("{} {}", resolved_command, task.args.join(" "))
                    };
                    cmd.arg(full_command);
                }
                cmd
            } else {
                let mut cmd = Command::new(&resolved_command);
                for arg in &task.args {
                    cmd.arg(arg);
                }
                cmd
            }
        } else {
            let mut cmd = Command::new(&resolved_command);
            for arg in &task.args {
                cmd.arg(arg);
            }
            cmd
        };

        let workdir = if let Some(dir) = &self.config.working_dir {
            dir.clone()
        } else if let Some(prefix) = project_prefix.as_ref() {
            hermetic_root.join(prefix)
        } else {
            hermetic_root.clone()
        };
        let _ = std::fs::create_dir_all(&workdir);
        cmd.current_dir(&workdir);
        let env_vars = self.config.environment.merge_with_system();
        if task_trace_enabled() {
            tracing::info!(
                task = %name,
                hermetic_root = %hermetic_root.display(),
                workdir = %workdir.display(),
                command = %task.command,
                args = ?task.args,
                env_count = env_vars.len(),
                "Launching task command"
            );
        }
        for (k, v) in env_vars {
            cmd.env(k, v);
        }

        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());

        let stream_logs = !self.config.capture_output;
        if stream_logs {
            let cmd_str = if task.command.is_empty() {
                task.args.join(" ")
            } else {
                format!("{} {}", task.command, task.args.join(" "))
            };
            eprintln!("> [{name}] {cmd_str}");
        }

        let start = std::time::Instant::now();
        let mut child = cmd
            .spawn()
            .map_err(|e| Error::configuration(format!("Failed to spawn task '{}': {}", name, e)))?;

        let stdout_handle = child.stdout.take();
        let stderr_handle = child.stderr.take();

        let stdout_task = async move {
            if let Some(stdout) = stdout_handle {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                let mut stdout_lines = Vec::new();
                while let Ok(Some(line)) = lines.next_line().await {
                    if stream_logs {
                        println!("{line}");
                    }
                    stdout_lines.push(line);
                }
                stdout_lines.join("\n")
            } else {
                String::new()
            }
        };

        let stderr_task = async move {
            if let Some(stderr) = stderr_handle {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                let mut stderr_lines = Vec::new();
                while let Ok(Some(line)) = lines.next_line().await {
                    if stream_logs {
                        eprintln!("{line}");
                    }
                    stderr_lines.push(line);
                }
                stderr_lines.join("\n")
            } else {
                String::new()
            }
        };

        let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);

        let status = child.wait().await.map_err(|e| {
            Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
        })?;
        let duration = start.elapsed();

        let exit_code = status.code().unwrap_or(1);
        let success = status.success();
        if !success {
            tracing::warn!(task = %name, exit = exit_code, "Task failed");
            tracing::error!(task = %name, "Task stdout:\n{}", stdout);
            tracing::error!(task = %name, "Task stderr:\n{}", stderr);
        } else {
            tracing::info!(task = %name, "Task completed successfully");
        }

        // Output patterns are declared relative to the project; translate them
        // to the hermetic root when the project is nested in a workspace.
        let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
            task.outputs
                .iter()
                .map(|o| prefix.join(o).to_string_lossy().to_string())
                .collect()
        } else {
            task.outputs.clone()
        };
        let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
        let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
        let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();

        // Stage declared output files so they can be saved into the cache.
        let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
        if outputs_stage.exists() {
            let _ = std::fs::remove_dir_all(&outputs_stage);
        }
        std::fs::create_dir_all(&outputs_stage).ok();

        for rel in &outputs {
            let rel_for_project = project_prefix
                .as_ref()
                .and_then(|prefix| rel.strip_prefix(prefix).ok())
                .unwrap_or(rel)
                .to_path_buf();
            let src = hermetic_root.join(rel);
            #[allow(clippy::collapsible_if)]
            if let Ok(meta) = std::fs::metadata(&src) {
                if meta.is_file() {
                    let dst = outputs_stage.join(&rel_for_project);
                    if let Some(parent) = dst.parent() {
                        let _ = std::fs::create_dir_all(parent);
                    }
                    let _ = std::fs::copy(&src, &dst);
                    let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
                    output_index.push(task_cache::OutputIndexEntry {
                        rel_path: rel_for_project.to_string_lossy().to_string(),
                        size: meta.len(),
                        sha256: sha,
                    });
                }
            }
        }

        // Hash every file in the hermetic root and flag writes to paths that
        // were not declared as outputs (they will not be cached).
        let mut warned = false;
        for entry in walkdir::WalkDir::new(&hermetic_root)
            .into_iter()
            .filter_map(|e| e.ok())
        {
            let p = entry.path();
            if p.is_dir() {
                continue;
            }
            let rel = match p.strip_prefix(&hermetic_root) {
                Ok(r) => r.to_path_buf(),
                Err(_) => continue,
            };
            let rel_str = rel.to_string_lossy().to_string();
            let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
            let initial = initial_hashes.get(&rel_str);
            let changed = match initial {
                None => true,
                Some(prev) => prev != &sha,
            };
            if changed && !outputs_set.contains(&rel) {
                if !warned {
                    tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
                    warned = true;
                }
                tracing::debug!(path = %rel_str, "Undeclared write");
            }
        }

        if success {
            let meta = task_cache::TaskResultMeta {
                task_name: name.to_string(),
                command: task.command.clone(),
                args: task.args.clone(),
                env_summary,
                inputs_summary: inputs_summary.clone(),
                created_at: Utc::now(),
                cuenv_version,
                platform,
                duration_ms: duration.as_millis(),
                exit_code,
                cache_key_envelope: envelope_json.clone(),
                output_index,
            };
            let logs = task_cache::TaskLogs {
                stdout: Some(stdout.clone()),
                stderr: Some(stderr.clone()),
            };
            let cache_span = tracing::info_span!("cache.save", key = %cache_key);
            {
                let _g = cache_span.enter();
                task_cache::save_result(
                    &cache_key,
                    &meta,
                    &outputs_stage,
                    &hermetic_root,
                    logs,
                    self.config.cache_dir.as_deref(),
                )?;
            }

            // Copy declared outputs back into the project tree.
            for rel in &outputs {
                let rel_for_project = project_prefix
                    .as_ref()
                    .and_then(|prefix| rel.strip_prefix(prefix).ok())
                    .unwrap_or(rel)
                    .to_path_buf();
                let src = hermetic_root.join(rel);
                let dst = self.config.project_root.join(&rel_for_project);
                if src.exists() {
                    if let Some(parent) = dst.parent() {
                        std::fs::create_dir_all(parent).ok();
                    }
                    if let Err(e) = std::fs::copy(&src, &dst) {
                        tracing::warn!(
                            "Failed to copy output {} back to project root: {}",
                            rel.display(),
                            e
                        );
                    }
                }
            }
        }
        // Failed runs are intentionally not cached.

        Ok(TaskResult {
            name: name.to_string(),
            exit_code: Some(exit_code),
            stdout,
            stderr,
            success,
        })
    }

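    /// Executes a task definition, recursing into groups as needed.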
    #[async_recursion]
    pub async fn execute_definition(
        &self,
        name: &str,
        definition: &TaskDefinition,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        match definition {
            TaskDefinition::Single(task) => {
                let result = self.execute_task(name, task.as_ref()).await?;
                Ok(vec![result])
            }
            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
        }
    }

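    /// Dispatches a task group to sequential or parallel execution.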
    async fn execute_group(
        &self,
        prefix: &str,
        group: &TaskGroup,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        match group {
            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
            TaskGroup::Parallel(tasks) => self.execute_parallel(prefix, tasks, all_tasks).await,
        }
    }

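    /// Runs group members in order, halting on the first failure.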
    async fn execute_sequential(
        &self,
        prefix: &str,
        tasks: &[TaskDefinition],
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        if !self.config.capture_output {
            eprintln!("> Running sequential group: {prefix}");
        }
        let mut results = Vec::new();
        for (i, task_def) in tasks.iter().enumerate() {
            let task_name = format!("{}[{}]", prefix, i);
            let task_results = self
                .execute_definition(&task_name, task_def, all_tasks)
                .await?;
            for result in &task_results {
                if !result.success {
                    let message = format!(
                        "Sequential task group '{prefix}' halted.\n\n{}",
                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
                    );
                    return Err(Error::configuration(message));
                }
            }
            results.extend(task_results);
        }
        Ok(results)
    }

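    /// Runs group members concurrently, bounded by `max_parallel` when set.
    /// A member named `default` short-circuits the group and runs alone.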
    async fn execute_parallel(
        &self,
        prefix: &str,
        tasks: &HashMap<String, TaskDefinition>,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        if let Some(default_task) = tasks.get("default") {
            if !self.config.capture_output {
                eprintln!("> Executing default task for group '{prefix}'");
            }
            let task_name = format!("{}.default", prefix);
            return self
                .execute_definition(&task_name, default_task, all_tasks)
                .await;
        }

        if !self.config.capture_output {
            eprintln!(
                "> Running parallel group: {prefix} (max_parallel={})",
                self.config.max_parallel
            );
        }
        let mut join_set = JoinSet::new();
        let all_tasks = Arc::new(all_tasks.clone());
        let mut all_results = Vec::new();
        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
            if let Some(failed) = results.iter().find(|r| !r.success) {
                let message = format!(
                    "Parallel task group '{prefix}' halted.\n\n{}",
                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
                );
                return Err(Error::configuration(message));
            }
            all_results.extend(results);
            Ok(())
        };
        for (name, task_def) in tasks {
            let task_name = format!("{}.{}", prefix, name);
            let task_def = task_def.clone();
            let all_tasks = Arc::clone(&all_tasks);
            let executor = self.clone_with_config();
            join_set.spawn(async move {
                executor
                    .execute_definition(&task_name, &task_def, &all_tasks)
                    .await
            });
            // Throttle: once the limit is reached, wait for one task to
            // finish before spawning the next.
            if self.config.max_parallel > 0
                && join_set.len() >= self.config.max_parallel
                && let Some(result) = join_set.join_next().await
            {
                match result {
                    Ok(Ok(results)) => merge_results(results)?,
                    Ok(Err(e)) => return Err(e),
                    Err(e) => {
                        return Err(Error::configuration(format!(
                            "Task execution panicked: {}",
                            e
                        )));
                    }
                }
            }
        }
        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(Ok(results)) => merge_results(results)?,
                Ok(Err(e)) => return Err(e),
                Err(e) => {
                    return Err(Error::configuration(format!(
                        "Task execution panicked: {}",
                        e
                    )));
                }
            }
        }
        Ok(all_results)
    }

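    /// Executes a dependency graph in topologically ordered parallel groups.
    ///
    /// Illustrative sketch (not a doctest):
    /// ```ignore
    /// let mut graph = TaskGraph::new();
    /// graph.add_task("build", build_task)?;
    /// graph.add_task("test", test_task)?;
    /// let results = executor.execute_graph(&graph).await?;
    /// ```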
    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
        let parallel_groups = graph.get_parallel_groups()?;
        let mut all_results = Vec::new();
        let mut join_set = JoinSet::new();
        let mut group_iter = parallel_groups.into_iter();
        let mut current_group = group_iter.next();
        while current_group.is_some() || !join_set.is_empty() {
            if let Some(group) = current_group.as_mut() {
                while let Some(node) = group.pop() {
                    let task = node.task.clone();
                    let name = node.name.clone();
                    let executor = self.clone_with_config();
                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
                        break;
                    }
                }
                if group.is_empty() {
                    current_group = group_iter.next();
                }
            }
            if let Some(result) = join_set.join_next().await {
                match result {
                    Ok(Ok(task_result)) => {
                        if !task_result.success {
                            let message = format!(
                                "Task graph execution halted.\n\n{}",
                                summarize_task_failure(&task_result, TASK_FAILURE_SNIPPET_LINES)
                            );
                            return Err(Error::configuration(message));
                        }
                        all_results.push(task_result);
                    }
                    Ok(Err(e)) => return Err(e),
                    Err(e) => {
                        return Err(Error::configuration(format!(
                            "Task execution panicked: {}",
                            e
                        )));
                    }
                }
            }
        }
        Ok(all_results)
    }

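    /// Resolves a workspace for a task: detects the package manager,
    /// discovers the workspace root and members, and parses the lockfile.
    /// Returns the workspace, its lockfile entries, input glob patterns for
    /// the relevant member manifests, and the lockfile's content hash.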
    async fn resolve_workspace(
        &self,
        _task_name: &str,
        task: &Task,
        workspace_name: &str,
        config: &WorkspaceConfig,
    ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
        let root = self.config.project_root.clone();
        let task_label = _task_name.to_string();
        let command = task.command.clone();
        let config_pm = config.package_manager.clone();
        let config_root = config.root.clone();
        let mut packages = Vec::new();

        // Currently fixed defaults: no lockfile override, and workspace
        // dependencies are traversed.
        let lockfile_override: Option<String> = None;
        let mut traverse_workspace_deps = true;

        let workspace_name_owned = workspace_name.to_string();
        tokio::task::spawn_blocking(move || {
            let override_for_detection = lockfile_override.as_ref().map(|lock| {
                let candidate = PathBuf::from(lock);
                if candidate.is_absolute() {
                    candidate
                } else {
                    root.join(lock)
                }
            });

            let manager = if let Some(pm_str) = config_pm {
                match pm_str.as_str() {
                    "npm" => PackageManager::Npm,
                    "bun" => PackageManager::Bun,
                    "pnpm" => PackageManager::Pnpm,
                    "yarn" => PackageManager::YarnModern,
                    "yarn-classic" => PackageManager::YarnClassic,
                    "cargo" => PackageManager::Cargo,
                    _ => {
                        return Err(Error::configuration(format!(
                            "Unknown package manager: {}",
                            pm_str
                        )));
                    }
                }
            } else {
                match workspace_name_owned.as_str() {
                    "npm" => PackageManager::Npm,
                    "bun" => PackageManager::Bun,
                    "pnpm" => PackageManager::Pnpm,
                    "yarn" => PackageManager::YarnModern,
                    "cargo" => PackageManager::Cargo,
                    _ => {
                        let hint = detect_from_command(&command);
                        let detected = match detect_package_managers(&root) {
                            Ok(list) => list,
                            Err(e) => {
                                if override_for_detection.is_some() {
                                    Vec::new()
                                } else {
                                    return Err(Error::configuration(format!(
                                        "Failed to detect package managers: {}",
                                        e
                                    )));
                                }
                            }
                        };

                        if let Some(h) = hint {
                            if detected.contains(&h) {
                                h
                            } else if !detected.is_empty() {
                                detected[0]
                            } else if let Some(ref override_path) = override_for_detection {
                                infer_manager_from_lockfile(override_path).ok_or_else(|| {
                                    Error::configuration(
                                        "Unable to infer package manager from lockfile override",
                                    )
                                })?
                            } else {
                                return Err(Error::configuration(format!(
                                    "No package manager specified for workspace '{}' and could not detect one",
                                    workspace_name_owned
                                )));
                            }
                        } else if !detected.is_empty() {
                            detected[0]
                        } else if let Some(ref override_path) = override_for_detection {
                            infer_manager_from_lockfile(override_path).ok_or_else(|| {
                                Error::configuration(
                                    "Unable to infer package manager from lockfile override",
                                )
                            })?
                        } else {
                            return Err(Error::configuration(
                                "Could not detect package manager for workspace resolution",
                            ));
                        }
                    }
                }
            };

            let workspace_root = if let Some(root_override) = &config_root {
                root.join(root_override)
            } else {
                find_workspace_root(manager, &root)
            };

            if task_trace_enabled() {
                tracing::info!(
                    task = %task_label,
                    manager = %manager,
                    project_root = %root.display(),
                    workspace_root = %workspace_root.display(),
                    "Resolved workspace root for package manager"
                );
            }

            let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
                let candidate = PathBuf::from(lock);
                if candidate.is_absolute() {
                    candidate
                } else {
                    workspace_root.join(lock)
                }
            });

            let discovery: Box<dyn WorkspaceDiscovery> = match manager {
                PackageManager::Npm
                | PackageManager::Bun
                | PackageManager::YarnClassic
                | PackageManager::YarnModern => Box::new(PackageJsonDiscovery),
                PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
                PackageManager::Cargo => Box::new(CargoTomlDiscovery),
            };

            let workspace = discovery.discover(&workspace_root).map_err(|e| {
                Error::configuration(format!("Failed to discover workspace: {}", e))
            })?;

            let lockfile_path = if let Some(path) = lockfile_override_path {
                if !path.exists() {
                    return Err(Error::configuration(format!(
                        "Workspace lockfile override does not exist: {}",
                        path.display()
                    )));
                }
                path
            } else {
                workspace.lockfile.clone().ok_or_else(|| {
                    Error::configuration("Workspace resolution requires a lockfile")
                })?
            };

            let parser: Box<dyn LockfileParser> = match manager {
                PackageManager::Npm => Box::new(NpmLockfileParser),
                PackageManager::Bun => Box::new(BunLockfileParser),
                PackageManager::Pnpm => Box::new(PnpmLockfileParser),
                PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
                PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
                PackageManager::Cargo => Box::new(CargoLockfileParser),
            };

            let entries = parser
                .parse(&lockfile_path)
                .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
            if task_trace_enabled() {
                tracing::info!(
                    task = %task_label,
                    lockfile = %lockfile_path.display(),
                    members = entries.len(),
                    "Parsed workspace lockfile"
                );
            }

            let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;

            // With no explicit package selection, infer the member that
            // contains the current project and traverse its dependencies.
            if packages.is_empty() {
                let current_member = workspace.members.iter().find(|m| {
                    let member_root = workspace_root.join(&m.path);
                    member_root == root || root.starts_with(&member_root)
                });
                if let Some(member) = current_member {
                    let inferred = vec![member.name.clone()];
                    if task_trace_enabled() {
                        tracing::info!(
                            task = %task_label,
                            inferred_packages = ?inferred,
                            "Inferred workspace packages from current project"
                        );
                    }
                    packages = inferred;
                    traverse_workspace_deps = true;
                }
            }

            // Always include the workspace manifest and lockfile as inputs.
            let mut member_paths = Vec::new();
            member_paths.push(manager.workspace_config_name().to_string());
            if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
                member_paths.push(rel.to_string_lossy().to_string());
            } else {
                member_paths.push(lockfile_path.to_string_lossy().to_string());
            }

            if packages.is_empty() {
                for member in &workspace.members {
                    let manifest_rel = member.path.join(manager.workspace_config_name());
                    member_paths.push(manifest_rel.to_string_lossy().to_string());
                }
            } else {
                let mut to_visit: Vec<String> = packages.clone();
                let mut visited = HashSet::new();

                while let Some(pkg_name) = to_visit.pop() {
                    if visited.contains(&pkg_name) {
                        continue;
                    }
                    visited.insert(pkg_name.clone());

                    if let Some(member) = workspace.find_member(&pkg_name) {
                        let manifest_rel = member.path.join(manager.workspace_config_name());
                        member_paths.push(manifest_rel.to_string_lossy().to_string());

                        if traverse_workspace_deps {
                            // Collect workspace-internal dependencies from both
                            // the lockfile and the member manifest.
                            let mut dependency_candidates: HashSet<String> = HashSet::new();

                            if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
                                for dep in &entry.dependencies {
                                    if entries
                                        .iter()
                                        .any(|e| e.name == dep.name && e.is_workspace_member)
                                    {
                                        dependency_candidates.insert(dep.name.clone());
                                    }
                                }
                            }

                            for dep_name in &member.dependencies {
                                if workspace.find_member(dep_name).is_some() {
                                    dependency_candidates.insert(dep_name.clone());
                                }
                            }

                            for dep_name in dependency_candidates {
                                to_visit.push(dep_name);
                            }
                        }
                    }
                }
            }

            if task_trace_enabled() {
                tracing::info!(
                    task = %task_label,
                    members = ?member_paths,
                    "Workspace input member paths selected"
                );
            }

            Ok((workspace, entries, member_paths, Some(hash)))
        })
        .await
        .map_err(|e| Error::configuration(format!("Workspace resolution panicked: {}", e)))?
    }

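    /// Materializes workspace dependencies (node_modules or cargo deps)
    /// into the hermetic directory using the manager-specific materializer.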
    async fn materialize_workspace(
        &self,
        workspace: &Workspace,
        entries: &[LockfileEntry],
        target_dir: &Path,
    ) -> Result<()> {
        let materializer: Box<dyn Materializer> = match workspace.manager {
            PackageManager::Npm
            | PackageManager::Bun
            | PackageManager::Pnpm
            | PackageManager::YarnClassic
            | PackageManager::YarnModern => Box::new(NodeModulesMaterializer),
            PackageManager::Cargo => Box::new(CargoMaterializer),
        };

        materializer
            .materialize(workspace, entries, target_dir)
            .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
    }

    fn clone_with_config(&self) -> Self {
        Self {
            config: self.config.clone(),
        }
    }
}

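/// Walks up from `start` until a directory that defines a workspace for the
/// given package manager is found; falls back to `start` at the filesystem root.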
fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());

    loop {
        let is_root = match manager {
            PackageManager::Npm
            | PackageManager::Bun
            | PackageManager::YarnClassic
            | PackageManager::YarnModern => package_json_has_workspaces(&current),
            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
            PackageManager::Cargo => cargo_toml_has_workspace(&current),
        };

        if is_root {
            return current;
        }

        if let Some(parent) = current.parent() {
            current = parent.to_path_buf();
        } else {
            return start.to_path_buf();
        }
    }
}

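/// Returns true when `dir/package.json` declares a non-empty `workspaces`
/// field. Both shapes are accepted, for example (illustrative):
///
/// ```json
/// { "workspaces": ["packages/*"] }
/// { "workspaces": { "packages": ["packages/*"] } }
/// ```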
fn package_json_has_workspaces(dir: &Path) -> bool {
    let path = dir.join("package.json");
    let content = std::fs::read_to_string(&path);
    let Ok(json) = content.and_then(|s| {
        serde_json::from_str::<serde_json::Value>(&s)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }) else {
        return false;
    };

    match json.get("workspaces") {
        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
        Some(serde_json::Value::Object(map)) => map
            .get("packages")
            .and_then(|packages| packages.as_array())
            .map(|arr| !arr.is_empty())
            .unwrap_or(false),
        _ => false,
    }
}

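/// Returns true when `dir/Cargo.toml` contains a `[workspace]` table.
/// Note this is a plain substring check, not a TOML parse.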
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let path = dir.join("Cargo.toml");
    let Ok(content) = std::fs::read_to_string(&path) else {
        return false;
    };

    content.contains("[workspace]")
}

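/// Caches and reports whether `CUENV_TRACE_TASKS` is set to a truthy value.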
fn task_trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    *ENABLED.get_or_init(|| {
        matches!(
            std::env::var("CUENV_TRACE_TASKS")
                .unwrap_or_default()
                .trim()
                .to_ascii_lowercase()
                .as_str(),
            "1" | "true" | "yes" | "on"
        )
    })
}

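/// Builds a human-readable failure summary with trailing output snippets.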
pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
    let exit_code = result
        .exit_code
        .map(|c| c.to_string())
        .unwrap_or_else(|| "unknown".to_string());

    let mut sections = Vec::new();
    sections.push(format!(
        "Task '{}' failed with exit code {}.",
        result.name, exit_code
    ));

    let output = format_failure_streams(result, max_output_lines);
    if output.is_empty() {
        sections.push(
            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
                .to_string(),
        );
    } else {
        sections.push(output);
    }

    sections.join("\n\n")
}

fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
    let mut streams = Vec::new();

    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
        streams.push(stdout);
    }

    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
        streams.push(stderr);
    }

    streams.join("\n\n")
}

fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let normalized = content.trim_end();
    if normalized.is_empty() {
        return None;
    }

    let lines: Vec<&str> = normalized.lines().collect();
    let total = lines.len();
    let start = total.saturating_sub(max_output_lines);
    let snippet = lines[start..].join("\n");

    let header = if total > max_output_lines {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{snippet}"))
}

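/// Infers the package manager from a lockfile's file name, if recognized.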
fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
    match path.file_name().and_then(|n| n.to_str())? {
        "package-lock.json" => Some(PackageManager::Npm),
        "bun.lock" => Some(PackageManager::Bun),
        "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
        "yarn.lock" => Some(PackageManager::YarnModern),
        "Cargo.lock" => Some(PackageManager::Cargo),
        _ => None,
    }
}

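/// Creates (or recreates) the hermetic working directory for a task run,
/// derived from the sanitized task name and a cache-key prefix.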
fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
    // Sanitize the task name so it is safe to embed in a directory name.
    let sanitized_task = task_name
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
        .collect::<String>();

    let base = std::env::temp_dir().join(format!(
        "cuenv-work-{}-{}",
        sanitized_task,
        &key[..12.min(key.len())]
    ));

    // If a stale directory from a previous run cannot be removed, fall back
    // to a timestamped sibling so this run still gets a clean workspace.
    if base.exists()
        && let Err(e) = std::fs::remove_dir_all(&base)
    {
        let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
        let fallback = std::env::temp_dir().join(format!(
            "cuenv-work-{}-{}-{}",
            sanitized_task,
            &key[..12.min(key.len())],
            ts
        ));
        tracing::warn!(
            previous = %base.display(),
            fallback = %fallback.display(),
            error = %e,
            "Failed to clean previous hermetic workdir; using fresh fallback directory"
        );
        std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
            source: e,
            path: Some(fallback.clone().into()),
            operation: "create_dir_all".into(),
        })?;
        return Ok(fallback);
    }

    std::fs::create_dir_all(&base).map_err(|e| Error::Io {
        source: e,
        path: Some(base.clone().into()),
        operation: "create_dir_all".into(),
    })?;
    Ok(base)
}

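/// Runs an arbitrary command with the given environment, inheriting the
/// parent's stdio, and returns its exit code (1 if none was reported).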
pub async fn execute_command(
    command: &str,
    args: &[String],
    environment: &Environment,
) -> Result<i32> {
    tracing::info!("Executing command: {} {:?}", command, args);
    let mut cmd = Command::new(command);
    cmd.args(args);
    let env_vars = environment.merge_with_system();
    for (key, value) in env_vars {
        cmd.env(key, value);
    }
    cmd.stdout(Stdio::inherit());
    cmd.stderr(Stdio::inherit());
    cmd.stdin(Stdio::inherit());
    let status = cmd.status().await.map_err(|e| {
        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
    })?;
    Ok(status.code().unwrap_or(1))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_executor_config_default() {
        let config = ExecutorConfig::default();
        assert!(!config.capture_output);
        assert_eq!(config.max_parallel, 0);
        assert!(config.environment.is_empty());
    }

    #[tokio::test]
    async fn test_task_result() {
        let result = TaskResult {
            name: "test".to_string(),
            exit_code: Some(0),
            stdout: "output".to_string(),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(result.name, "test");
        assert_eq!(result.exit_code, Some(0));
        assert!(result.success);
        assert_eq!(result.stdout, "output");
    }

    #[tokio::test]
    async fn test_execute_simple_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("Hello task".to_string()),
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello"));
    }

    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["TEST_VAR".to_string()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("Print env task".to_string()),
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("test_value"));
    }

    #[tokio::test]
    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        fs::write(
            root.join("package.json"),
            r#"{
    "name": "root-app",
    "version": "0.0.0",
    "workspaces": ["packages/*", "apps/*"],
    "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
    }
}"#,
        )
        .unwrap();
        fs::write(
            root.join("bun.lock"),
            r#"{
    "lockfileVersion": 1,
    "workspaces": {
        "": {
            "name": "root-app",
            "dependencies": {
                "@rawkodeacademy/content-technologies": "workspace:*"
            }
        },
        "packages/content-technologies": {
            "name": "@rawkodeacademy/content-technologies",
            "version": "0.0.1"
        },
        "apps/site": {
            "version": "0.0.0",
            "dependencies": {
                "@rawkodeacademy/content-technologies": "workspace:*"
            }
        }
    },
    "packages": {}
}"#,
        )
        .unwrap();

        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
        fs::write(
            root.join("packages/content-technologies/package.json"),
            r#"{
    "name": "@rawkodeacademy/content-technologies",
    "version": "0.0.1"
}"#,
        )
        .unwrap();

        fs::create_dir_all(root.join("apps/site")).unwrap();
        fs::write(
            root.join("apps/site/package.json"),
            r#"{
    "name": "site",
    "version": "0.0.0",
    "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
    }
}"#,
        )
        .unwrap();

        let mut workspaces = HashMap::new();
        workspaces.insert(
            "bun".to_string(),
            WorkspaceConfig {
                enabled: true,
                package_manager: Some("bun".to_string()),
                root: None,
            },
        );

        let config = ExecutorConfig {
            capture_output: true,
            project_root: root.join("apps/site"),
            workspaces: Some(workspaces),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        let task = Task {
            command: "sh".to_string(),
            args: vec![
                "-c".to_string(),
                "find ../.. -maxdepth 4 -type d | sort".to_string(),
            ],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec!["package.json".to_string()],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec!["bun".to_string()],
            description: None,
        };

        let result = executor.execute_task("install", &task).await.unwrap();
        assert!(
            result.success,
            "command failed stdout='{}' stderr='{}'",
            result.stdout, result.stderr
        );
        assert!(
            result
                .stdout
                .split_whitespace()
                .any(|line| line.ends_with("packages/content-technologies")),
            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
            result.stdout,
            result.stderr
        );
    }

    #[tokio::test]
    async fn test_execute_failing_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "false".to_string(),
            args: vec![],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("Failing task".to_string()),
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
    }

    #[tokio::test]
    async fn test_execute_sequential_group() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task1 = Task {
            command: "echo".to_string(),
            args: vec!["first".to_string()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("First task".to_string()),
        };
        let task2 = Task {
            command: "echo".to_string(),
            args: vec!["second".to_string()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("Second task".to_string()),
        };
        let group = TaskGroup::Sequential(vec![
            TaskDefinition::Single(Box::new(task1)),
            TaskDefinition::Single(Box::new(task2)),
        ]);
        let all_tasks = Tasks::new();
        let results = executor
            .execute_group("seq", &group, &all_tasks)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].stdout.contains("first"));
        assert!(results[1].stdout.contains("second"));
    }

    #[tokio::test]
    async fn test_command_injection_prevention() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let malicious_task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("Malicious task test".to_string()),
        };
        let result = executor
            .execute_task("malicious", &malicious_task)
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("hello ; rm -rf /"));
    }

    #[tokio::test]
    async fn test_special_characters_in_args() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let special_chars = vec![
            "$USER",
            "$(whoami)",
            "`whoami`",
            "&& echo hacked",
            "|| echo failed",
            "> /tmp/hack",
            "| cat",
        ];
        for special_arg in special_chars {
            let task = Task {
                command: "echo".to_string(),
                args: vec!["safe".to_string(), special_arg.to_string()],
                shell: None,
                env: HashMap::new(),
                depends_on: vec![],
                inputs: vec![],
                outputs: vec![],
                external_inputs: None,
                workspaces: vec![],
                description: Some("Special character test".to_string()),
            };
            let result = executor.execute_task("special", &task).await.unwrap();
            assert!(result.success);
            assert!(result.stdout.contains("safe"));
            assert!(result.stdout.contains(special_arg));
        }
    }

    #[tokio::test]
    async fn test_environment_variable_safety() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["DANGEROUS_VAR".to_string()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: Some("Environment variable safety test".to_string()),
        };
        let result = executor.execute_task("env_test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("; rm -rf /"));
    }

    #[tokio::test]
    async fn test_execute_graph_parallel_groups() {
        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let mut graph = TaskGraph::new();

        let t1 = Task {
            command: "echo".into(),
            args: vec!["A".into()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: None,
        };
        let t2 = Task {
            command: "echo".into(),
            args: vec!["B".into()],
            shell: None,
            env: HashMap::new(),
            depends_on: vec![],
            inputs: vec![],
            outputs: vec![],
            external_inputs: None,
            workspaces: vec![],
            description: None,
        };

        graph.add_task("t1", t1).unwrap();
        graph.add_task("t2", t2).unwrap();
        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);
        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
        assert!(joined.contains("A") && joined.contains("B"));
    }
}