cuenv_core/tasks/
executor.rs

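//! Task executor for running tasks with environment support and hermetic, input-addressed execution.
//!
//! Features:
//! - Environment variable propagation
//! - Parallel and sequential execution
//! - Hermetic workdir populated from declared inputs (files/dirs/globs)
//! - Persistent task result cache keyed by inputs + command + env + cuenv version + platform
//!
//! Note: the hermetic sandbox path is currently disabled in `TaskExecutor::execute_task`;
//! tasks run directly in the workspace/project root until it is re-enabled.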
7
8use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19    BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20    NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21    PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22    YarnModernLockfileParser, detect_from_command, detect_package_managers,
23    materializer::{
24        Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25    },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
/// Outcome of running one task.
///
/// Carries the captured output streams together with the exit status so
/// callers can decide what to print and how to react to failures.
#[derive(Clone, Debug)]
pub struct TaskResult {
    /// Name of the task this result belongs to.
    pub name: String,
    /// Exit code reported for the task, when one is available.
    pub exit_code: Option<i32>,
    /// Text the task wrote to standard output.
    pub stdout: String,
    /// Text the task wrote to standard error.
    pub stderr: String,
    /// `true` when the task finished successfully.
    pub success: bool,
}
43
/// How many lines of stdout/stderr are included when a task failure is summarized.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47/// Task executor configuration
48#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50    /// Whether to capture output (vs streaming to stdout/stderr)
51    pub capture_output: bool,
52    /// Maximum parallel tasks (0 = unlimited)
53    pub max_parallel: usize,
54    /// Environment variables to propagate (resolved via policies)
55    pub environment: Environment,
56    /// Optional working directory override (for hermetic execution)
57    pub working_dir: Option<PathBuf>,
58    /// Project root for resolving inputs/outputs (env.cue root)
59    pub project_root: PathBuf,
60    /// Path to cue.mod root for resolving relative source paths
61    pub cue_module_root: Option<PathBuf>,
62    /// Optional: materialize cached outputs on cache hit
63    pub materialize_outputs: Option<PathBuf>,
64    /// Optional: cache directory override
65    pub cache_dir: Option<PathBuf>,
66    /// Optional: print cache path on hits/misses
67    pub show_cache_path: bool,
68    /// Global workspace configuration
69    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
70    /// Backend configuration
71    pub backend_config: Option<BackendConfig>,
72    /// CLI backend selection override
73    pub cli_backend: Option<String>,
74}
75
76impl Default for ExecutorConfig {
77    fn default() -> Self {
78        Self {
79            capture_output: false,
80            max_parallel: 0,
81            environment: Environment::new(),
82            working_dir: None,
83            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
84            cue_module_root: None,
85            materialize_outputs: None,
86            cache_dir: None,
87            show_cache_path: false,
88            workspaces: None,
89            backend_config: None,
90            cli_backend: None,
91        }
92    }
93}
94
95/// Task executor
96pub struct TaskExecutor {
97    config: ExecutorConfig,
98    backend: Arc<dyn TaskBackend>,
99}
100impl TaskExecutor {
101    /// Create a new executor with host backend only
102    pub fn new(config: ExecutorConfig) -> Self {
103        Self::with_dagger_factory(config, None)
104    }
105
106    /// Create a new executor with optional dagger backend support.
107    ///
108    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable dagger backend.
109    pub fn with_dagger_factory(
110        config: ExecutorConfig,
111        dagger_factory: Option<BackendFactory>,
112    ) -> Self {
113        let backend = create_backend_with_factory(
114            config.backend_config.as_ref(),
115            config.project_root.clone(),
116            config.cli_backend.as_deref(),
117            dagger_factory,
118        );
119        Self { config, backend }
120    }
121
122    /// Create a new executor with the given config but sharing the backend
123    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
124        Self { config, backend }
125    }
126
127    /// Execute a single task
128    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
129        // Delegate execution to the configured backend
130        // For now, we pass the project root as the execution context.
131        // Hermetic execution logic is currently bypassed in favor of direct execution
132        // as per the current codebase state (see original TODOs).
133        // The backend implementation handles the specific execution details.
134
135        // If using Dagger backend, we skip the workspace resolution and hermetic setup for now
136        // as it operates in a container.
137        if self.backend.name() == "dagger" {
138            return self
139                .backend
140                .execute(
141                    name,
142                    task,
143                    &self.config.environment,
144                    &self.config.project_root,
145                    self.config.capture_output,
146                )
147                .await;
148        }
149
150        // All tasks run directly in workspace/project root (hermetic sandbox disabled)
151        return self.execute_task_non_hermetic(name, task).await;
152
153        // TODO: Re-enable hermetic execution when sandbox properly preserves monorepo structure
154        #[allow(unreachable_code)]
155        {
156            // Resolve workspace dependencies if enabled
157            let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
158            let mut workspace_input_patterns = Vec::new();
159            let mut workspace_lockfile_hashes = BTreeMap::new();
160
161            for workspace_name in &task.workspaces {
162                if let Some(global_workspaces) = &self.config.workspaces {
163                    if let Some(ws_config) = global_workspaces.get(workspace_name) {
164                        if ws_config.enabled {
165                            let (ws, entries, paths, hash) = self
166                                .resolve_workspace(name, task, workspace_name, ws_config)
167                                .await?;
168                            workspace_ctxs.push((ws, entries));
169                            workspace_input_patterns.extend(paths);
170                            if let Some(h) = hash {
171                                workspace_lockfile_hashes.insert(workspace_name.clone(), h);
172                            }
173                        }
174                    } else {
175                        tracing::warn!(
176                            task = %name,
177                            workspace = %workspace_name,
178                            "Workspace not found in global configuration"
179                        );
180                    }
181                }
182            }
183
184            // Resolve inputs relative to the workspace root (if any) while keeping
185            // project-scoped inputs anchored to the original project path.
186            // Use the first workspace as the primary root if multiple are present,
187            // or fallback to project root. This might need refinement for multi-workspace tasks.
188            let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
189
190            let project_prefix = primary_workspace_root
191                .as_ref()
192                .and_then(|root| self.config.project_root.strip_prefix(root).ok())
193                .map(|p| p.to_path_buf());
194            let input_root = primary_workspace_root
195                .clone()
196                .unwrap_or_else(|| self.config.project_root.clone());
197
198            let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
199            let resolved_inputs = {
200                let _g = span_inputs.enter();
201                let resolver = InputResolver::new(&input_root);
202                let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
203                all_inputs.extend(workspace_input_patterns.iter().cloned());
204                resolver.resolve(&all_inputs)?
205            };
206            if task_trace_enabled() {
207                tracing::info!(
208                    task = %name,
209                    input_root = %input_root.display(),
210                    project_root = %self.config.project_root.display(),
211                    inputs_count = resolved_inputs.files.len(),
212                    workspace_inputs = workspace_input_patterns.len(),
213                    "Resolved task inputs"
214                );
215            }
216
217            // Build cache key envelope
218            let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
219            // Ensure deterministic order is already guaranteed by BTreeMap
220            let env_summary: BTreeMap<String, String> = self
221                .config
222                .environment
223                .vars
224                .iter()
225                .map(|(k, v)| (k.clone(), v.clone()))
226                .collect();
227            let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
228            let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
229            let shell_json = serde_json::to_value(&task.shell).ok();
230            let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
231                None
232            } else {
233                Some(workspace_lockfile_hashes)
234            };
235
236            let envelope = task_cache::CacheKeyEnvelope {
237                inputs: inputs_summary.clone(),
238                command: task.command.clone(),
239                args: task.args.clone(),
240                shell: shell_json,
241                env: env_summary.clone(),
242                cuenv_version: cuenv_version.clone(),
243                platform: platform.clone(),
244                workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
245                // Package hashes are implicitly included in inputs_summary because we added member paths to inputs
246                workspace_package_hashes: None,
247            };
248            let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
249
250            // Cache lookup
251            let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
252            let cache_hit = {
253                let _g = span_cache.enter();
254                task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
255            };
256
257            if let Some(hit) = cache_hit {
258                tracing::info!(
259                    task = %name,
260                    key = %cache_key,
261                    path = %hit.path.display(),
262                    "Task {} cache hit: {}. Skipping execution.",
263                    name,
264                    cache_key
265                );
266                if self.config.show_cache_path {
267                    tracing::info!(cache_path = %hit.path.display(), "Cache path");
268                }
269                if let Some(dest) = &self.config.materialize_outputs {
270                    let count = task_cache::materialize_outputs(
271                        &cache_key,
272                        dest,
273                        self.config.cache_dir.as_deref(),
274                    )?;
275                    tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
276                }
277                // On cache hit, surface cached logs so behavior matches a fresh
278                // execution from the caller's perspective. We return them in the
279                // TaskResult even if `capture_output` is false, allowing callers
280                // (like the CLI) to print cached logs explicitly.
281                let stdout_path = hit.path.join("logs").join("stdout.log");
282                let stderr_path = hit.path.join("logs").join("stderr.log");
283                let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
284                let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
285                // If logs are present, we can return immediately. If logs are
286                // missing (older cache format), fall through to execute to
287                // backfill logs for future hits.
288                if !(stdout.is_empty() && stderr.is_empty()) {
289                    if !self.config.capture_output {
290                        cuenv_events::emit_task_cache_hit!(name, cache_key);
291                        // Emit cached logs as output events
292                        if !stdout.is_empty() {
293                            cuenv_events::emit_task_output!(name, "stdout", stdout);
294                        }
295                        if !stderr.is_empty() {
296                            cuenv_events::emit_task_output!(name, "stderr", stderr);
297                        }
298                    }
299                    return Ok(TaskResult {
300                        name: name.to_string(),
301                        exit_code: Some(0),
302                        stdout,
303                        stderr,
304                        success: true,
305                    });
306                } else {
307                    tracing::info!(
308                        task = %name,
309                        key = %cache_key,
310                        "Cache entry lacks logs; executing to backfill logs"
311                    );
312                }
313            }
314
315            tracing::info!(
316                task = %name,
317                key = %cache_key,
318                "Task {} executing hermetically… key {}",
319                name,
320                cache_key
321            );
322
323            let hermetic_root = create_hermetic_dir(name, &cache_key)?;
324            if self.config.show_cache_path {
325                tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
326            }
327
328            // Seed working directory with inputs
329            let span_populate =
330                tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
331            {
332                let _g = span_populate.enter();
333                populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
334            }
335
336            // Materialize workspace artifacts (node_modules, target)
337            for (ws, entries) in workspace_ctxs {
338                self.materialize_workspace(&ws, &entries, &hermetic_root)
339                    .await?;
340            }
341
342            // Materialize outputs from inputsFrom tasks
343            if let Some(ref inputs_from) = task.inputs_from {
344                for task_output in inputs_from {
345                    let source_task = &task_output.task;
346                    let source_cache_key = task_cache::lookup_latest(
347                        &self.config.project_root,
348                        source_task,
349                        self.config.cache_dir.as_deref(),
350                    )
351                    .ok_or_else(|| {
352                        Error::configuration(format!(
353                            "Task '{}' depends on outputs from '{}' but no cached result found. \
354                         Ensure '{}' runs before this task (add it to dependsOn).",
355                            name, source_task, source_task
356                        ))
357                    })?;
358
359                    // Materialize outputs into the hermetic directory
360                    let materialized = task_cache::materialize_outputs(
361                        &source_cache_key,
362                        &hermetic_root,
363                        self.config.cache_dir.as_deref(),
364                    )?;
365                    tracing::info!(
366                        task = %name,
367                        source_task = %source_task,
368                        materialized = materialized,
369                        "Materialized outputs from dependent task"
370                    );
371                }
372            }
373
374            // Initial snapshot to detect undeclared writes
375            let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
376
377            // Build command - handle script mode vs command mode
378            let mut cmd = if let Some(script) = &task.script {
379                // Script mode: use shell to execute the script
380                let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
381                    (
382                        shell.command.clone().unwrap_or_else(|| "bash".to_string()),
383                        shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
384                    )
385                } else {
386                    // Default to bash for scripts
387                    ("bash".to_string(), "-c".to_string())
388                };
389
390                let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
391                let mut cmd = Command::new(&resolved_shell);
392                cmd.arg(&shell_flag);
393                cmd.arg(script);
394                cmd
395            } else {
396                // Command mode: existing behavior
397                // Resolve command path using the environment's PATH.
398                // This is necessary because when spawning a process, the OS looks up
399                // the executable in the current process's PATH, not the environment
400                // that will be set on the child process.
401                let resolved_command = self.config.environment.resolve_command(&task.command);
402
403                if let Some(shell) = &task.shell {
404                    if shell.command.is_some() && shell.flag.is_some() {
405                        let shell_command = shell.command.as_ref().unwrap();
406                        let shell_flag = shell.flag.as_ref().unwrap();
407                        // Resolve shell command too
408                        let resolved_shell = self.config.environment.resolve_command(shell_command);
409                        let mut cmd = Command::new(&resolved_shell);
410                        cmd.arg(shell_flag);
411                        if task.args.is_empty() {
412                            cmd.arg(&resolved_command);
413                        } else {
414                            let full_command = if task.command.is_empty() {
415                                task.args.join(" ")
416                            } else {
417                                format!("{} {}", resolved_command, task.args.join(" "))
418                            };
419                            cmd.arg(full_command);
420                        }
421                        cmd
422                    } else {
423                        let mut cmd = Command::new(&resolved_command);
424                        for arg in &task.args {
425                            cmd.arg(arg);
426                        }
427                        cmd
428                    }
429                } else {
430                    let mut cmd = Command::new(&resolved_command);
431                    for arg in &task.args {
432                        cmd.arg(arg);
433                    }
434                    cmd
435                }
436            };
437
438            let workdir = if let Some(dir) = &self.config.working_dir {
439                dir.clone()
440            } else if let Some(prefix) = project_prefix.as_ref() {
441                hermetic_root.join(prefix)
442            } else {
443                hermetic_root.clone()
444            };
445            std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
446                source: e,
447                path: Some(workdir.clone().into()),
448                operation: "create_dir_all".into(),
449            })?;
450            cmd.current_dir(&workdir);
451            // Set environment variables (resolved + system), set CWD
452            let env_vars = self.config.environment.merge_with_system();
453            if task_trace_enabled() {
454                tracing::info!(
455                    task = %name,
456                    hermetic_root = %hermetic_root.display(),
457                    workdir = %workdir.display(),
458                    command = %task.command,
459                    args = ?task.args,
460                    env_count = env_vars.len(),
461                    "Launching task command"
462                );
463            }
464            for (k, v) in env_vars {
465                cmd.env(k, v);
466            }
467
468            // Configure output: always capture to ensure consistent behavior on
469            // cache hits and allow callers to decide what to print.
470            cmd.stdout(Stdio::piped());
471            cmd.stderr(Stdio::piped());
472
473            let stream_logs = !self.config.capture_output;
474            if stream_logs {
475                let cmd_str = if task.command.is_empty() {
476                    task.args.join(" ")
477                } else {
478                    format!("{} {}", task.command, task.args.join(" "))
479                };
480                cuenv_events::emit_task_started!(name, cmd_str, true);
481            }
482
483            let start = std::time::Instant::now();
484            let mut child = cmd.spawn().map_err(|e| {
485                Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
486            })?;
487
488            let stdout_handle = child.stdout.take();
489            let stderr_handle = child.stderr.take();
490
491            let stdout_task = async move {
492                if let Some(mut stdout) = stdout_handle {
493                    let mut output = Vec::new();
494                    let mut buf = [0u8; 4096];
495                    use std::io::Write;
496                    use tokio::io::AsyncReadExt;
497
498                    loop {
499                        match stdout.read(&mut buf).await {
500                            Ok(0) => break, // EOF
501                            Ok(n) => {
502                                let chunk = &buf[0..n];
503                                if stream_logs {
504                                    let mut handle = std::io::stdout().lock();
505                                    let _ = handle.write_all(chunk);
506                                    let _ = handle.flush();
507                                }
508                                output.extend_from_slice(chunk);
509                            }
510                            Err(_) => break,
511                        }
512                    }
513                    String::from_utf8_lossy(&output).to_string()
514                } else {
515                    String::new()
516                }
517            };
518
519            let stderr_task = async move {
520                if let Some(mut stderr) = stderr_handle {
521                    let mut output = Vec::new();
522                    let mut buf = [0u8; 4096];
523                    use std::io::Write;
524                    use tokio::io::AsyncReadExt;
525
526                    loop {
527                        match stderr.read(&mut buf).await {
528                            Ok(0) => break, // EOF
529                            Ok(n) => {
530                                let chunk = &buf[0..n];
531                                if stream_logs {
532                                    let mut handle = std::io::stderr().lock();
533                                    let _ = handle.write_all(chunk);
534                                    let _ = handle.flush();
535                                }
536                                output.extend_from_slice(chunk);
537                            }
538                            Err(_) => break,
539                        }
540                    }
541                    String::from_utf8_lossy(&output).to_string()
542                } else {
543                    String::new()
544                }
545            };
546
547            let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
548
549            let status = child.wait().await.map_err(|e| {
550                Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
551            })?;
552            let duration = start.elapsed();
553
554            let exit_code = status.code().unwrap_or(1);
555            let success = status.success();
556            if !success {
557                tracing::warn!(task = %name, exit = exit_code, "Task failed");
558                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
559                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
560            } else {
561                tracing::info!(task = %name, "Task completed successfully");
562            }
563
564            // Collect declared outputs and warn on undeclared writes
565            let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
566                task.outputs
567                    .iter()
568                    .map(|o| prefix.join(o).to_string_lossy().to_string())
569                    .collect()
570            } else {
571                task.outputs.clone()
572            };
573            let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
574            let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
575            let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
576
577            // Stage outputs into a temp dir for cache persistence
578            let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
579            if outputs_stage.exists() {
580                let _ = std::fs::remove_dir_all(&outputs_stage);
581            }
582            std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
583                source: e,
584                path: Some(outputs_stage.clone().into()),
585                operation: "create_dir_all".into(),
586            })?;
587
588            for rel in &outputs {
589                let rel_for_project = project_prefix
590                    .as_ref()
591                    .and_then(|prefix| rel.strip_prefix(prefix).ok())
592                    .unwrap_or(rel)
593                    .to_path_buf();
594                let src = hermetic_root.join(rel);
595                // Intentionally avoid `let`-chains here to preserve readability
596                // and to align with review guidance; allow clippy's collapsible-if.
597                #[allow(clippy::collapsible_if)]
598                if let Ok(meta) = std::fs::metadata(&src) {
599                    if meta.is_file() {
600                        let dst = outputs_stage.join(&rel_for_project);
601                        if let Some(parent) = dst.parent() {
602                            std::fs::create_dir_all(parent).map_err(|e| Error::Io {
603                                source: e,
604                                path: Some(parent.into()),
605                                operation: "create_dir_all".into(),
606                            })?;
607                        }
608                        std::fs::copy(&src, &dst).map_err(|e| Error::Io {
609                            source: e,
610                            path: Some(dst.into()),
611                            operation: "copy".into(),
612                        })?;
613                        let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
614                        output_index.push(task_cache::OutputIndexEntry {
615                            rel_path: rel_for_project.to_string_lossy().to_string(),
616                            size: meta.len(),
617                            sha256: sha,
618                        });
619                    }
620                }
621            }
622
623            // Detect undeclared writes
624            let mut warned = false;
625            for entry in walkdir::WalkDir::new(&hermetic_root)
626                .into_iter()
627                .filter_map(|e| e.ok())
628            {
629                let p = entry.path();
630                if p.is_dir() {
631                    continue;
632                }
633                let rel = match p.strip_prefix(&hermetic_root) {
634                    Ok(r) => r.to_path_buf(),
635                    Err(_) => continue,
636                };
637                let rel_str = rel.to_string_lossy().to_string();
638                let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
639                let initial = initial_hashes.get(&rel_str);
640                let changed = match initial {
641                    None => true,
642                    Some(prev) => prev != &sha,
643                };
644                if changed && !outputs_set.contains(&rel) {
645                    if !warned {
646                        tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
647                        warned = true;
648                    }
649                    tracing::debug!(path = %rel_str, "Undeclared write");
650                }
651            }
652
653            // Persist cache entry on success
654            if success {
655                let meta = task_cache::TaskResultMeta {
656                    task_name: name.to_string(),
657                    command: task.command.clone(),
658                    args: task.args.clone(),
659                    env_summary,
660                    inputs_summary: inputs_summary.clone(),
661                    created_at: Utc::now(),
662                    cuenv_version,
663                    platform,
664                    duration_ms: duration.as_millis(),
665                    exit_code,
666                    cache_key_envelope: envelope_json.clone(),
667                    output_index,
668                };
669                let logs = task_cache::TaskLogs {
670                    // Persist logs regardless of capture setting so cache hits can
671                    // reproduce output faithfully.
672                    stdout: Some(stdout.clone()),
673                    stderr: Some(stderr.clone()),
674                };
675                let cache_span = tracing::info_span!("cache.save", key = %cache_key);
676                {
677                    let _g = cache_span.enter();
678                    task_cache::save_result(
679                        &cache_key,
680                        &meta,
681                        &outputs_stage,
682                        &hermetic_root,
683                        logs,
684                        self.config.cache_dir.as_deref(),
685                    )?;
686                }
687
688                // Record this task's cache key as the latest for this project
689                task_cache::record_latest(
690                    &self.config.project_root,
691                    name,
692                    &cache_key,
693                    self.config.cache_dir.as_deref(),
694                )?;
695
696                // Outputs remain in cache only - dependent tasks use inputsFrom to consume them
697            } else {
698                // Optionally persist logs in a failure/ subdir: not implemented for brevity
699            }
700
701            Ok(TaskResult {
702                name: name.to_string(),
703                exit_code: Some(exit_code),
704                stdout,
705                stderr,
706                success,
707            })
708        }
709    }
710
711    /// Execute a task non-hermetically (directly in workspace/project root)
712    ///
713    /// Used for tasks like `bun install` that need to write to the real filesystem.
714    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
715        // Check if this is an unresolved TaskRef (should have been resolved before execution)
716        if task.is_task_ref() && task.project_root.is_none() {
717            return Err(Error::configuration(format!(
718                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
719                 This usually means:\n\
720                 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
721                 - The referenced task '{}' doesn't exist in that project\n\
722                 - There was an error loading the referenced project's env.cue\n\
723                 Run with RUST_LOG=debug for more details.",
724                name,
725                task.task_ref.as_deref().unwrap_or("unknown"),
726                task.task_ref
727                    .as_deref()
728                    .and_then(|r| r.split(':').next_back())
729                    .unwrap_or("unknown")
730            )));
731        }
732
733        // Determine working directory (in priority order):
734        // 1. Explicit directory field on task (relative to cue.mod root)
735        // 2. TaskRef project_root (from resolution)
736        // 3. Source file directory (from _source metadata)
737        // 4. Install tasks (hermetic: false with workspaces) run from workspace root
738        // 5. Default to project root
739        let workdir = if let Some(ref dir) = task.directory {
740            // Explicit directory override: resolve relative to cue.mod root or project root
741            self.config
742                .cue_module_root
743                .as_ref()
744                .unwrap_or(&self.config.project_root)
745                .join(dir)
746        } else if let Some(ref project_root) = task.project_root {
747            // TaskRef tasks run in their original project directory
748            project_root.clone()
749        } else if let Some(ref source) = task.source {
750            // Default: run in the directory of the source file
751            if let Some(dir) = source.directory() {
752                self.config
753                    .cue_module_root
754                    .as_ref()
755                    .unwrap_or(&self.config.project_root)
756                    .join(dir)
757            } else {
758                // Source is at root (e.g., "env.cue"), use cue_module_root if available
759                // This ensures tasks defined in root env.cue run from module root,
760                // even when invoked from a subdirectory
761                self.config
762                    .cue_module_root
763                    .clone()
764                    .unwrap_or_else(|| self.config.project_root.clone())
765            }
766        } else if !task.hermetic && !task.workspaces.is_empty() {
767            // Find workspace root for install tasks
768            let workspace_name = &task.workspaces[0];
769            let manager = match workspace_name.as_str() {
770                "bun" => PackageManager::Bun,
771                "npm" => PackageManager::Npm,
772                "pnpm" => PackageManager::Pnpm,
773                "yarn" => PackageManager::YarnModern,
774                "cargo" => PackageManager::Cargo,
775                _ => PackageManager::Npm, // fallback
776            };
777            find_workspace_root(manager, &self.config.project_root)
778        } else {
779            self.config.project_root.clone()
780        };
781
782        tracing::info!(
783            task = %name,
784            workdir = %workdir.display(),
785            hermetic = false,
786            "Executing non-hermetic task"
787        );
788
789        // Emit command being run
790        let cmd_str = if let Some(script) = &task.script {
791            format!("[script: {} bytes]", script.len())
792        } else if task.command.is_empty() {
793            task.args.join(" ")
794        } else {
795            format!("{} {}", task.command, task.args.join(" "))
796        };
797        if !self.config.capture_output {
798            cuenv_events::emit_task_started!(name, cmd_str, false);
799        }
800
801        // Build command - handle script mode vs command mode
802        let mut cmd = if let Some(script) = &task.script {
803            // Script mode: use shell to execute the script
804            let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
805                (
806                    shell.command.clone().unwrap_or_else(|| "bash".to_string()),
807                    shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
808                )
809            } else {
810                // Default to bash for scripts
811                ("bash".to_string(), "-c".to_string())
812            };
813
814            let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
815            let mut cmd = Command::new(&resolved_shell);
816            cmd.arg(&shell_flag);
817            cmd.arg(script);
818            cmd
819        } else {
820            // Command mode: existing behavior
821            let resolved_command = self.config.environment.resolve_command(&task.command);
822
823            if let Some(shell) = &task.shell {
824                if shell.command.is_some() && shell.flag.is_some() {
825                    let shell_command = shell.command.as_ref().unwrap();
826                    let shell_flag = shell.flag.as_ref().unwrap();
827                    let resolved_shell = self.config.environment.resolve_command(shell_command);
828                    let mut cmd = Command::new(&resolved_shell);
829                    cmd.arg(shell_flag);
830                    if task.args.is_empty() {
831                        cmd.arg(&resolved_command);
832                    } else {
833                        let full_command = if task.command.is_empty() {
834                            task.args.join(" ")
835                        } else {
836                            format!("{} {}", resolved_command, task.args.join(" "))
837                        };
838                        cmd.arg(full_command);
839                    }
840                    cmd
841                } else {
842                    let mut cmd = Command::new(&resolved_command);
843                    for arg in &task.args {
844                        cmd.arg(arg);
845                    }
846                    cmd
847                }
848            } else {
849                let mut cmd = Command::new(&resolved_command);
850                for arg in &task.args {
851                    cmd.arg(arg);
852                }
853                cmd
854            }
855        };
856
857        // Set working directory and environment
858        cmd.current_dir(&workdir);
859        let env_vars = self.config.environment.merge_with_system();
860        for (k, v) in &env_vars {
861            cmd.env(k, v);
862        }
863
864        // Execute - always capture output for consistent behavior
865        // If not in capture mode, stream output to terminal in real-time
866        if self.config.capture_output {
867            let output = cmd
868                .stdout(Stdio::piped())
869                .stderr(Stdio::piped())
870                .output()
871                .await
872                .map_err(|e| Error::Io {
873                    source: e,
874                    path: None,
875                    operation: format!("spawn task {}", name),
876                })?;
877
878            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
879            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
880            let exit_code = output.status.code().unwrap_or(-1);
881            let success = output.status.success();
882
883            if !success {
884                tracing::warn!(task = %name, exit = exit_code, "Task failed");
885                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
886                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
887            }
888
889            Ok(TaskResult {
890                name: name.to_string(),
891                exit_code: Some(exit_code),
892                stdout,
893                stderr,
894                success,
895            })
896        } else {
897            // Stream output directly to terminal (interactive mode)
898            let status = cmd
899                .stdout(Stdio::inherit())
900                .stderr(Stdio::inherit())
901                .status()
902                .await
903                .map_err(|e| Error::Io {
904                    source: e,
905                    path: None,
906                    operation: format!("spawn task {}", name),
907                })?;
908
909            let exit_code = status.code().unwrap_or(-1);
910            let success = status.success();
911
912            if !success {
913                tracing::warn!(task = %name, exit = exit_code, "Task failed");
914            }
915
916            Ok(TaskResult {
917                name: name.to_string(),
918                exit_code: Some(exit_code),
919                stdout: String::new(), // Output went to terminal
920                stderr: String::new(),
921                success,
922            })
923        }
924    }
925
926    /// Execute a task definition (single task or group)
927    #[async_recursion]
928    pub async fn execute_definition(
929        &self,
930        name: &str,
931        definition: &TaskDefinition,
932        all_tasks: &Tasks,
933    ) -> Result<Vec<TaskResult>> {
934        match definition {
935            TaskDefinition::Single(task) => {
936                let result = self.execute_task(name, task.as_ref()).await?;
937                Ok(vec![result])
938            }
939            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
940        }
941    }
942
943    async fn execute_group(
944        &self,
945        prefix: &str,
946        group: &TaskGroup,
947        all_tasks: &Tasks,
948    ) -> Result<Vec<TaskResult>> {
949        match group {
950            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
951            TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
952        }
953    }
954
955    async fn execute_sequential(
956        &self,
957        prefix: &str,
958        tasks: &[TaskDefinition],
959        all_tasks: &Tasks,
960    ) -> Result<Vec<TaskResult>> {
961        if !self.config.capture_output {
962            cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
963        }
964        let mut results = Vec::new();
965        for (i, task_def) in tasks.iter().enumerate() {
966            let task_name = format!("{}[{}]", prefix, i);
967            let task_results = self
968                .execute_definition(&task_name, task_def, all_tasks)
969                .await?;
970            for result in &task_results {
971                if !result.success {
972                    let message = format!(
973                        "Sequential task group '{prefix}' halted.\n\n{}",
974                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
975                    );
976                    return Err(Error::configuration(message));
977                }
978            }
979            results.extend(task_results);
980        }
981        Ok(results)
982    }
983
984    async fn execute_parallel(
985        &self,
986        prefix: &str,
987        group: &ParallelGroup,
988        all_tasks: &Tasks,
989    ) -> Result<Vec<TaskResult>> {
990        // Check for "default" task to override parallel execution
991        if let Some(default_task) = group.tasks.get("default") {
992            if !self.config.capture_output {
993                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
994            }
995            // Execute only the default task, using the group prefix directly
996            // since "default" is implicit when invoking the group name
997            let task_name = format!("{}.default", prefix);
998            return self
999                .execute_definition(&task_name, default_task, all_tasks)
1000                .await;
1001        }
1002
1003        if !self.config.capture_output {
1004            cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
1005        }
1006        let mut join_set = JoinSet::new();
1007        let all_tasks = Arc::new(all_tasks.clone());
1008        let mut all_results = Vec::new();
1009        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
1010            if let Some(failed) = results.iter().find(|r| !r.success) {
1011                let message = format!(
1012                    "Parallel task group '{prefix}' halted.\n\n{}",
1013                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
1014                );
1015                return Err(Error::configuration(message));
1016            }
1017            all_results.extend(results);
1018            Ok(())
1019        };
1020        for (name, task_def) in &group.tasks {
1021            let task_name = format!("{}.{}", prefix, name);
1022            let task_def = task_def.clone();
1023            let all_tasks = Arc::clone(&all_tasks);
1024            let executor = self.clone_with_config();
1025            join_set.spawn(async move {
1026                executor
1027                    .execute_definition(&task_name, &task_def, &all_tasks)
1028                    .await
1029            });
1030            if self.config.max_parallel > 0
1031                && join_set.len() >= self.config.max_parallel
1032                && let Some(result) = join_set.join_next().await
1033            {
1034                match result {
1035                    Ok(Ok(results)) => merge_results(results)?,
1036                    Ok(Err(e)) => return Err(e),
1037                    Err(e) => {
1038                        return Err(Error::configuration(format!(
1039                            "Task execution panicked: {}",
1040                            e
1041                        )));
1042                    }
1043                }
1044            }
1045        }
1046        while let Some(result) = join_set.join_next().await {
1047            match result {
1048                Ok(Ok(results)) => merge_results(results)?,
1049                Ok(Err(e)) => return Err(e),
1050                Err(e) => {
1051                    return Err(Error::configuration(format!(
1052                        "Task execution panicked: {}",
1053                        e
1054                    )));
1055                }
1056            }
1057        }
1058        Ok(all_results)
1059    }
1060
1061    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
1062        let parallel_groups = graph.get_parallel_groups()?;
1063        let mut all_results = Vec::new();
1064
1065        // IMPORTANT:
1066        // Each parallel group represents a dependency "level". We must not start tasks from the
1067        // next group until *all* tasks from the current group have completed successfully.
1068        //
1069        // The previous implementation pipelined groups (starting the next group as soon as all
1070        // tasks from the current group were spawned), which allowed dependent tasks to run before
1071        // their dependencies finished (especially visible with long-running tasks like dev servers).
1072        for mut group in parallel_groups {
1073            let mut join_set = JoinSet::new();
1074
1075            while !group.is_empty() || !join_set.is_empty() {
1076                // Fill the concurrency window for this group
1077                while let Some(node) = group.pop() {
1078                    let task = node.task.clone();
1079                    let name = node.name.clone();
1080                    let executor = self.clone_with_config();
1081                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
1082
1083                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
1084                        break;
1085                    }
1086                }
1087
1088                if let Some(result) = join_set.join_next().await {
1089                    match result {
1090                        Ok(Ok(task_result)) => {
1091                            if !task_result.success {
1092                                join_set.abort_all();
1093                                let message = format!(
1094                                    "Task graph execution halted.\n\n{}",
1095                                    summarize_task_failure(
1096                                        &task_result,
1097                                        TASK_FAILURE_SNIPPET_LINES,
1098                                    )
1099                                );
1100                                return Err(Error::configuration(message));
1101                            }
1102                            all_results.push(task_result);
1103                        }
1104                        Ok(Err(e)) => {
1105                            join_set.abort_all();
1106                            return Err(e);
1107                        }
1108                        Err(e) => {
1109                            join_set.abort_all();
1110                            return Err(Error::configuration(format!(
1111                                "Task execution panicked: {}",
1112                                e
1113                            )));
1114                        }
1115                    }
1116                }
1117            }
1118        }
1119
1120        Ok(all_results)
1121    }
1122
1123    async fn resolve_workspace(
1124        &self,
1125        _task_name: &str,
1126        task: &Task,
1127        workspace_name: &str,
1128        config: &WorkspaceConfig,
1129    ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1130        let root = self.config.project_root.clone();
1131        let task_label = _task_name.to_string();
1132        let command = task.command.clone();
1133        let config_pm = config.package_manager.clone();
1134        let config_root = config.root.clone();
1135        // WorkspaceConfig doesn't support 'packages' filtering yet, so assume full workspace deps for now,
1136        // or imply it from context. If we want package filtering, we need to add it to WorkspaceConfig
1137        // or assume all deps.
1138        // However, the previous logic used `packages` from WorkspaceInputs.
1139        // For now, let's assume traversing all dependencies if not specified.
1140        // But wait, `WorkspaceConfig` in schema doesn't have `packages`.
1141        // So we probably default to "all relevant" or auto-infer from current directory if we are inside a package.
1142        let mut packages = Vec::new();
1143
1144        // If we are in a subdirectory that matches a workspace member, we might want to infer that package.
1145        // The previous logic did that if `packages` was empty.
1146
1147        let lockfile_override: Option<String> = None; // WorkspaceConfig doesn't have lockfile override yet.
1148        // If we need it, we should add it to WorkspaceConfig. For now assume standard discovery.
1149
1150        let mut traverse_workspace_deps = true;
1151
1152        // Use spawn_blocking for heavy lifting
1153        let workspace_name_owned = workspace_name.to_string();
1154        tokio::task::spawn_blocking(move || {
1155            let override_for_detection = lockfile_override.as_ref().map(|lock| {
1156                let candidate = PathBuf::from(lock);
1157                if candidate.is_absolute() {
1158                    candidate
1159                } else {
1160                    root.join(lock)
1161                }
1162            });
1163
1164            // 1. Detect Package Manager
1165            // Priority: 1. explicit config, 2. workspace name (if it matches a PM), 3. auto-detect
1166            let manager = if let Some(pm_str) = config_pm {
1167                match pm_str.as_str() {
1168                    "npm" => PackageManager::Npm,
1169                    "bun" => PackageManager::Bun,
1170                    "pnpm" => PackageManager::Pnpm,
1171                    "yarn" => PackageManager::YarnModern,
1172                    "yarn-classic" => PackageManager::YarnClassic,
1173                    "cargo" => PackageManager::Cargo,
1174                    _ => {
1175                        return Err(Error::configuration(format!(
1176                            "Unknown package manager: {}",
1177                            pm_str
1178                        )));
1179                    }
1180                }
1181            } else {
1182                // Try to match workspace name to package manager
1183                match workspace_name_owned.as_str() {
1184                    "npm" => PackageManager::Npm,
1185                    "bun" => PackageManager::Bun,
1186                    "pnpm" => PackageManager::Pnpm,
1187                    "yarn" => PackageManager::YarnModern,
1188                    "cargo" => PackageManager::Cargo,
1189                    _ => {
1190                         // Auto-detect if name doesn't match
1191                        // Priority: command hint -> lockfiles
1192                        let hint = detect_from_command(&command);
1193                        let detected = match detect_package_managers(&root) {
1194                            Ok(list) => list,
1195                            Err(e) => {
1196                                if override_for_detection.is_some() {
1197                                    Vec::new()
1198                                } else {
1199                                    return Err(Error::configuration(format!(
1200                                        "Failed to detect package managers: {}",
1201                                        e
1202                                    )));
1203                                }
1204                            }
1205                        };
1206
1207                        if let Some(h) = hint {
1208                            if detected.contains(&h) {
1209                                h
1210                            } else if !detected.is_empty() {
1211                                // Prefer detected files
1212                                detected[0]
1213                            } else if let Some(ref override_path) = override_for_detection {
1214                                infer_manager_from_lockfile(override_path).ok_or_else(|| {
1215                                    Error::configuration(
1216                                        "Unable to infer package manager from lockfile override",
1217                                    )
1218                                })?
1219                            } else {
1220                                return Err(Error::configuration(
1221                                    format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1222                                ));
1223                            }
1224                        } else if !detected.is_empty() {
1225                            detected[0]
1226                        } else if let Some(ref override_path) = override_for_detection {
1227                            infer_manager_from_lockfile(override_path).ok_or_else(|| {
1228                                Error::configuration(
1229                                    "Unable to infer package manager from lockfile override",
1230                                )
1231                            })?
1232                        } else {
1233                            return Err(Error::configuration(
1234                                "Could not detect package manager for workspace resolution",
1235                            ));
1236                        }
1237                    }
1238                }
1239            };
1240
1241            // Resolve the actual workspace root by walking up until we find a
1242            // directory that declares a workspace for this package manager.
1243            // If config.root is set, use that.
1244            let workspace_root = if let Some(root_override) = &config_root {
1245                root.join(root_override)
1246            } else {
1247                find_workspace_root(manager, &root)
1248            };
1249
1250            if task_trace_enabled() {
1251                tracing::info!(
1252                    task = %task_label,
1253                    manager = %manager,
1254                    project_root = %root.display(),
1255                    workspace_root = %workspace_root.display(),
1256                    "Resolved workspace root for package manager"
1257                );
1258            }
1259
1260            let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1261                let candidate = PathBuf::from(lock);
1262                if candidate.is_absolute() {
1263                    candidate
1264                } else {
1265                    workspace_root.join(lock)
1266                }
1267            });
1268
1269            // 2. Discover Workspace
1270            let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1271                PackageManager::Npm
1272                | PackageManager::Bun
1273                | PackageManager::YarnClassic
1274                | PackageManager::YarnModern
1275                | PackageManager::Deno => Box::new(PackageJsonDiscovery),
1276                PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1277                PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1278            };
1279
1280            let workspace = discovery.discover(&workspace_root).map_err(|e| {
1281                Error::configuration(format!("Failed to discover workspace: {}", e))
1282            })?;
1283
1284            // 3. Parse Lockfile
1285            let lockfile_path = if let Some(path) = lockfile_override_path {
1286                if !path.exists() {
1287                    return Err(Error::configuration(format!(
1288                        "Workspace lockfile override does not exist: {}",
1289                        path.display()
1290                    )));
1291                }
1292                path
1293            } else {
1294                workspace.lockfile.clone().ok_or_else(|| {
1295                    Error::configuration("Workspace resolution requires a lockfile")
1296                })?
1297            };
1298
1299            let parser: Box<dyn LockfileParser> = match manager {
1300                PackageManager::Npm => Box::new(NpmLockfileParser),
1301                PackageManager::Bun => Box::new(BunLockfileParser),
1302                PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1303                PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1304                PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1305                PackageManager::Cargo => Box::new(CargoLockfileParser),
1306                PackageManager::Deno => Box::new(NpmLockfileParser), // Deno uses a similar JSON lockfile format
1307            };
1308
1309            let entries = parser
1310                .parse(&lockfile_path)
1311                .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1312            if task_trace_enabled() {
1313                tracing::info!(
1314                    task = %task_label,
1315                    lockfile = %lockfile_path.display(),
1316                    members = entries.len(),
1317                    "Parsed workspace lockfile"
1318                );
1319            }
1320
1321            // Compute lockfile hash
1322            let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1323
1324            // Infer packages when none explicitly provided by scoping to the
1325            // current workspace member. (We intentionally avoid pulling all
1326            // transitive deps here to keep hashing fast for large monorepos.)
1327            if packages.is_empty() {
1328                let current_member = workspace
1329                    .members
1330                    .iter()
1331                    .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1332                if let Some(member) = current_member {
1333                    let inferred = vec![member.name.clone()];
1334                    if task_trace_enabled() {
1335                        tracing::info!(
1336                            task = %task_label,
1337                            inferred_packages = ?inferred,
1338                            "Inferred workspace packages from current project"
1339                        );
1340                    }
1341                    packages = inferred;
1342                    traverse_workspace_deps = true;
1343                }
1344            }
1345
1346            // 4. Collect Inputs
1347            let mut member_paths = Vec::new();
1348
1349            // Always include workspace configuration files
1350            member_paths.push(manager.workspace_config_name().to_string());
1351            if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1352                member_paths.push(rel.to_string_lossy().to_string());
1353            } else {
1354                member_paths.push(lockfile_path.to_string_lossy().to_string());
1355            }
1356
1357            if packages.is_empty() {
1358                for member in &workspace.members {
1359                    let manifest_rel = member
1360                        .path
1361                        .join(manager.workspace_config_name());
1362                    member_paths.push(manifest_rel.to_string_lossy().to_string());
1363                }
1364            } else {
1365                let mut to_visit: Vec<String> = packages.clone();
1366                let mut visited = HashSet::new();
1367
1368                while let Some(pkg_name) = to_visit.pop() {
1369                    if visited.contains(&pkg_name) {
1370                        continue;
1371                    }
1372                    visited.insert(pkg_name.clone());
1373
1374                    if let Some(member) = workspace.find_member(&pkg_name) {
1375                        let manifest_rel = member
1376                            .path
1377                            .join(manager.workspace_config_name());
1378                        member_paths.push(manifest_rel.to_string_lossy().to_string());
1379
1380                        // Add dependencies when explicitly requested
1381                        if traverse_workspace_deps {
1382                            let mut dependency_candidates: HashSet<String> = HashSet::new();
1383
1384                            if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1385                                for dep in &entry.dependencies {
1386                                    if entries
1387                                        .iter()
1388                                        .any(|e| e.name == dep.name && e.is_workspace_member)
1389                                    {
1390                                        dependency_candidates.insert(dep.name.clone());
1391                                    }
1392                                }
1393                            }
1394
1395                            for dep_name in &member.dependencies {
1396                                if workspace.find_member(dep_name).is_some() {
1397                                    dependency_candidates.insert(dep_name.clone());
1398                                }
1399                            }
1400
1401                            for dep_name in dependency_candidates {
1402                                to_visit.push(dep_name);
1403                            }
1404                        }
1405                    }
1406                }
1407            }
1408
1409            if task_trace_enabled() {
1410                tracing::info!(
1411                    task = %task_label,
1412                    members = ?member_paths,
1413                    "Workspace input member paths selected"
1414                );
1415            }
1416
1417            Ok((workspace, entries, member_paths, Some(hash)))
1418        })
1419        .await
1420        .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1421    }
1422
1423    async fn materialize_workspace(
1424        &self,
1425        workspace: &Workspace,
1426        entries: &[LockfileEntry],
1427        target_dir: &Path,
1428    ) -> Result<()> {
1429        // Dispatch to appropriate materializer
1430        let materializer: Box<dyn Materializer> = match workspace.manager {
1431            PackageManager::Npm
1432            | PackageManager::Bun
1433            | PackageManager::Pnpm
1434            | PackageManager::YarnClassic
1435            | PackageManager::YarnModern
1436            | PackageManager::Deno => Box::new(NodeModulesMaterializer),
1437            PackageManager::Cargo => Box::new(CargoMaterializer),
1438        };
1439
1440        materializer
1441            .materialize(workspace, entries, target_dir)
1442            .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1443    }
1444
1445    fn clone_with_config(&self) -> Self {
1446        // Share the backend across clones to preserve container cache for Dagger chaining
1447        Self::with_shared_backend(self.config.clone(), self.backend.clone())
1448    }
1449}
1450
1451fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1452    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1453
1454    loop {
1455        let is_root = match manager {
1456            PackageManager::Npm
1457            | PackageManager::Bun
1458            | PackageManager::YarnClassic
1459            | PackageManager::YarnModern => package_json_has_workspaces(&current),
1460            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1461            PackageManager::Cargo => cargo_toml_has_workspace(&current),
1462            PackageManager::Deno => deno_json_has_workspace(&current),
1463        };
1464
1465        if is_root {
1466            return current;
1467        }
1468
1469        if let Some(parent) = current.parent() {
1470            current = parent.to_path_buf();
1471        } else {
1472            return start.to_path_buf();
1473        }
1474    }
1475}
1476
1477fn package_json_has_workspaces(dir: &Path) -> bool {
1478    let path = dir.join("package.json");
1479    let content = std::fs::read_to_string(&path);
1480    let Ok(json) = content.and_then(|s| {
1481        serde_json::from_str::<serde_json::Value>(&s)
1482            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1483    }) else {
1484        return false;
1485    };
1486
1487    match json.get("workspaces") {
1488        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1489        Some(serde_json::Value::Object(map)) => map
1490            .get("packages")
1491            .and_then(|packages| packages.as_array())
1492            .map(|arr| !arr.is_empty())
1493            .unwrap_or(false),
1494        _ => false,
1495    }
1496}
1497
/// Reports whether `dir/Cargo.toml` declares a `workspace` table.
///
/// The manifest is scanned line by line for a TOML table header whose first
/// key segment is `workspace`, i.e. `[workspace]` or a sub-table such as
/// `[workspace.package]` / `[workspace.dependencies]` (which implicitly
/// defines the `workspace` table). Text after `#` is treated as a comment, so
/// a commented-out `# [workspace]` does not count, and array-of-tables
/// headers (`[[...]]`) are ignored.
///
/// Returns `false` when the manifest is missing or unreadable.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content
        // A UTF-8 BOM is not whitespace, so strip it or it would hide a header on line one.
        .trim_start_matches('\u{feff}')
        .lines()
        .any(|line| {
            // Keep only the part before any trailing comment.
            let code = line.split('#').next().unwrap_or_default().trim();
            code.strip_prefix('[')
                .and_then(|rest| rest.strip_suffix(']'))
                .and_then(|header| header.split('.').next())
                .is_some_and(|table| table.trim() == "workspace")
        })
}
1506
1507fn deno_json_has_workspace(dir: &Path) -> bool {
1508    let path = dir.join("deno.json");
1509    let content = std::fs::read_to_string(&path);
1510    let Ok(json) = content.and_then(|s| {
1511        serde_json::from_str::<serde_json::Value>(&s)
1512            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1513    }) else {
1514        return false;
1515    };
1516
1517    // Deno uses "workspace" (not "workspaces") for workspace configuration
1518    match json.get("workspace") {
1519        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1520        Some(serde_json::Value::Object(_)) => true,
1521        _ => false,
1522    }
1523}
1524
/// Reports whether task tracing was requested via `CUENV_TRACE_TASKS`.
///
/// The variable is read once, on first call, and the answer is cached for
/// later calls. After trimming whitespace and lowercasing, the values `1`,
/// `true`, `yes`, and `on` enable tracing; anything else (including an unset
/// variable) disables it.
fn task_trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();

    *ENABLED.get_or_init(|| {
        let raw = std::env::var("CUENV_TRACE_TASKS").unwrap_or_default();
        let flag = raw.trim().to_ascii_lowercase();
        ["1", "true", "yes", "on"].contains(&flag.as_str())
    })
}
1538
1539/// Build a compact, user-friendly summary for a failed task, including the
1540/// exit code and the tail of stdout/stderr to help with diagnostics.
1541pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1542    let exit_code = result
1543        .exit_code
1544        .map(|c| c.to_string())
1545        .unwrap_or_else(|| "unknown".to_string());
1546
1547    let mut sections = Vec::new();
1548    sections.push(format!(
1549        "Task '{}' failed with exit code {}.",
1550        result.name, exit_code
1551    ));
1552
1553    let output = format_failure_streams(result, max_output_lines);
1554    if output.is_empty() {
1555        sections.push(
1556            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1557                .to_string(),
1558        );
1559    } else {
1560        sections.push(output);
1561    }
1562
1563    sections.join("\n\n")
1564}
1565
1566fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1567    let mut streams = Vec::new();
1568
1569    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1570        streams.push(stdout);
1571    }
1572
1573    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1574        streams.push(stderr);
1575    }
1576
1577    streams.join("\n\n")
1578}
1579
/// Formats the tail of one output stream under a `label` header.
///
/// Trailing whitespace is ignored; a stream that is empty afterwards yields
/// `None`. Otherwise the last `max_output_lines` lines are kept. When lines
/// were dropped, the header states how many are shown out of the total.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    // Number of leading lines that fall outside the requested tail.
    let skipped = total.saturating_sub(max_output_lines);
    let snippet = trimmed
        .lines()
        .skip(skipped)
        .collect::<Vec<_>>()
        .join("\n");

    let header = if skipped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{snippet}"))
}
1599
1600fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1601    match path.file_name().and_then(|n| n.to_str())? {
1602        "package-lock.json" => Some(PackageManager::Npm),
1603        "bun.lock" => Some(PackageManager::Bun),
1604        "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1605        "yarn.lock" => Some(PackageManager::YarnModern),
1606        "Cargo.lock" => Some(PackageManager::Cargo),
1607        _ => None,
1608    }
1609}
1610
1611fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1612    // Use OS temp dir; name scoped by task and cache key prefix.
1613    // IMPORTANT: Ensure the workdir is clean on every run to preserve hermeticity.
1614    let sanitized_task = task_name
1615        .chars()
1616        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1617        .collect::<String>();
1618
1619    let base = std::env::temp_dir().join(format!(
1620        "cuenv-work-{}-{}",
1621        sanitized_task,
1622        &key[..12.min(key.len())]
1623    ));
1624
1625    // If a directory from a previous run exists, remove it before reuse.
1626    // This avoids contamination from artifacts left by failed runs where no cache was saved.
1627    if base.exists()
1628        && let Err(e) = std::fs::remove_dir_all(&base)
1629    {
1630        // If we cannot remove the previous directory (e.g. in-use on Windows),
1631        // fall back to a unique, fresh directory to maintain hermetic execution.
1632        let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1633        let fallback = std::env::temp_dir().join(format!(
1634            "cuenv-work-{}-{}-{}",
1635            sanitized_task,
1636            &key[..12.min(key.len())],
1637            ts
1638        ));
1639        tracing::warn!(
1640            previous = %base.display(),
1641            fallback = %fallback.display(),
1642            error = %e,
1643            "Failed to clean previous hermetic workdir; using fresh fallback directory"
1644        );
1645        std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1646            source: e,
1647            path: Some(fallback.clone().into()),
1648            operation: "create_dir_all".into(),
1649        })?;
1650        return Ok(fallback);
1651    }
1652
1653    std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1654        source: e,
1655        path: Some(base.clone().into()),
1656        operation: "create_dir_all".into(),
1657    })?;
1658    Ok(base)
1659}
1660
1661/// Execute an arbitrary command with the cuenv environment
1662pub async fn execute_command(
1663    command: &str,
1664    args: &[String],
1665    environment: &Environment,
1666) -> Result<i32> {
1667    tracing::info!("Executing command: {} {:?}", command, args);
1668    let mut cmd = Command::new(command);
1669    cmd.args(args);
1670    let env_vars = environment.merge_with_system();
1671    for (key, value) in env_vars {
1672        cmd.env(key, value);
1673    }
1674    cmd.stdout(Stdio::inherit());
1675    cmd.stderr(Stdio::inherit());
1676    cmd.stdin(Stdio::inherit());
1677    let status = cmd.status().await.map_err(|e| {
1678        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1679    })?;
1680    Ok(status.code().unwrap_or(1))
1681}
1682
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::Input;
    use std::fs;
    use tempfile::TempDir;

    /// Executor configuration that captures task output; every other field
    /// keeps its default value.
    fn capturing_config() -> ExecutorConfig {
        ExecutorConfig {
            capture_output: true,
            ..Default::default()
        }
    }

    /// Task without a description that runs `command` with `args`.
    fn plain_task(command: &str, args: &[&str]) -> Task {
        Task {
            command: command.to_string(),
            args: args.iter().map(|arg| arg.to_string()).collect(),
            ..Default::default()
        }
    }

    /// Task that runs `command` with `args` and carries `description`.
    fn described_task(command: &str, args: &[&str], description: &str) -> Task {
        Task {
            description: Some(description.to_string()),
            ..plain_task(command, args)
        }
    }

    /// The default configuration does not capture output, has no parallelism
    /// limit set, and carries an empty environment.
    #[tokio::test]
    async fn test_executor_config_default() {
        let defaults = ExecutorConfig::default();
        assert!(!defaults.capture_output);
        assert_eq!(defaults.max_parallel, 0);
        assert!(defaults.environment.is_empty());
    }

    /// `TaskResult` fields hold exactly what they were constructed with.
    #[tokio::test]
    async fn test_task_result() {
        let result = TaskResult {
            name: String::from("test"),
            exit_code: Some(0),
            stdout: String::from("output"),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(result.name, "test");
        assert_eq!(result.exit_code, Some(0));
        assert!(result.success);
        assert_eq!(result.stdout, "output");
    }

    /// A plain `echo` succeeds and its output is captured.
    #[tokio::test]
    async fn test_execute_simple_task() {
        let executor = TaskExecutor::new(capturing_config());
        let task = described_task("echo", &["hello"], "Hello task");

        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello"));
    }

    /// Variables set on the executor's environment reach the child process.
    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = capturing_config();
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);
        let task = described_task("printenv", &["TEST_VAR"], "Print env task");

        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("test_value"));
    }

    /// Running a task from a nested project still brings in members that live
    /// at the workspace root.
    #[tokio::test]
    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
        const ROOT_MANIFEST: &str = r#"{
  "name": "root-app",
  "version": "0.0.0",
  "workspaces": ["packages/*", "apps/*"],
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#;
        // The apps/site entry intentionally has no "name": some lockfiles only
        // record member paths, and dependency discovery must still work then.
        const BUN_LOCK: &str = r#"{
  "lockfileVersion": 1,
  "workspaces": {
    "": {
      "name": "root-app",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    },
    "packages/content-technologies": {
      "name": "@rawkodeacademy/content-technologies",
      "version": "0.0.1"
    },
    "apps/site": {
      "version": "0.0.0",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    }
  },
  "packages": {}
}"#;
        const LIBRARY_MANIFEST: &str = r#"{
  "name": "@rawkodeacademy/content-technologies",
  "version": "0.0.1"
}"#;
        const SITE_MANIFEST: &str = r#"{
  "name": "site",
  "version": "0.0.0",
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#;

        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        // Writes `contents` to `relative` under the temp root, creating parent dirs.
        let write_fixture = |relative: &str, contents: &str| {
            let path = root.join(relative);
            fs::create_dir_all(path.parent().unwrap()).unwrap();
            fs::write(path, contents).unwrap();
        };

        // Workspace root: manifest with workspaces plus the lockfile.
        write_fixture("package.json", ROOT_MANIFEST);
        write_fixture("bun.lock", BUN_LOCK);
        // Workspace members.
        write_fixture("packages/content-technologies/package.json", LIBRARY_MANIFEST);
        write_fixture("apps/site/package.json", SITE_MANIFEST);

        let workspaces = HashMap::from([(
            "bun".to_string(),
            WorkspaceConfig {
                enabled: true,
                package_manager: Some("bun".to_string()),
                root: None,
                hooks: None,
            },
        )]);

        let executor = TaskExecutor::new(ExecutorConfig {
            project_root: root.join("apps/site"),
            workspaces: Some(workspaces),
            ..capturing_config()
        });

        let task = Task {
            inputs: vec![Input::Path("package.json".to_string())],
            workspaces: vec!["bun".to_string()],
            ..plain_task("sh", &["-c", "find ../.. -maxdepth 4 -type d | sort"])
        };

        let result = executor.execute_task("install", &task).await.unwrap();
        assert!(
            result.success,
            "command failed stdout='{}' stderr='{}'",
            result.stdout, result.stderr
        );

        let lists_library_member = result
            .stdout
            .split_whitespace()
            .any(|line| line.ends_with("packages/content-technologies"));
        assert!(
            lists_library_member,
            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
            result.stdout,
            result.stderr
        );
    }

    /// A command that exits non-zero is reported as a failed result, not an error.
    #[tokio::test]
    async fn test_execute_failing_task() {
        let executor = TaskExecutor::new(capturing_config());
        let task = described_task("false", &[], "Failing task");

        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
    }

    /// A sequential group yields one result per task, in declaration order.
    #[tokio::test]
    async fn test_execute_sequential_group() {
        let executor = TaskExecutor::new(capturing_config());
        let first = described_task("echo", &["first"], "First task");
        let second = described_task("echo", &["second"], "Second task");
        let group = TaskGroup::Sequential(vec![
            TaskDefinition::Single(Box::new(first)),
            TaskDefinition::Single(Box::new(second)),
        ]);
        let all_tasks = Tasks::new();

        let results = executor
            .execute_group("seq", &group, &all_tasks)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].stdout.contains("first"));
        assert!(results[1].stdout.contains("second"));
    }

    /// Shell metacharacters in an argument are passed through literally.
    #[tokio::test]
    async fn test_command_injection_prevention() {
        let executor = TaskExecutor::new(capturing_config());
        let malicious_task =
            described_task("echo", &["hello", "; rm -rf /"], "Malicious task test");

        let result = executor
            .execute_task("malicious", &malicious_task)
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("hello ; rm -rf /"));
    }

    /// Expansion, substitution, chaining, redirection, and pipe syntax in
    /// arguments are echoed back verbatim.
    #[tokio::test]
    async fn test_special_characters_in_args() {
        let executor = TaskExecutor::new(capturing_config());

        for special_arg in [
            "$USER",
            "$(whoami)",
            "`whoami`",
            "&& echo hacked",
            "|| echo failed",
            "> /tmp/hack",
            "| cat",
        ] {
            let task = described_task("echo", &["safe", special_arg], "Special character test");
            let result = executor.execute_task("special", &task).await.unwrap();
            assert!(result.success);
            assert!(result.stdout.contains("safe"));
            assert!(result.stdout.contains(special_arg));
        }
    }

    /// Environment values containing shell syntax arrive in the child unchanged.
    #[tokio::test]
    async fn test_environment_variable_safety() {
        let mut config = capturing_config();
        config
            .environment
            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
        let executor = TaskExecutor::new(config);
        let task = described_task(
            "printenv",
            &["DANGEROUS_VAR"],
            "Environment variable safety test",
        );

        let result = executor.execute_task("env_test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("; rm -rf /"));
    }

    /// Two tasks with no dependencies between them both run and both report.
    #[tokio::test]
    async fn test_execute_graph_parallel_groups() {
        let executor = TaskExecutor::new(ExecutorConfig {
            max_parallel: 2,
            ..capturing_config()
        });

        // Independent tasks, so they are eligible for the same parallel group.
        let mut graph = TaskGraph::new();
        graph.add_task("t1", plain_task("echo", &["A"])).unwrap();
        graph.add_task("t2", plain_task("echo", &["B"])).unwrap();

        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);

        let joined: String = results.iter().map(|r| r.stdout.as_str()).collect();
        assert!(joined.contains("A") && joined.contains("B"));
    }

    /// A dependent task only runs after its dependency has finished: the
    /// consumer reads a file that the dependency writes after a short delay.
    #[tokio::test]
    async fn test_execute_graph_respects_dependency_levels() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        let executor = TaskExecutor::new(ExecutorConfig {
            max_parallel: 2,
            project_root: root.to_path_buf(),
            ..capturing_config()
        });

        let producer = plain_task("sh", &["-c", "sleep 0.2 && echo ok > marker.txt"]);
        let reader = Task {
            depends_on: vec!["dep".into()],
            ..plain_task("sh", &["-c", "cat marker.txt"])
        };

        let mut tasks = Tasks::new();
        tasks
            .tasks
            .insert("dep".into(), TaskDefinition::Single(Box::new(producer)));
        tasks
            .tasks
            .insert("consumer".into(), TaskDefinition::Single(Box::new(reader)));

        let mut graph = TaskGraph::new();
        graph.build_for_task("consumer", &tasks).unwrap();

        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);

        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
        assert!(consumer.success);
        assert!(consumer.stdout.contains("ok"));
    }
}