cuenv_core/tasks/
executor.rs

1//! Task executor for running tasks with environment support and hermetic, input-addressed execution
2//!
3//! - Environment variable propagation
4//! - Parallel and sequential execution
5//! - Hermetic workdir populated from declared inputs (files/dirs/globs)
6//! - Persistent task result cache keyed by inputs + command + env + cuenv version + platform
7
8use super::{Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
9use crate::cache::tasks as task_cache;
10use crate::environment::Environment;
11use crate::manifest::WorkspaceConfig;
12use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
13use crate::{Error, Result};
14use async_recursion::async_recursion;
15use chrono::Utc;
16use cuenv_workspaces::{
17    BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
18    NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
19    PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
20    YarnModernLockfileParser, detect_from_command, detect_package_managers,
21    materializer::{
22        Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
23    },
24};
25use std::collections::{BTreeMap, HashMap, HashSet};
26use std::path::{Path, PathBuf};
27use std::process::Stdio;
28use std::sync::{Arc, OnceLock};
29use tokio::io::{AsyncBufReadExt, BufReader};
30use tokio::process::Command;
31use tokio::task::JoinSet;
32
33/// Task execution result
34#[derive(Debug, Clone)]
35pub struct TaskResult {
36    pub name: String,
37    pub exit_code: Option<i32>,
38    pub stdout: String,
39    pub stderr: String,
40    pub success: bool,
41}
42
43/// Number of lines from stdout/stderr to include when summarizing failures
44pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
45
46/// Task executor configuration
47#[derive(Debug, Clone)]
48pub struct ExecutorConfig {
49    /// Whether to capture output (vs streaming to stdout/stderr)
50    pub capture_output: bool,
51    /// Maximum parallel tasks (0 = unlimited)
52    pub max_parallel: usize,
53    /// Environment variables to propagate (resolved via policies)
54    pub environment: Environment,
55    /// Optional working directory override (for hermetic execution)
56    pub working_dir: Option<PathBuf>,
57    /// Project root for resolving inputs/outputs (env.cue root)
58    pub project_root: PathBuf,
59    /// Optional: materialize cached outputs on cache hit
60    pub materialize_outputs: Option<PathBuf>,
61    /// Optional: cache directory override
62    pub cache_dir: Option<PathBuf>,
63    /// Optional: print cache path on hits/misses
64    pub show_cache_path: bool,
65    /// Global workspace configuration
66    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
67}
68
69impl Default for ExecutorConfig {
70    fn default() -> Self {
71        Self {
72            capture_output: false,
73            max_parallel: 0,
74            environment: Environment::new(),
75            working_dir: None,
76            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
77            materialize_outputs: None,
78            cache_dir: None,
79            show_cache_path: false,
80            workspaces: None,
81        }
82    }
83}
84
85/// Task executor
86pub struct TaskExecutor {
87    config: ExecutorConfig,
88}
89impl TaskExecutor {
90    pub fn new(config: ExecutorConfig) -> Self {
91        Self { config }
92    }
93
94    /// Execute a single task hermetically with caching
95    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
96        // Resolve workspace dependencies if enabled
97        let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
98        let mut workspace_input_patterns = Vec::new();
99        let mut workspace_lockfile_hashes = BTreeMap::new();
100
101        for workspace_name in &task.workspaces {
102            if let Some(global_workspaces) = &self.config.workspaces {
103                if let Some(ws_config) = global_workspaces.get(workspace_name) {
104                    if ws_config.enabled {
105                        let (ws, entries, paths, hash) = self
106                            .resolve_workspace(name, task, workspace_name, ws_config)
107                            .await?;
108                        workspace_ctxs.push((ws, entries));
109                        workspace_input_patterns.extend(paths);
110                        if let Some(h) = hash {
111                            workspace_lockfile_hashes.insert(workspace_name.clone(), h);
112                        }
113                    }
114                } else {
115                    tracing::warn!(
116                        task = %name,
117                        workspace = %workspace_name,
118                        "Workspace not found in global configuration"
119                    );
120                }
121            }
122        }
123
124        // Resolve inputs relative to the workspace root (if any) while keeping
125        // project-scoped inputs anchored to the original project path.
126        // Use the first workspace as the primary root if multiple are present,
127        // or fallback to project root. This might need refinement for multi-workspace tasks.
128        let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
129
130        let project_prefix = primary_workspace_root
131            .as_ref()
132            .and_then(|root| self.config.project_root.strip_prefix(root).ok())
133            .map(|p| p.to_path_buf());
134        let input_root = primary_workspace_root
135            .clone()
136            .unwrap_or_else(|| self.config.project_root.clone());
137
138        let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
139        let resolved_inputs = {
140            let _g = span_inputs.enter();
141            let resolver = InputResolver::new(&input_root);
142            let mut all_inputs: Vec<String> = Vec::new();
143
144            if let Some(prefix) = project_prefix.as_ref() {
145                for input in &task.inputs {
146                    all_inputs.push(prefix.join(input).to_string_lossy().to_string());
147                }
148            } else {
149                all_inputs.extend(task.inputs.iter().cloned());
150            }
151
152            all_inputs.extend(workspace_input_patterns.iter().cloned());
153            resolver.resolve(&all_inputs)?
154        };
155        if task_trace_enabled() {
156            tracing::info!(
157                task = %name,
158                input_root = %input_root.display(),
159                project_root = %self.config.project_root.display(),
160                inputs_count = resolved_inputs.files.len(),
161                workspace_inputs = workspace_input_patterns.len(),
162                "Resolved task inputs"
163            );
164        }
165
166        // Build cache key envelope
167        let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
168        // Ensure deterministic order is already guaranteed by BTreeMap
169        let env_summary: BTreeMap<String, String> = self
170            .config
171            .environment
172            .vars
173            .iter()
174            .map(|(k, v)| (k.clone(), v.clone()))
175            .collect();
176        let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
177        let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
178        let shell_json = serde_json::to_value(&task.shell).ok();
179        let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
180            None
181        } else {
182            Some(workspace_lockfile_hashes)
183        };
184
185        let envelope = task_cache::CacheKeyEnvelope {
186            inputs: inputs_summary.clone(),
187            command: task.command.clone(),
188            args: task.args.clone(),
189            shell: shell_json,
190            env: env_summary.clone(),
191            cuenv_version: cuenv_version.clone(),
192            platform: platform.clone(),
193            workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
194            // Package hashes are implicitly included in inputs_summary because we added member paths to inputs
195            workspace_package_hashes: None,
196        };
197        let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
198
199        // Cache lookup
200        let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
201        let cache_hit = {
202            let _g = span_cache.enter();
203            task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
204        };
205
206        if let Some(hit) = cache_hit {
207            tracing::info!(
208                task = %name,
209                key = %cache_key,
210                path = %hit.path.display(),
211                "Task {} cache hit: {}. Skipping execution.",
212                name,
213                cache_key
214            );
215            if self.config.show_cache_path {
216                tracing::info!(cache_path = %hit.path.display(), "Cache path");
217            }
218            if let Some(dest) = &self.config.materialize_outputs {
219                let count = task_cache::materialize_outputs(
220                    &cache_key,
221                    dest,
222                    self.config.cache_dir.as_deref(),
223                )?;
224                tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
225            }
226            // On cache hit, surface cached logs so behavior matches a fresh
227            // execution from the caller's perspective. We return them in the
228            // TaskResult even if `capture_output` is false, allowing callers
229            // (like the CLI) to print cached logs explicitly.
230            let stdout_path = hit.path.join("logs").join("stdout.log");
231            let stderr_path = hit.path.join("logs").join("stderr.log");
232            let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
233            let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
234            // If logs are present, we can return immediately. If logs are
235            // missing (older cache format), fall through to execute to
236            // backfill logs for future hits.
237            if !(stdout.is_empty() && stderr.is_empty()) {
238                if !self.config.capture_output {
239                    let cmd_str = if task.command.is_empty() {
240                        task.args.join(" ")
241                    } else {
242                        format!("{} {}", task.command, task.args.join(" "))
243                    };
244                    eprintln!("> [{name}] {cmd_str} (cached)");
245                }
246                return Ok(TaskResult {
247                    name: name.to_string(),
248                    exit_code: Some(0),
249                    stdout,
250                    stderr,
251                    success: true,
252                });
253            } else {
254                tracing::info!(
255                    task = %name,
256                    key = %cache_key,
257                    "Cache entry lacks logs; executing to backfill logs"
258                );
259            }
260        }
261
262        tracing::info!(
263            task = %name,
264            key = %cache_key,
265            "Task {} executing hermetically… key {}",
266            name,
267            cache_key
268        );
269
270        let hermetic_root = create_hermetic_dir(name, &cache_key)?;
271        if self.config.show_cache_path {
272            tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
273        }
274
275        // Seed working directory with inputs
276        let span_populate =
277            tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
278        {
279            let _g = span_populate.enter();
280            populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
281        }
282
283        // Materialize workspace artifacts (node_modules, target)
284        for (ws, entries) in workspace_ctxs {
285            self.materialize_workspace(&ws, &entries, &hermetic_root)
286                .await?;
287        }
288
289        // Initial snapshot to detect undeclared writes
290        let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
291
292        // Resolve command path using the environment's PATH.
293        // This is necessary because when spawning a process, the OS looks up
294        // the executable in the current process's PATH, not the environment
295        // that will be set on the child process.
296        let resolved_command = self.config.environment.resolve_command(&task.command);
297
298        // Build command
299        let mut cmd = if let Some(shell) = &task.shell {
300            if shell.command.is_some() && shell.flag.is_some() {
301                let shell_command = shell.command.as_ref().unwrap();
302                let shell_flag = shell.flag.as_ref().unwrap();
303                // Resolve shell command too
304                let resolved_shell = self.config.environment.resolve_command(shell_command);
305                let mut cmd = Command::new(&resolved_shell);
306                cmd.arg(shell_flag);
307                if task.args.is_empty() {
308                    cmd.arg(&resolved_command);
309                } else {
310                    let full_command = if task.command.is_empty() {
311                        task.args.join(" ")
312                    } else {
313                        format!("{} {}", resolved_command, task.args.join(" "))
314                    };
315                    cmd.arg(full_command);
316                }
317                cmd
318            } else {
319                let mut cmd = Command::new(&resolved_command);
320                for arg in &task.args {
321                    cmd.arg(arg);
322                }
323                cmd
324            }
325        } else {
326            let mut cmd = Command::new(&resolved_command);
327            for arg in &task.args {
328                cmd.arg(arg);
329            }
330            cmd
331        };
332
333        let workdir = if let Some(dir) = &self.config.working_dir {
334            dir.clone()
335        } else if let Some(prefix) = project_prefix.as_ref() {
336            hermetic_root.join(prefix)
337        } else {
338            hermetic_root.clone()
339        };
340        let _ = std::fs::create_dir_all(&workdir);
341        cmd.current_dir(&workdir);
342        // Set environment variables (resolved + system), set CWD
343        let env_vars = self.config.environment.merge_with_system();
344        if task_trace_enabled() {
345            tracing::info!(
346                task = %name,
347                hermetic_root = %hermetic_root.display(),
348                workdir = %workdir.display(),
349                command = %task.command,
350                args = ?task.args,
351                env_count = env_vars.len(),
352                "Launching task command"
353            );
354        }
355        for (k, v) in env_vars {
356            cmd.env(k, v);
357        }
358
359        // Configure output: always capture to ensure consistent behavior on
360        // cache hits and allow callers to decide what to print.
361        cmd.stdout(Stdio::piped());
362        cmd.stderr(Stdio::piped());
363
364        let stream_logs = !self.config.capture_output;
365        if stream_logs {
366            let cmd_str = if task.command.is_empty() {
367                task.args.join(" ")
368            } else {
369                format!("{} {}", task.command, task.args.join(" "))
370            };
371            eprintln!("> [{name}] {cmd_str}");
372        }
373
374        let start = std::time::Instant::now();
375        let mut child = cmd
376            .spawn()
377            .map_err(|e| Error::configuration(format!("Failed to spawn task '{}': {}", name, e)))?;
378
379        let stdout_handle = child.stdout.take();
380        let stderr_handle = child.stderr.take();
381
382        let stdout_task = async move {
383            if let Some(stdout) = stdout_handle {
384                let reader = BufReader::new(stdout);
385                let mut lines = reader.lines();
386                let mut stdout_lines = Vec::new();
387                while let Ok(Some(line)) = lines.next_line().await {
388                    if stream_logs {
389                        println!("{line}");
390                    }
391                    stdout_lines.push(line);
392                }
393                stdout_lines.join("\n")
394            } else {
395                String::new()
396            }
397        };
398
399        let stderr_task = async move {
400            if let Some(stderr) = stderr_handle {
401                let reader = BufReader::new(stderr);
402                let mut lines = reader.lines();
403                let mut stderr_lines = Vec::new();
404                while let Ok(Some(line)) = lines.next_line().await {
405                    if stream_logs {
406                        eprintln!("{line}");
407                    }
408                    stderr_lines.push(line);
409                }
410                stderr_lines.join("\n")
411            } else {
412                String::new()
413            }
414        };
415
416        let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
417
418        let status = child.wait().await.map_err(|e| {
419            Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
420        })?;
421        let duration = start.elapsed();
422
423        let exit_code = status.code().unwrap_or(1);
424        let success = status.success();
425        if !success {
426            tracing::warn!(task = %name, exit = exit_code, "Task failed");
427            tracing::error!(task = %name, "Task stdout:\n{}", stdout);
428            tracing::error!(task = %name, "Task stderr:\n{}", stderr);
429        } else {
430            tracing::info!(task = %name, "Task completed successfully");
431        }
432
433        // Collect declared outputs and warn on undeclared writes
434        let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
435            task.outputs
436                .iter()
437                .map(|o| prefix.join(o).to_string_lossy().to_string())
438                .collect()
439        } else {
440            task.outputs.clone()
441        };
442        let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
443        let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
444        let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
445
446        // Stage outputs into a temp dir for cache persistence
447        let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
448        if outputs_stage.exists() {
449            let _ = std::fs::remove_dir_all(&outputs_stage);
450        }
451        std::fs::create_dir_all(&outputs_stage).ok();
452
453        for rel in &outputs {
454            let rel_for_project = project_prefix
455                .as_ref()
456                .and_then(|prefix| rel.strip_prefix(prefix).ok())
457                .unwrap_or(rel)
458                .to_path_buf();
459            let src = hermetic_root.join(rel);
460            // Intentionally avoid `let`-chains here to preserve readability
461            // and to align with review guidance; allow clippy's collapsible-if.
462            #[allow(clippy::collapsible_if)]
463            if let Ok(meta) = std::fs::metadata(&src) {
464                if meta.is_file() {
465                    let dst = outputs_stage.join(&rel_for_project);
466                    if let Some(parent) = dst.parent() {
467                        let _ = std::fs::create_dir_all(parent);
468                    }
469                    let _ = std::fs::copy(&src, &dst);
470                    let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
471                    output_index.push(task_cache::OutputIndexEntry {
472                        rel_path: rel_for_project.to_string_lossy().to_string(),
473                        size: meta.len(),
474                        sha256: sha,
475                    });
476                }
477            }
478        }
479
480        // Detect undeclared writes
481        let mut warned = false;
482        for entry in walkdir::WalkDir::new(&hermetic_root)
483            .into_iter()
484            .filter_map(|e| e.ok())
485        {
486            let p = entry.path();
487            if p.is_dir() {
488                continue;
489            }
490            let rel = match p.strip_prefix(&hermetic_root) {
491                Ok(r) => r.to_path_buf(),
492                Err(_) => continue,
493            };
494            let rel_str = rel.to_string_lossy().to_string();
495            let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
496            let initial = initial_hashes.get(&rel_str);
497            let changed = match initial {
498                None => true,
499                Some(prev) => prev != &sha,
500            };
501            if changed && !outputs_set.contains(&rel) {
502                if !warned {
503                    tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
504                    warned = true;
505                }
506                tracing::debug!(path = %rel_str, "Undeclared write");
507            }
508        }
509
510        // Persist cache entry on success
511        if success {
512            let meta = task_cache::TaskResultMeta {
513                task_name: name.to_string(),
514                command: task.command.clone(),
515                args: task.args.clone(),
516                env_summary,
517                inputs_summary: inputs_summary.clone(),
518                created_at: Utc::now(),
519                cuenv_version,
520                platform,
521                duration_ms: duration.as_millis(),
522                exit_code,
523                cache_key_envelope: envelope_json.clone(),
524                output_index,
525            };
526            let logs = task_cache::TaskLogs {
527                // Persist logs regardless of capture setting so cache hits can
528                // reproduce output faithfully.
529                stdout: Some(stdout.clone()),
530                stderr: Some(stderr.clone()),
531            };
532            let cache_span = tracing::info_span!("cache.save", key = %cache_key);
533            {
534                let _g = cache_span.enter();
535                task_cache::save_result(
536                    &cache_key,
537                    &meta,
538                    &outputs_stage,
539                    &hermetic_root,
540                    logs,
541                    self.config.cache_dir.as_deref(),
542                )?;
543            }
544
545            // Materialize outputs back to project root
546            for rel in &outputs {
547                let rel_for_project = project_prefix
548                    .as_ref()
549                    .and_then(|prefix| rel.strip_prefix(prefix).ok())
550                    .unwrap_or(rel)
551                    .to_path_buf();
552                let src = hermetic_root.join(rel);
553                let dst = self.config.project_root.join(&rel_for_project);
554                if src.exists() {
555                    if let Some(parent) = dst.parent() {
556                        std::fs::create_dir_all(parent).ok();
557                    }
558                    // We use copy instead of rename to keep hermetic dir intact for snapshots/logs if needed,
559                    // although it's temporary.
560                    if let Err(e) = std::fs::copy(&src, &dst) {
561                        tracing::warn!(
562                            "Failed to copy output {} back to project root: {}",
563                            rel.display(),
564                            e
565                        );
566                    }
567                }
568            }
569        } else {
570            // Optionally persist logs in a failure/ subdir: not implemented for brevity
571        }
572
573        Ok(TaskResult {
574            name: name.to_string(),
575            exit_code: Some(exit_code),
576            stdout,
577            stderr,
578            success,
579        })
580    }
581
582    /// Execute a task definition (single task or group)
583    #[async_recursion]
584    pub async fn execute_definition(
585        &self,
586        name: &str,
587        definition: &TaskDefinition,
588        all_tasks: &Tasks,
589    ) -> Result<Vec<TaskResult>> {
590        match definition {
591            TaskDefinition::Single(task) => {
592                let result = self.execute_task(name, task.as_ref()).await?;
593                Ok(vec![result])
594            }
595            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
596        }
597    }
598
599    async fn execute_group(
600        &self,
601        prefix: &str,
602        group: &TaskGroup,
603        all_tasks: &Tasks,
604    ) -> Result<Vec<TaskResult>> {
605        match group {
606            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
607            TaskGroup::Parallel(tasks) => self.execute_parallel(prefix, tasks, all_tasks).await,
608        }
609    }
610
611    async fn execute_sequential(
612        &self,
613        prefix: &str,
614        tasks: &[TaskDefinition],
615        all_tasks: &Tasks,
616    ) -> Result<Vec<TaskResult>> {
617        if !self.config.capture_output {
618            eprintln!("> Running sequential group: {prefix}");
619        }
620        let mut results = Vec::new();
621        for (i, task_def) in tasks.iter().enumerate() {
622            let task_name = format!("{}[{}]", prefix, i);
623            let task_results = self
624                .execute_definition(&task_name, task_def, all_tasks)
625                .await?;
626            for result in &task_results {
627                if !result.success {
628                    let message = format!(
629                        "Sequential task group '{prefix}' halted.\n\n{}",
630                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
631                    );
632                    return Err(Error::configuration(message));
633                }
634            }
635            results.extend(task_results);
636        }
637        Ok(results)
638    }
639
640    async fn execute_parallel(
641        &self,
642        prefix: &str,
643        tasks: &HashMap<String, TaskDefinition>,
644        all_tasks: &Tasks,
645    ) -> Result<Vec<TaskResult>> {
646        // Check for "default" task to override parallel execution
647        if let Some(default_task) = tasks.get("default") {
648            if !self.config.capture_output {
649                eprintln!("> Executing default task for group '{prefix}'");
650            }
651            // Execute only the default task, using the group prefix directly
652            // since "default" is implicit when invoking the group name
653            let task_name = format!("{}.default", prefix);
654            return self
655                .execute_definition(&task_name, default_task, all_tasks)
656                .await;
657        }
658
659        if !self.config.capture_output {
660            eprintln!(
661                "> Running parallel group: {prefix} (max_parallel={})",
662                self.config.max_parallel
663            );
664        }
665        let mut join_set = JoinSet::new();
666        let all_tasks = Arc::new(all_tasks.clone());
667        let mut all_results = Vec::new();
668        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
669            if let Some(failed) = results.iter().find(|r| !r.success) {
670                let message = format!(
671                    "Parallel task group '{prefix}' halted.\n\n{}",
672                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
673                );
674                return Err(Error::configuration(message));
675            }
676            all_results.extend(results);
677            Ok(())
678        };
679        for (name, task_def) in tasks {
680            let task_name = format!("{}.{}", prefix, name);
681            let task_def = task_def.clone();
682            let all_tasks = Arc::clone(&all_tasks);
683            let executor = self.clone_with_config();
684            join_set.spawn(async move {
685                executor
686                    .execute_definition(&task_name, &task_def, &all_tasks)
687                    .await
688            });
689            if self.config.max_parallel > 0
690                && join_set.len() >= self.config.max_parallel
691                && let Some(result) = join_set.join_next().await
692            {
693                match result {
694                    Ok(Ok(results)) => merge_results(results)?,
695                    Ok(Err(e)) => return Err(e),
696                    Err(e) => {
697                        return Err(Error::configuration(format!(
698                            "Task execution panicked: {}",
699                            e
700                        )));
701                    }
702                }
703            }
704        }
705        while let Some(result) = join_set.join_next().await {
706            match result {
707                Ok(Ok(results)) => merge_results(results)?,
708                Ok(Err(e)) => return Err(e),
709                Err(e) => {
710                    return Err(Error::configuration(format!(
711                        "Task execution panicked: {}",
712                        e
713                    )));
714                }
715            }
716        }
717        Ok(all_results)
718    }
719
720    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
721        let parallel_groups = graph.get_parallel_groups()?;
722        let mut all_results = Vec::new();
723        let mut join_set = JoinSet::new();
724        let mut group_iter = parallel_groups.into_iter();
725        let mut current_group = group_iter.next();
726        while current_group.is_some() || !join_set.is_empty() {
727            if let Some(group) = current_group.as_mut() {
728                while let Some(node) = group.pop() {
729                    let task = node.task.clone();
730                    let name = node.name.clone();
731                    let executor = self.clone_with_config();
732                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
733                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
734                        break;
735                    }
736                }
737                if group.is_empty() {
738                    current_group = group_iter.next();
739                }
740            }
741            if let Some(result) = join_set.join_next().await {
742                match result {
743                    Ok(Ok(task_result)) => {
744                        if !task_result.success {
745                            let message = format!(
746                                "Task graph execution halted.\n\n{}",
747                                summarize_task_failure(&task_result, TASK_FAILURE_SNIPPET_LINES)
748                            );
749                            return Err(Error::configuration(message));
750                        }
751                        all_results.push(task_result);
752                    }
753                    Ok(Err(e)) => return Err(e),
754                    Err(e) => {
755                        return Err(Error::configuration(format!(
756                            "Task execution panicked: {}",
757                            e
758                        )));
759                    }
760                }
761            }
762        }
763        Ok(all_results)
764    }
765
766    async fn resolve_workspace(
767        &self,
768        _task_name: &str,
769        task: &Task,
770        workspace_name: &str,
771        config: &WorkspaceConfig,
772    ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
773        let root = self.config.project_root.clone();
774        let task_label = _task_name.to_string();
775        let command = task.command.clone();
776        let config_pm = config.package_manager.clone();
777        let config_root = config.root.clone();
778        // WorkspaceConfig doesn't support 'packages' filtering yet, so assume full workspace deps for now,
779        // or imply it from context. If we want package filtering, we need to add it to WorkspaceConfig
780        // or assume all deps.
781        // However, the previous logic used `packages` from WorkspaceInputs.
782        // For now, let's assume traversing all dependencies if not specified.
783        // But wait, `WorkspaceConfig` in schema doesn't have `packages`.
784        // So we probably default to "all relevant" or auto-infer from current directory if we are inside a package.
785        let mut packages = Vec::new();
786
787        // If we are in a subdirectory that matches a workspace member, we might want to infer that package.
788        // The previous logic did that if `packages` was empty.
789
790        let lockfile_override: Option<String> = None; // WorkspaceConfig doesn't have lockfile override yet.
791        // If we need it, we should add it to WorkspaceConfig. For now assume standard discovery.
792
793        let mut traverse_workspace_deps = true;
794
795        // Use spawn_blocking for heavy lifting
796        let workspace_name_owned = workspace_name.to_string();
797        tokio::task::spawn_blocking(move || {
798            let override_for_detection = lockfile_override.as_ref().map(|lock| {
799                let candidate = PathBuf::from(lock);
800                if candidate.is_absolute() {
801                    candidate
802                } else {
803                    root.join(lock)
804                }
805            });
806
807            // 1. Detect Package Manager
808            // Priority: 1. explicit config, 2. workspace name (if it matches a PM), 3. auto-detect
809            let manager = if let Some(pm_str) = config_pm {
810                match pm_str.as_str() {
811                    "npm" => PackageManager::Npm,
812                    "bun" => PackageManager::Bun,
813                    "pnpm" => PackageManager::Pnpm,
814                    "yarn" => PackageManager::YarnModern,
815                    "yarn-classic" => PackageManager::YarnClassic,
816                    "cargo" => PackageManager::Cargo,
817                    _ => {
818                        return Err(Error::configuration(format!(
819                            "Unknown package manager: {}",
820                            pm_str
821                        )));
822                    }
823                }
824            } else {
825                // Try to match workspace name to package manager
826                match workspace_name_owned.as_str() {
827                    "npm" => PackageManager::Npm,
828                    "bun" => PackageManager::Bun,
829                    "pnpm" => PackageManager::Pnpm,
830                    "yarn" => PackageManager::YarnModern,
831                    "cargo" => PackageManager::Cargo,
832                    _ => {
833                         // Auto-detect if name doesn't match
834                        // Priority: command hint -> lockfiles
835                        let hint = detect_from_command(&command);
836                        let detected = match detect_package_managers(&root) {
837                            Ok(list) => list,
838                            Err(e) => {
839                                if override_for_detection.is_some() {
840                                    Vec::new()
841                                } else {
842                                    return Err(Error::configuration(format!(
843                                        "Failed to detect package managers: {}",
844                                        e
845                                    )));
846                                }
847                            }
848                        };
849
850                        if let Some(h) = hint {
851                            if detected.contains(&h) {
852                                h
853                            } else if !detected.is_empty() {
854                                // Prefer detected files
855                                detected[0]
856                            } else if let Some(ref override_path) = override_for_detection {
857                                infer_manager_from_lockfile(override_path).ok_or_else(|| {
858                                    Error::configuration(
859                                        "Unable to infer package manager from lockfile override",
860                                    )
861                                })?
862                            } else {
863                                return Err(Error::configuration(
864                                    format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
865                                ));
866                            }
867                        } else if !detected.is_empty() {
868                            detected[0]
869                        } else if let Some(ref override_path) = override_for_detection {
870                            infer_manager_from_lockfile(override_path).ok_or_else(|| {
871                                Error::configuration(
872                                    "Unable to infer package manager from lockfile override",
873                                )
874                            })?
875                        } else {
876                            return Err(Error::configuration(
877                                "Could not detect package manager for workspace resolution",
878                            ));
879                        }
880                    }
881                }
882            };
883
884            // Resolve the actual workspace root by walking up until we find a
885            // directory that declares a workspace for this package manager.
886            // If config.root is set, use that.
887            let workspace_root = if let Some(root_override) = &config_root {
888                root.join(root_override)
889            } else {
890                find_workspace_root(manager, &root)
891            };
892
893            if task_trace_enabled() {
894                tracing::info!(
895                    task = %task_label,
896                    manager = %manager,
897                    project_root = %root.display(),
898                    workspace_root = %workspace_root.display(),
899                    "Resolved workspace root for package manager"
900                );
901            }
902
903            let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
904                let candidate = PathBuf::from(lock);
905                if candidate.is_absolute() {
906                    candidate
907                } else {
908                    workspace_root.join(lock)
909                }
910            });
911
912            // 2. Discover Workspace
913            let discovery: Box<dyn WorkspaceDiscovery> = match manager {
914                PackageManager::Npm
915                | PackageManager::Bun
916                | PackageManager::YarnClassic
917                | PackageManager::YarnModern => Box::new(PackageJsonDiscovery),
918                PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
919                PackageManager::Cargo => Box::new(CargoTomlDiscovery),
920            };
921
922            let workspace = discovery.discover(&workspace_root).map_err(|e| {
923                Error::configuration(format!("Failed to discover workspace: {}", e))
924            })?;
925
926            // 3. Parse Lockfile
927            let lockfile_path = if let Some(path) = lockfile_override_path {
928                if !path.exists() {
929                    return Err(Error::configuration(format!(
930                        "Workspace lockfile override does not exist: {}",
931                        path.display()
932                    )));
933                }
934                path
935            } else {
936                workspace.lockfile.clone().ok_or_else(|| {
937                    Error::configuration("Workspace resolution requires a lockfile")
938                })?
939            };
940
941            let parser: Box<dyn LockfileParser> = match manager {
942                PackageManager::Npm => Box::new(NpmLockfileParser),
943                PackageManager::Bun => Box::new(BunLockfileParser),
944                PackageManager::Pnpm => Box::new(PnpmLockfileParser),
945                PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
946                PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
947                PackageManager::Cargo => Box::new(CargoLockfileParser),
948            };
949
950            let entries = parser
951                .parse(&lockfile_path)
952                .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
953            if task_trace_enabled() {
954                tracing::info!(
955                    task = %task_label,
956                    lockfile = %lockfile_path.display(),
957                    members = entries.len(),
958                    "Parsed workspace lockfile"
959                );
960            }
961
962            // Compute lockfile hash
963            let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
964
965            // Infer packages when none explicitly provided by scoping to the
966            // current workspace member. (We intentionally avoid pulling all
967            // transitive deps here to keep hashing fast for large monorepos.)
968            if packages.is_empty() {
969                let current_member = workspace
970                    .members
971                    .iter()
972                    .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
973                if let Some(member) = current_member {
974                    let inferred = vec![member.name.clone()];
975                    if task_trace_enabled() {
976                        tracing::info!(
977                            task = %task_label,
978                            inferred_packages = ?inferred,
979                            "Inferred workspace packages from current project"
980                        );
981                    }
982                    packages = inferred;
983                    traverse_workspace_deps = true;
984                }
985            }
986
987            // 4. Collect Inputs
988            let mut member_paths = Vec::new();
989
990            // Always include workspace configuration files
991            member_paths.push(manager.workspace_config_name().to_string());
992            if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
993                member_paths.push(rel.to_string_lossy().to_string());
994            } else {
995                member_paths.push(lockfile_path.to_string_lossy().to_string());
996            }
997
998            if packages.is_empty() {
999                for member in &workspace.members {
1000                    let manifest_rel = member
1001                        .path
1002                        .join(manager.workspace_config_name());
1003                    member_paths.push(manifest_rel.to_string_lossy().to_string());
1004                }
1005            } else {
1006                let mut to_visit: Vec<String> = packages.clone();
1007                let mut visited = HashSet::new();
1008
1009                while let Some(pkg_name) = to_visit.pop() {
1010                    if visited.contains(&pkg_name) {
1011                        continue;
1012                    }
1013                    visited.insert(pkg_name.clone());
1014
1015                    if let Some(member) = workspace.find_member(&pkg_name) {
1016                        let manifest_rel = member
1017                            .path
1018                            .join(manager.workspace_config_name());
1019                        member_paths.push(manifest_rel.to_string_lossy().to_string());
1020
1021                        // Add dependencies when explicitly requested
1022                        if traverse_workspace_deps {
1023                            let mut dependency_candidates: HashSet<String> = HashSet::new();
1024
1025                            if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1026                                for dep in &entry.dependencies {
1027                                    if entries
1028                                        .iter()
1029                                        .any(|e| e.name == dep.name && e.is_workspace_member)
1030                                    {
1031                                        dependency_candidates.insert(dep.name.clone());
1032                                    }
1033                                }
1034                            }
1035
1036                            for dep_name in &member.dependencies {
1037                                if workspace.find_member(dep_name).is_some() {
1038                                    dependency_candidates.insert(dep_name.clone());
1039                                }
1040                            }
1041
1042                            for dep_name in dependency_candidates {
1043                                to_visit.push(dep_name);
1044                            }
1045                        }
1046                    }
1047                }
1048            }
1049
1050            if task_trace_enabled() {
1051                tracing::info!(
1052                    task = %task_label,
1053                    members = ?member_paths,
1054                    "Workspace input member paths selected"
1055                );
1056            }
1057
1058            Ok((workspace, entries, member_paths, Some(hash)))
1059        })
1060        .await
1061        .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1062    }
1063
1064    async fn materialize_workspace(
1065        &self,
1066        workspace: &Workspace,
1067        entries: &[LockfileEntry],
1068        target_dir: &Path,
1069    ) -> Result<()> {
1070        // Dispatch to appropriate materializer
1071        let materializer: Box<dyn Materializer> = match workspace.manager {
1072            PackageManager::Npm
1073            | PackageManager::Bun
1074            | PackageManager::Pnpm
1075            | PackageManager::YarnClassic
1076            | PackageManager::YarnModern => Box::new(NodeModulesMaterializer),
1077            PackageManager::Cargo => Box::new(CargoMaterializer),
1078        };
1079
1080        materializer
1081            .materialize(workspace, entries, target_dir)
1082            .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1083    }
1084
1085    fn clone_with_config(&self) -> Self {
1086        Self {
1087            config: self.config.clone(),
1088        }
1089    }
1090}
1091
1092fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1093    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1094
1095    loop {
1096        let is_root = match manager {
1097            PackageManager::Npm
1098            | PackageManager::Bun
1099            | PackageManager::YarnClassic
1100            | PackageManager::YarnModern => package_json_has_workspaces(&current),
1101            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1102            PackageManager::Cargo => cargo_toml_has_workspace(&current),
1103        };
1104
1105        if is_root {
1106            return current;
1107        }
1108
1109        if let Some(parent) = current.parent() {
1110            current = parent.to_path_buf();
1111        } else {
1112            return start.to_path_buf();
1113        }
1114    }
1115}
1116
1117fn package_json_has_workspaces(dir: &Path) -> bool {
1118    let path = dir.join("package.json");
1119    let content = std::fs::read_to_string(&path);
1120    let Ok(json) = content.and_then(|s| {
1121        serde_json::from_str::<serde_json::Value>(&s)
1122            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1123    }) else {
1124        return false;
1125    };
1126
1127    match json.get("workspaces") {
1128        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1129        Some(serde_json::Value::Object(map)) => map
1130            .get("packages")
1131            .and_then(|packages| packages.as_array())
1132            .map(|arr| !arr.is_empty())
1133            .unwrap_or(false),
1134        _ => false,
1135    }
1136}
1137
1138fn cargo_toml_has_workspace(dir: &Path) -> bool {
1139    let path = dir.join("Cargo.toml");
1140    let Ok(content) = std::fs::read_to_string(&path) else {
1141        return false;
1142    };
1143
1144    content.contains("[workspace]")
1145}
1146
1147fn task_trace_enabled() -> bool {
1148    static ENABLED: OnceLock<bool> = OnceLock::new();
1149    *ENABLED.get_or_init(|| {
1150        matches!(
1151            std::env::var("CUENV_TRACE_TASKS")
1152                .unwrap_or_default()
1153                .trim()
1154                .to_ascii_lowercase()
1155                .as_str(),
1156            "1" | "true" | "yes" | "on"
1157        )
1158    })
1159}
1160
1161/// Build a compact, user-friendly summary for a failed task, including the
1162/// exit code and the tail of stdout/stderr to help with diagnostics.
1163pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1164    let exit_code = result
1165        .exit_code
1166        .map(|c| c.to_string())
1167        .unwrap_or_else(|| "unknown".to_string());
1168
1169    let mut sections = Vec::new();
1170    sections.push(format!(
1171        "Task '{}' failed with exit code {}.",
1172        result.name, exit_code
1173    ));
1174
1175    let output = format_failure_streams(result, max_output_lines);
1176    if output.is_empty() {
1177        sections.push(
1178            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1179                .to_string(),
1180        );
1181    } else {
1182        sections.push(output);
1183    }
1184
1185    sections.join("\n\n")
1186}
1187
1188fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1189    let mut streams = Vec::new();
1190
1191    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1192        streams.push(stdout);
1193    }
1194
1195    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1196        streams.push(stderr);
1197    }
1198
1199    streams.join("\n\n")
1200}
1201
1202fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
1203    let normalized = content.trim_end();
1204    if normalized.is_empty() {
1205        return None;
1206    }
1207
1208    let lines: Vec<&str> = normalized.lines().collect();
1209    let total = lines.len();
1210    let start = total.saturating_sub(max_output_lines);
1211    let snippet = lines[start..].join("\n");
1212
1213    let header = if total > max_output_lines {
1214        format!("{label} (last {max_output_lines} of {total} lines):")
1215    } else {
1216        format!("{label}:")
1217    };
1218
1219    Some(format!("{header}\n{snippet}"))
1220}
1221
1222fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1223    match path.file_name().and_then(|n| n.to_str())? {
1224        "package-lock.json" => Some(PackageManager::Npm),
1225        "bun.lock" => Some(PackageManager::Bun),
1226        "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1227        "yarn.lock" => Some(PackageManager::YarnModern),
1228        "Cargo.lock" => Some(PackageManager::Cargo),
1229        _ => None,
1230    }
1231}
1232
1233fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1234    // Use OS temp dir; name scoped by task and cache key prefix.
1235    // IMPORTANT: Ensure the workdir is clean on every run to preserve hermeticity.
1236    let sanitized_task = task_name
1237        .chars()
1238        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1239        .collect::<String>();
1240
1241    let base = std::env::temp_dir().join(format!(
1242        "cuenv-work-{}-{}",
1243        sanitized_task,
1244        &key[..12.min(key.len())]
1245    ));
1246
1247    // If a directory from a previous run exists, remove it before reuse.
1248    // This avoids contamination from artifacts left by failed runs where no cache was saved.
1249    if base.exists()
1250        && let Err(e) = std::fs::remove_dir_all(&base)
1251    {
1252        // If we cannot remove the previous directory (e.g. in-use on Windows),
1253        // fall back to a unique, fresh directory to maintain hermetic execution.
1254        let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1255        let fallback = std::env::temp_dir().join(format!(
1256            "cuenv-work-{}-{}-{}",
1257            sanitized_task,
1258            &key[..12.min(key.len())],
1259            ts
1260        ));
1261        tracing::warn!(
1262            previous = %base.display(),
1263            fallback = %fallback.display(),
1264            error = %e,
1265            "Failed to clean previous hermetic workdir; using fresh fallback directory"
1266        );
1267        std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1268            source: e,
1269            path: Some(fallback.clone().into()),
1270            operation: "create_dir_all".into(),
1271        })?;
1272        return Ok(fallback);
1273    }
1274
1275    std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1276        source: e,
1277        path: Some(base.clone().into()),
1278        operation: "create_dir_all".into(),
1279    })?;
1280    Ok(base)
1281}
1282
1283/// Execute an arbitrary command with the cuenv environment
1284pub async fn execute_command(
1285    command: &str,
1286    args: &[String],
1287    environment: &Environment,
1288) -> Result<i32> {
1289    tracing::info!("Executing command: {} {:?}", command, args);
1290    let mut cmd = Command::new(command);
1291    cmd.args(args);
1292    let env_vars = environment.merge_with_system();
1293    for (key, value) in env_vars {
1294        cmd.env(key, value);
1295    }
1296    cmd.stdout(Stdio::inherit());
1297    cmd.stderr(Stdio::inherit());
1298    cmd.stdin(Stdio::inherit());
1299    let status = cmd.status().await.map_err(|e| {
1300        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1301    })?;
1302    Ok(status.code().unwrap_or(1))
1303}
1304
1305#[cfg(test)]
1306mod tests {
1307    use super::*;
1308    use std::fs;
1309    use tempfile::TempDir;
1310
1311    #[tokio::test]
1312    async fn test_executor_config_default() {
1313        let config = ExecutorConfig::default();
1314        assert!(!config.capture_output);
1315        assert_eq!(config.max_parallel, 0);
1316        assert!(config.environment.is_empty());
1317    }
1318
1319    #[tokio::test]
1320    async fn test_task_result() {
1321        let result = TaskResult {
1322            name: "test".to_string(),
1323            exit_code: Some(0),
1324            stdout: "output".to_string(),
1325            stderr: String::new(),
1326            success: true,
1327        };
1328        assert_eq!(result.name, "test");
1329        assert_eq!(result.exit_code, Some(0));
1330        assert!(result.success);
1331        assert_eq!(result.stdout, "output");
1332    }
1333
1334    #[tokio::test]
1335    async fn test_execute_simple_task() {
1336        let config = ExecutorConfig {
1337            capture_output: true,
1338            ..Default::default()
1339        };
1340        let executor = TaskExecutor::new(config);
1341        let task = Task {
1342            command: "echo".to_string(),
1343            args: vec!["hello".to_string()],
1344            shell: None,
1345            env: HashMap::new(),
1346            depends_on: vec![],
1347            inputs: vec![],
1348            outputs: vec![],
1349            external_inputs: None,
1350            workspaces: vec![],
1351            description: Some("Hello task".to_string()),
1352        };
1353        let result = executor.execute_task("test", &task).await.unwrap();
1354        assert!(result.success);
1355        assert_eq!(result.exit_code, Some(0));
1356        assert!(result.stdout.contains("hello"));
1357    }
1358
1359    #[tokio::test]
1360    async fn test_execute_with_environment() {
1361        let mut config = ExecutorConfig {
1362            capture_output: true,
1363            ..Default::default()
1364        };
1365        config
1366            .environment
1367            .set("TEST_VAR".to_string(), "test_value".to_string());
1368        let executor = TaskExecutor::new(config);
1369        let task = Task {
1370            command: "printenv".to_string(),
1371            args: vec!["TEST_VAR".to_string()],
1372            shell: None,
1373            env: HashMap::new(),
1374            depends_on: vec![],
1375            inputs: vec![],
1376            outputs: vec![],
1377            external_inputs: None,
1378            workspaces: vec![],
1379            description: Some("Print env task".to_string()),
1380        };
1381        let result = executor.execute_task("test", &task).await.unwrap();
1382        assert!(result.success);
1383        assert!(result.stdout.contains("test_value"));
1384    }
1385
1386    #[tokio::test]
1387    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
1388        let tmp = TempDir::new().unwrap();
1389        let root = tmp.path();
1390
1391        // Workspace root with workspaces + lockfile
1392        fs::write(
1393            root.join("package.json"),
1394            r#"{
1395  "name": "root-app",
1396  "version": "0.0.0",
1397  "workspaces": ["packages/*", "apps/*"],
1398  "dependencies": {
1399    "@rawkodeacademy/content-technologies": "workspace:*"
1400  }
1401}"#,
1402        )
1403        .unwrap();
1404        // Deliberately omit the workspace member name for apps/site to mimic lockfiles
1405        // that only record member paths, ensuring we can still discover dependencies.
1406        fs::write(
1407            root.join("bun.lock"),
1408            r#"{
1409  "lockfileVersion": 1,
1410  "workspaces": {
1411    "": {
1412      "name": "root-app",
1413      "dependencies": {
1414        "@rawkodeacademy/content-technologies": "workspace:*"
1415      }
1416    },
1417    "packages/content-technologies": {
1418      "name": "@rawkodeacademy/content-technologies",
1419      "version": "0.0.1"
1420    },
1421    "apps/site": {
1422      "version": "0.0.0",
1423      "dependencies": {
1424        "@rawkodeacademy/content-technologies": "workspace:*"
1425      }
1426    }
1427  },
1428  "packages": {}
1429}"#,
1430        )
1431        .unwrap();
1432
1433        // Workspace member packages
1434        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
1435        fs::write(
1436            root.join("packages/content-technologies/package.json"),
1437            r#"{
1438  "name": "@rawkodeacademy/content-technologies",
1439  "version": "0.0.1"
1440}"#,
1441        )
1442        .unwrap();
1443
1444        fs::create_dir_all(root.join("apps/site")).unwrap();
1445        fs::write(
1446            root.join("apps/site/package.json"),
1447            r#"{
1448  "name": "site",
1449  "version": "0.0.0",
1450  "dependencies": {
1451    "@rawkodeacademy/content-technologies": "workspace:*"
1452  }
1453}"#,
1454        )
1455        .unwrap();
1456
1457        let mut workspaces = HashMap::new();
1458        workspaces.insert(
1459            "bun".to_string(),
1460            WorkspaceConfig {
1461                enabled: true,
1462                package_manager: Some("bun".to_string()),
1463                root: None,
1464            },
1465        );
1466
1467        let config = ExecutorConfig {
1468            capture_output: true,
1469            project_root: root.join("apps/site"),
1470            workspaces: Some(workspaces),
1471            ..Default::default()
1472        };
1473        let executor = TaskExecutor::new(config);
1474
1475        let task = Task {
1476            command: "sh".to_string(),
1477            args: vec![
1478                "-c".to_string(),
1479                "find ../.. -maxdepth 4 -type d | sort".to_string(),
1480            ],
1481            shell: None,
1482            env: HashMap::new(),
1483            depends_on: vec![],
1484            inputs: vec!["package.json".to_string()],
1485            outputs: vec![],
1486            external_inputs: None,
1487            workspaces: vec!["bun".to_string()],
1488            description: None,
1489        };
1490
1491        let result = executor.execute_task("install", &task).await.unwrap();
1492        assert!(
1493            result.success,
1494            "command failed stdout='{}' stderr='{}'",
1495            result.stdout, result.stderr
1496        );
1497        assert!(
1498            result
1499                .stdout
1500                .split_whitespace()
1501                .any(|line| line.ends_with("packages/content-technologies")),
1502            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
1503            result.stdout,
1504            result.stderr
1505        );
1506    }
1507
1508    #[tokio::test]
1509    async fn test_execute_failing_task() {
1510        let config = ExecutorConfig {
1511            capture_output: true,
1512            ..Default::default()
1513        };
1514        let executor = TaskExecutor::new(config);
1515        let task = Task {
1516            command: "false".to_string(),
1517            args: vec![],
1518            shell: None,
1519            env: HashMap::new(),
1520            depends_on: vec![],
1521            inputs: vec![],
1522            outputs: vec![],
1523            external_inputs: None,
1524            workspaces: vec![],
1525            description: Some("Failing task".to_string()),
1526        };
1527        let result = executor.execute_task("test", &task).await.unwrap();
1528        assert!(!result.success);
1529        assert_eq!(result.exit_code, Some(1));
1530    }
1531
1532    #[tokio::test]
1533    async fn test_execute_sequential_group() {
1534        let config = ExecutorConfig {
1535            capture_output: true,
1536            ..Default::default()
1537        };
1538        let executor = TaskExecutor::new(config);
1539        let task1 = Task {
1540            command: "echo".to_string(),
1541            args: vec!["first".to_string()],
1542            shell: None,
1543            env: HashMap::new(),
1544            depends_on: vec![],
1545            inputs: vec![],
1546            outputs: vec![],
1547            external_inputs: None,
1548            workspaces: vec![],
1549            description: Some("First task".to_string()),
1550        };
1551        let task2 = Task {
1552            command: "echo".to_string(),
1553            args: vec!["second".to_string()],
1554            shell: None,
1555            env: HashMap::new(),
1556            depends_on: vec![],
1557            inputs: vec![],
1558            outputs: vec![],
1559            external_inputs: None,
1560            workspaces: vec![],
1561            description: Some("Second task".to_string()),
1562        };
1563        let group = TaskGroup::Sequential(vec![
1564            TaskDefinition::Single(Box::new(task1)),
1565            TaskDefinition::Single(Box::new(task2)),
1566        ]);
1567        let all_tasks = Tasks::new();
1568        let results = executor
1569            .execute_group("seq", &group, &all_tasks)
1570            .await
1571            .unwrap();
1572        assert_eq!(results.len(), 2);
1573        assert!(results[0].stdout.contains("first"));
1574        assert!(results[1].stdout.contains("second"));
1575    }
1576
1577    #[tokio::test]
1578    async fn test_command_injection_prevention() {
1579        let config = ExecutorConfig {
1580            capture_output: true,
1581            ..Default::default()
1582        };
1583        let executor = TaskExecutor::new(config);
1584        let malicious_task = Task {
1585            command: "echo".to_string(),
1586            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1587            shell: None,
1588            env: HashMap::new(),
1589            depends_on: vec![],
1590            inputs: vec![],
1591            outputs: vec![],
1592            external_inputs: None,
1593            workspaces: vec![],
1594            description: Some("Malicious task test".to_string()),
1595        };
1596        let result = executor
1597            .execute_task("malicious", &malicious_task)
1598            .await
1599            .unwrap();
1600        assert!(result.success);
1601        assert!(result.stdout.contains("hello ; rm -rf /"));
1602    }
1603
1604    #[tokio::test]
1605    async fn test_special_characters_in_args() {
1606        let config = ExecutorConfig {
1607            capture_output: true,
1608            ..Default::default()
1609        };
1610        let executor = TaskExecutor::new(config);
1611        let special_chars = vec![
1612            "$USER",
1613            "$(whoami)",
1614            "`whoami`",
1615            "&& echo hacked",
1616            "|| echo failed",
1617            "> /tmp/hack",
1618            "| cat",
1619        ];
1620        for special_arg in special_chars {
1621            let task = Task {
1622                command: "echo".to_string(),
1623                args: vec!["safe".to_string(), special_arg.to_string()],
1624                shell: None,
1625                env: HashMap::new(),
1626                depends_on: vec![],
1627                inputs: vec![],
1628                outputs: vec![],
1629                external_inputs: None,
1630                workspaces: vec![],
1631                description: Some("Special character test".to_string()),
1632            };
1633            let result = executor.execute_task("special", &task).await.unwrap();
1634            assert!(result.success);
1635            assert!(result.stdout.contains("safe"));
1636            assert!(result.stdout.contains(special_arg));
1637        }
1638    }
1639
1640    #[tokio::test]
1641    async fn test_environment_variable_safety() {
1642        let mut config = ExecutorConfig {
1643            capture_output: true,
1644            ..Default::default()
1645        };
1646        config
1647            .environment
1648            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1649        let executor = TaskExecutor::new(config);
1650        let task = Task {
1651            command: "printenv".to_string(),
1652            args: vec!["DANGEROUS_VAR".to_string()],
1653            shell: None,
1654            env: HashMap::new(),
1655            depends_on: vec![],
1656            inputs: vec![],
1657            outputs: vec![],
1658            external_inputs: None,
1659            workspaces: vec![],
1660            description: Some("Environment variable safety test".to_string()),
1661        };
1662        let result = executor.execute_task("env_test", &task).await.unwrap();
1663        assert!(result.success);
1664        assert!(result.stdout.contains("; rm -rf /"));
1665    }
1666
1667    #[tokio::test]
1668    async fn test_execute_graph_parallel_groups() {
1669        // two independent tasks -> can run in same parallel group
1670        let config = ExecutorConfig {
1671            capture_output: true,
1672            max_parallel: 2,
1673            ..Default::default()
1674        };
1675        let executor = TaskExecutor::new(config);
1676        let mut graph = TaskGraph::new();
1677
1678        let t1 = Task {
1679            command: "echo".into(),
1680            args: vec!["A".into()],
1681            shell: None,
1682            env: HashMap::new(),
1683            depends_on: vec![],
1684            inputs: vec![],
1685            outputs: vec![],
1686            external_inputs: None,
1687            workspaces: vec![],
1688            description: None,
1689        };
1690        let t2 = Task {
1691            command: "echo".into(),
1692            args: vec!["B".into()],
1693            shell: None,
1694            env: HashMap::new(),
1695            depends_on: vec![],
1696            inputs: vec![],
1697            outputs: vec![],
1698            external_inputs: None,
1699            workspaces: vec![],
1700            description: None,
1701        };
1702
1703        graph.add_task("t1", t1).unwrap();
1704        graph.add_task("t2", t2).unwrap();
1705        let results = executor.execute_graph(&graph).await.unwrap();
1706        assert_eq!(results.len(), 2);
1707        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1708        assert!(joined.contains("A") && joined.contains("B"));
1709    }
1710}