cuenv_core/tasks/
executor.rs

1//! Task executor for running tasks with environment support and hermetic, input-addressed execution
2//!
3//! - Environment variable propagation
4//! - Parallel and sequential execution
5//! - Hermetic workdir populated from declared inputs (files/dirs/globs)
6//! - Persistent task result cache keyed by inputs + command + env + cuenv version + platform
7
8use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19    BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20    NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21    PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22    YarnModernLockfileParser, detect_from_command, detect_package_managers,
23    materializer::{
24        Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25    },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
/// Outcome of running a single task.
///
/// Values are plain data, so they can be cloned and compared for equality
/// (handy when asserting on results in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskResult {
    /// Name of the task this result belongs to.
    pub name: String,
    /// Exit code reported for the task, or `None` when no code is available.
    pub exit_code: Option<i32>,
    /// Text the task wrote to standard output.
    pub stdout: String,
    /// Text the task wrote to standard error.
    pub stderr: String,
    /// Whether the task is considered to have succeeded.
    pub success: bool,
}
43
/// How many lines of stdout/stderr are included when a task failure is summarized.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47/// Task executor configuration
48#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50    /// Whether to capture output (vs streaming to stdout/stderr)
51    pub capture_output: bool,
52    /// Maximum parallel tasks (0 = unlimited)
53    pub max_parallel: usize,
54    /// Environment variables to propagate (resolved via policies)
55    pub environment: Environment,
56    /// Optional working directory override (for hermetic execution)
57    pub working_dir: Option<PathBuf>,
58    /// Project root for resolving inputs/outputs (env.cue root)
59    pub project_root: PathBuf,
60    /// Path to cue.mod root for resolving relative source paths
61    pub cue_module_root: Option<PathBuf>,
62    /// Optional: materialize cached outputs on cache hit
63    pub materialize_outputs: Option<PathBuf>,
64    /// Optional: cache directory override
65    pub cache_dir: Option<PathBuf>,
66    /// Optional: print cache path on hits/misses
67    pub show_cache_path: bool,
68    /// Global workspace configuration
69    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
70    /// Backend configuration
71    pub backend_config: Option<BackendConfig>,
72    /// CLI backend selection override
73    pub cli_backend: Option<String>,
74}
75
76impl Default for ExecutorConfig {
77    fn default() -> Self {
78        Self {
79            capture_output: false,
80            max_parallel: 0,
81            environment: Environment::new(),
82            working_dir: None,
83            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
84            cue_module_root: None,
85            materialize_outputs: None,
86            cache_dir: None,
87            show_cache_path: false,
88            workspaces: None,
89            backend_config: None,
90            cli_backend: None,
91        }
92    }
93}
94
95/// Task executor
96pub struct TaskExecutor {
97    config: ExecutorConfig,
98    backend: Arc<dyn TaskBackend>,
99}
100impl TaskExecutor {
101    /// Create a new executor with host backend only
102    pub fn new(config: ExecutorConfig) -> Self {
103        Self::with_dagger_factory(config, None)
104    }
105
106    /// Create a new executor with optional dagger backend support.
107    ///
108    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable dagger backend.
109    pub fn with_dagger_factory(
110        config: ExecutorConfig,
111        dagger_factory: Option<BackendFactory>,
112    ) -> Self {
113        let backend = create_backend_with_factory(
114            config.backend_config.as_ref(),
115            config.project_root.clone(),
116            config.cli_backend.as_deref(),
117            dagger_factory,
118        );
119        Self { config, backend }
120    }
121
122    /// Create a new executor with the given config but sharing the backend
123    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
124        Self { config, backend }
125    }
126
127    /// Execute a single task
    ///
    /// When the configured backend reports the name `"dagger"`, the task is
    /// handed to the backend together with the configured environment, the
    /// project root, and the `capture_output` flag. Every other task is run
    /// through `execute_task_non_hermetic`.
    ///
    /// The hermetic, cache-backed implementation that follows the early
    /// `return` is unreachable and is kept only until the TODO below is
    /// resolved.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend or `execute_task_non_hermetic`
    /// produces.
128    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
129        // Delegate execution to the configured backend
130        // For now, we pass the project root as the execution context.
131        // Hermetic execution logic is currently bypassed in favor of direct execution
132        // as per the current codebase state (see original TODOs).
133        // The backend implementation handles the specific execution details.
134
135        // If using Dagger backend, we skip the workspace resolution and hermetic setup for now
136        // as it operates in a container.
137        if self.backend.name() == "dagger" {
138            return self
139                .backend
140                .execute(
141                    name,
142                    task,
143                    &self.config.environment,
144                    &self.config.project_root,
145                    self.config.capture_output,
146                )
147                .await;
148        }
149
150        // All tasks run directly in workspace/project root (hermetic sandbox disabled)
151        return self.execute_task_non_hermetic(name, task).await;
152
153        // TODO: Re-enable hermetic execution when sandbox properly preserves monorepo structure
        // NOTE: the `return` above always exits first, so the block below never runs.
154        #[allow(unreachable_code)]
155        {
156            // Resolve workspace dependencies if enabled
157            let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
158            let mut workspace_input_patterns = Vec::new();
159            let mut workspace_lockfile_hashes = BTreeMap::new();
160
161            for workspace_name in &task.workspaces {
162                if let Some(global_workspaces) = &self.config.workspaces {
163                    if let Some(ws_config) = global_workspaces.get(workspace_name) {
164                        if ws_config.enabled {
165                            let (ws, entries, paths, hash) = self
166                                .resolve_workspace(name, task, workspace_name, ws_config)
167                                .await?;
168                            workspace_ctxs.push((ws, entries));
169                            workspace_input_patterns.extend(paths);
170                            if let Some(h) = hash {
171                                workspace_lockfile_hashes.insert(workspace_name.clone(), h);
172                            }
173                        }
174                    } else {
175                        tracing::warn!(
176                            task = %name,
177                            workspace = %workspace_name,
178                            "Workspace not found in global configuration"
179                        );
180                    }
181                }
182            }
183
184            // Resolve inputs relative to the workspace root (if any) while keeping
185            // project-scoped inputs anchored to the original project path.
186            // Use the first workspace as the primary root if multiple are present,
187            // or fallback to project root. This might need refinement for multi-workspace tasks.
188            let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
189
            // Path of the project relative to the primary workspace root; `None` when there
            // is no workspace or the project root does not lie under it.
190            let project_prefix = primary_workspace_root
191                .as_ref()
192                .and_then(|root| self.config.project_root.strip_prefix(root).ok())
193                .map(|p| p.to_path_buf());
194            let input_root = primary_workspace_root
195                .clone()
196                .unwrap_or_else(|| self.config.project_root.clone());
197
198            let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
199            let resolved_inputs = {
200                let _g = span_inputs.enter();
201                let resolver = InputResolver::new(&input_root);
202                let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
203                all_inputs.extend(workspace_input_patterns.iter().cloned());
204                resolver.resolve(&all_inputs)?
205            };
206            if task_trace_enabled() {
207                tracing::info!(
208                    task = %name,
209                    input_root = %input_root.display(),
210                    project_root = %self.config.project_root.display(),
211                    inputs_count = resolved_inputs.files.len(),
212                    workspace_inputs = workspace_input_patterns.len(),
213                    "Resolved task inputs"
214                );
215            }
216
217            // Build cache key envelope
218            let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
219            // Ensure deterministic order is already guaranteed by BTreeMap
220            let env_summary: BTreeMap<String, String> = self
221                .config
222                .environment
223                .vars
224                .iter()
225                .map(|(k, v)| (k.clone(), v.clone()))
226                .collect();
227            let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
228            let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
229            let shell_json = serde_json::to_value(&task.shell).ok();
230            let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
231                None
232            } else {
233                Some(workspace_lockfile_hashes)
234            };
235
236            let envelope = task_cache::CacheKeyEnvelope {
237                inputs: inputs_summary.clone(),
238                command: task.command.clone(),
239                args: task.args.clone(),
240                shell: shell_json,
241                env: env_summary.clone(),
242                cuenv_version: cuenv_version.clone(),
243                platform: platform.clone(),
244                workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
245                // Package hashes are implicitly included in inputs_summary because we added member paths to inputs
246                workspace_package_hashes: None,
247            };
248            let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
249
250            // Cache lookup
251            let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
252            let cache_hit = {
253                let _g = span_cache.enter();
254                task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
255            };
256
257            if let Some(hit) = cache_hit {
258                tracing::info!(
259                    task = %name,
260                    key = %cache_key,
261                    path = %hit.path.display(),
262                    "Task {} cache hit: {}. Skipping execution.",
263                    name,
264                    cache_key
265                );
266                if self.config.show_cache_path {
267                    tracing::info!(cache_path = %hit.path.display(), "Cache path");
268                }
269                if let Some(dest) = &self.config.materialize_outputs {
270                    let count = task_cache::materialize_outputs(
271                        &cache_key,
272                        dest,
273                        self.config.cache_dir.as_deref(),
274                    )?;
275                    tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
276                }
277                // On cache hit, surface cached logs so behavior matches a fresh
278                // execution from the caller's perspective. We return them in the
279                // TaskResult even if `capture_output` is false, allowing callers
280                // (like the CLI) to print cached logs explicitly.
281                let stdout_path = hit.path.join("logs").join("stdout.log");
282                let stderr_path = hit.path.join("logs").join("stderr.log");
                // A log file that cannot be read is treated the same as an empty one.
283                let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
284                let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
285                // If logs are present, we can return immediately. If logs are
286                // missing (older cache format), fall through to execute to
287                // backfill logs for future hits.
288                if !(stdout.is_empty() && stderr.is_empty()) {
289                    if !self.config.capture_output {
290                        cuenv_events::emit_task_cache_hit!(name, cache_key);
291                        // Emit cached logs as output events
292                        if !stdout.is_empty() {
293                            cuenv_events::emit_task_output!(name, "stdout", stdout);
294                        }
295                        if !stderr.is_empty() {
296                            cuenv_events::emit_task_output!(name, "stderr", stderr);
297                        }
298                    }
299                    return Ok(TaskResult {
300                        name: name.to_string(),
301                        exit_code: Some(0),
302                        stdout,
303                        stderr,
304                        success: true,
305                    });
306                } else {
307                    tracing::info!(
308                        task = %name,
309                        key = %cache_key,
310                        "Cache entry lacks logs; executing to backfill logs"
311                    );
312                }
313            }
314
315            tracing::info!(
316                task = %name,
317                key = %cache_key,
318                "Task {} executing hermetically… key {}",
319                name,
320                cache_key
321            );
322
323            let hermetic_root = create_hermetic_dir(name, &cache_key)?;
324            if self.config.show_cache_path {
325                tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
326            }
327
328            // Seed working directory with inputs
329            let span_populate =
330                tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
331            {
332                let _g = span_populate.enter();
333                populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
334            }
335
336            // Materialize workspace artifacts (node_modules, target)
337            for (ws, entries) in workspace_ctxs {
338                self.materialize_workspace(&ws, &entries, &hermetic_root)
339                    .await?;
340            }
341
342            // Materialize outputs from inputsFrom tasks
343            if let Some(ref inputs_from) = task.inputs_from {
344                for task_output in inputs_from {
345                    let source_task = &task_output.task;
346                    let source_cache_key = task_cache::lookup_latest(
347                        &self.config.project_root,
348                        source_task,
349                        self.config.cache_dir.as_deref(),
350                    )
351                    .ok_or_else(|| {
352                        Error::configuration(format!(
353                            "Task '{}' depends on outputs from '{}' but no cached result found. \
354                         Ensure '{}' runs before this task (add it to dependsOn).",
355                            name, source_task, source_task
356                        ))
357                    })?;
358
359                    // Materialize outputs into the hermetic directory
360                    let materialized = task_cache::materialize_outputs(
361                        &source_cache_key,
362                        &hermetic_root,
363                        self.config.cache_dir.as_deref(),
364                    )?;
365                    tracing::info!(
366                        task = %name,
367                        source_task = %source_task,
368                        materialized = materialized,
369                        "Materialized outputs from dependent task"
370                    );
371                }
372            }
373
374            // Initial snapshot to detect undeclared writes
375            let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
376
377            // Build command - handle script mode vs command mode
378            let mut cmd = if let Some(script) = &task.script {
379                // Script mode: use shell to execute the script
380                let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
381                    (
382                        shell.command.clone().unwrap_or_else(|| "bash".to_string()),
383                        shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
384                    )
385                } else {
386                    // Default to bash for scripts
387                    ("bash".to_string(), "-c".to_string())
388                };
389
390                let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
391                let mut cmd = Command::new(&resolved_shell);
392                cmd.arg(&shell_flag);
393                cmd.arg(script);
394                cmd
395            } else {
396                // Command mode: existing behavior
397                // Resolve command path using the environment's PATH.
398                // This is necessary because when spawning a process, the OS looks up
399                // the executable in the current process's PATH, not the environment
400                // that will be set on the child process.
401                let resolved_command = self.config.environment.resolve_command(&task.command);
402
403                if let Some(shell) = &task.shell {
                    // The shell wrapper is used only when both its command and its flag are set;
                    // otherwise the command is spawned directly with its arguments.
404                    if shell.command.is_some() && shell.flag.is_some() {
405                        let shell_command = shell.command.as_ref().unwrap();
406                        let shell_flag = shell.flag.as_ref().unwrap();
407                        // Resolve shell command too
408                        let resolved_shell = self.config.environment.resolve_command(shell_command);
409                        let mut cmd = Command::new(&resolved_shell);
410                        cmd.arg(shell_flag);
411                        if task.args.is_empty() {
412                            cmd.arg(&resolved_command);
413                        } else {
                            // Command and arguments are joined with single spaces into one string
                            // for the shell; no quoting or escaping is applied here.
414                            let full_command = if task.command.is_empty() {
415                                task.args.join(" ")
416                            } else {
417                                format!("{} {}", resolved_command, task.args.join(" "))
418                            };
419                            cmd.arg(full_command);
420                        }
421                        cmd
422                    } else {
423                        let mut cmd = Command::new(&resolved_command);
424                        for arg in &task.args {
425                            cmd.arg(arg);
426                        }
427                        cmd
428                    }
429                } else {
430                    let mut cmd = Command::new(&resolved_command);
431                    for arg in &task.args {
432                        cmd.arg(arg);
433                    }
434                    cmd
435                }
436            };
437
            // Working directory precedence: explicit override, then the project's location
            // inside the hermetic root, then the hermetic root itself.
438            let workdir = if let Some(dir) = &self.config.working_dir {
439                dir.clone()
440            } else if let Some(prefix) = project_prefix.as_ref() {
441                hermetic_root.join(prefix)
442            } else {
443                hermetic_root.clone()
444            };
445            std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
446                source: e,
447                path: Some(workdir.clone().into()),
448                operation: "create_dir_all".into(),
449            })?;
450            cmd.current_dir(&workdir);
451            // Set environment variables (resolved + system), set CWD
452            let env_vars = self.config.environment.merge_with_system();
453            if task_trace_enabled() {
454                tracing::info!(
455                    task = %name,
456                    hermetic_root = %hermetic_root.display(),
457                    workdir = %workdir.display(),
458                    command = %task.command,
459                    args = ?task.args,
460                    env_count = env_vars.len(),
461                    "Launching task command"
462                );
463            }
464            for (k, v) in env_vars {
465                cmd.env(k, v);
466            }
467
468            // Configure output: always capture to ensure consistent behavior on
469            // cache hits and allow callers to decide what to print.
470            cmd.stdout(Stdio::piped());
471            cmd.stderr(Stdio::piped());
472
473            let stream_logs = !self.config.capture_output;
474            if stream_logs {
475                let cmd_str = if task.command.is_empty() {
476                    task.args.join(" ")
477                } else {
478                    format!("{} {}", task.command, task.args.join(" "))
479                };
480                cuenv_events::emit_task_started!(name, cmd_str, true);
481            }
482
483            let start = std::time::Instant::now();
484            let mut child = cmd.spawn().map_err(|e| {
485                Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
486            })?;
487
488            let stdout_handle = child.stdout.take();
489            let stderr_handle = child.stderr.take();
490
            // Reads the child's stdout in 4 KiB chunks, echoing each chunk to this process's
            // stdout when streaming is on, and returns everything read as lossy UTF-8.
491            let stdout_task = async move {
492                if let Some(mut stdout) = stdout_handle {
493                    let mut output = Vec::new();
494                    let mut buf = [0u8; 4096];
495                    use std::io::Write;
496                    use tokio::io::AsyncReadExt;
497
498                    loop {
499                        match stdout.read(&mut buf).await {
500                            Ok(0) => break, // EOF
501                            Ok(n) => {
502                                let chunk = &buf[0..n];
503                                if stream_logs {
504                                    let mut handle = std::io::stdout().lock();
505                                    let _ = handle.write_all(chunk);
506                                    let _ = handle.flush();
507                                }
508                                output.extend_from_slice(chunk);
509                            }
                            // A read error ends the loop; output gathered so far is kept.
510                            Err(_) => break,
511                        }
512                    }
513                    String::from_utf8_lossy(&output).to_string()
514                } else {
515                    String::new()
516                }
517            };
518
            // Same as `stdout_task`, but for the child's stderr.
519            let stderr_task = async move {
520                if let Some(mut stderr) = stderr_handle {
521                    let mut output = Vec::new();
522                    let mut buf = [0u8; 4096];
523                    use std::io::Write;
524                    use tokio::io::AsyncReadExt;
525
526                    loop {
527                        match stderr.read(&mut buf).await {
528                            Ok(0) => break, // EOF
529                            Ok(n) => {
530                                let chunk = &buf[0..n];
531                                if stream_logs {
532                                    let mut handle = std::io::stderr().lock();
533                                    let _ = handle.write_all(chunk);
534                                    let _ = handle.flush();
535                                }
536                                output.extend_from_slice(chunk);
537                            }
                            // A read error ends the loop; output gathered so far is kept.
538                            Err(_) => break,
539                        }
540                    }
541                    String::from_utf8_lossy(&output).to_string()
542                } else {
543                    String::new()
544                }
545            };
546
            // Both pipes are read to completion concurrently before waiting on the child.
547            let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
548
549            let status = child.wait().await.map_err(|e| {
550                Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
551            })?;
552            let duration = start.elapsed();
553
            // `code()` is `None` when no exit code is available (on Unix, when the process
            // was terminated by a signal); that case is reported as exit code 1.
554            let exit_code = status.code().unwrap_or(1);
555            let success = status.success();
556            if !success {
557                tracing::warn!(task = %name, exit = exit_code, "Task failed");
558                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
559                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
560            } else {
561                tracing::info!(task = %name, "Task completed successfully");
562            }
563
564            // Collect declared outputs and warn on undeclared writes
565            let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
566                task.outputs
567                    .iter()
568                    .map(|o| prefix.join(o).to_string_lossy().to_string())
569                    .collect()
570            } else {
571                task.outputs.clone()
572            };
573            let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
574            let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
575            let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
576
577            // Stage outputs into a temp dir for cache persistence
578            let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
            // Clear any staging directory left at the same path; removal errors are ignored.
579            if outputs_stage.exists() {
580                let _ = std::fs::remove_dir_all(&outputs_stage);
581            }
582            std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
583                source: e,
584                path: Some(outputs_stage.clone().into()),
585                operation: "create_dir_all".into(),
586            })?;
587
588            for rel in &outputs {
                // Staged paths are project-relative: the workspace prefix is removed when present.
589                let rel_for_project = project_prefix
590                    .as_ref()
591                    .and_then(|prefix| rel.strip_prefix(prefix).ok())
592                    .unwrap_or(rel)
593                    .to_path_buf();
594                let src = hermetic_root.join(rel);
595                // Intentionally avoid `let`-chains here to preserve readability
596                // and to align with review guidance; allow clippy's collapsible-if.
597                #[allow(clippy::collapsible_if)]
598                if let Ok(meta) = std::fs::metadata(&src) {
599                    if meta.is_file() {
600                        let dst = outputs_stage.join(&rel_for_project);
601                        if let Some(parent) = dst.parent() {
602                            std::fs::create_dir_all(parent).map_err(|e| Error::Io {
603                                source: e,
604                                path: Some(parent.into()),
605                                operation: "create_dir_all".into(),
606                            })?;
607                        }
608                        std::fs::copy(&src, &dst).map_err(|e| Error::Io {
609                            source: e,
610                            path: Some(dst.into()),
611                            operation: "copy".into(),
612                        })?;
                        // A hashing failure is not fatal: the default value is recorded instead.
613                        let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
614                        output_index.push(task_cache::OutputIndexEntry {
615                            rel_path: rel_for_project.to_string_lossy().to_string(),
616                            size: meta.len(),
617                            sha256: sha,
618                        });
619                    }
620                }
621            }
622
623            // Detect undeclared writes
624            let mut warned = false;
625            for entry in walkdir::WalkDir::new(&hermetic_root)
626                .into_iter()
627                .filter_map(|e| e.ok())
628            {
629                let p = entry.path();
630                if p.is_dir() {
631                    continue;
632                }
633                let rel = match p.strip_prefix(&hermetic_root) {
634                    Ok(r) => r.to_path_buf(),
635                    Err(_) => continue,
636                };
637                let rel_str = rel.to_string_lossy().to_string();
638                let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
                // A file counts as changed when it has no entry in the initial snapshot or its
                // hash differs from the recorded one.
639                let initial = initial_hashes.get(&rel_str);
640                let changed = match initial {
641                    None => true,
642                    Some(prev) => prev != &sha,
643                };
644                if changed && !outputs_set.contains(&rel) {
645                    if !warned {
646                        tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
647                        warned = true;
648                    }
649                    tracing::debug!(path = %rel_str, "Undeclared write");
650                }
651            }
652
653            // Persist cache entry on success
654            if success {
655                let meta = task_cache::TaskResultMeta {
656                    task_name: name.to_string(),
657                    command: task.command.clone(),
658                    args: task.args.clone(),
659                    env_summary,
660                    inputs_summary: inputs_summary.clone(),
661                    created_at: Utc::now(),
662                    cuenv_version,
663                    platform,
664                    duration_ms: duration.as_millis(),
665                    exit_code,
666                    cache_key_envelope: envelope_json.clone(),
667                    output_index,
668                };
669                let logs = task_cache::TaskLogs {
670                    // Persist logs regardless of capture setting so cache hits can
671                    // reproduce output faithfully.
672                    stdout: Some(stdout.clone()),
673                    stderr: Some(stderr.clone()),
674                };
675                let cache_span = tracing::info_span!("cache.save", key = %cache_key);
676                {
677                    let _g = cache_span.enter();
678                    task_cache::save_result(
679                        &cache_key,
680                        &meta,
681                        &outputs_stage,
682                        &hermetic_root,
683                        logs,
684                        self.config.cache_dir.as_deref(),
685                    )?;
686                }
687
688                // Record this task's cache key as the latest for this project
689                task_cache::record_latest(
690                    &self.config.project_root,
691                    name,
692                    &cache_key,
693                    self.config.cache_dir.as_deref(),
694                )?;
695
696                // Outputs remain in cache only - dependent tasks use inputsFrom to consume them
697            } else {
698                // Optionally persist logs in a failure/ subdir: not implemented for brevity
699            }
700
701            Ok(TaskResult {
702                name: name.to_string(),
703                exit_code: Some(exit_code),
704                stdout,
705                stderr,
706                success,
707            })
708        }
709    }
710
711    /// Execute a task non-hermetically (directly in workspace/project root)
712    ///
713    /// Used for tasks like `bun install` that need to write to the real filesystem.
714    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
715        // Check if this is an unresolved TaskRef (should have been resolved before execution)
716        if task.is_task_ref() && task.project_root.is_none() {
717            return Err(Error::configuration(format!(
718                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
719                 This usually means:\n\
720                 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
721                 - The referenced task '{}' doesn't exist in that project\n\
722                 - There was an error loading the referenced project's env.cue\n\
723                 Run with RUST_LOG=debug for more details.",
724                name,
725                task.task_ref.as_deref().unwrap_or("unknown"),
726                task.task_ref
727                    .as_deref()
728                    .and_then(|r| r.split(':').next_back())
729                    .unwrap_or("unknown")
730            )));
731        }
732
733        // Determine working directory (in priority order):
734        // 1. Explicit directory field on task (relative to cue.mod root)
735        // 2. TaskRef project_root (from resolution)
736        // 3. Source file directory (from _source metadata)
737        // 4. Install tasks (hermetic: false with workspaces) run from workspace root
738        // 5. Default to project root
739        let workdir = if let Some(ref dir) = task.directory {
740            // Explicit directory override: resolve relative to cue.mod root or project root
741            self.config
742                .cue_module_root
743                .as_ref()
744                .unwrap_or(&self.config.project_root)
745                .join(dir)
746        } else if let Some(ref project_root) = task.project_root {
747            // TaskRef tasks run in their original project directory
748            project_root.clone()
749        } else if let Some(ref source) = task.source {
750            // Default: run in the directory of the source file
751            if let Some(dir) = source.directory() {
752                self.config
753                    .cue_module_root
754                    .as_ref()
755                    .unwrap_or(&self.config.project_root)
756                    .join(dir)
757            } else {
758                // Source is at root (e.g., "env.cue"), use cue_module_root if available
759                // This ensures tasks defined in root env.cue run from module root,
760                // even when invoked from a subdirectory
761                self.config
762                    .cue_module_root
763                    .clone()
764                    .unwrap_or_else(|| self.config.project_root.clone())
765            }
766        } else if !task.hermetic && !task.workspaces.is_empty() {
767            // Find workspace root for install tasks
768            let workspace_name = &task.workspaces[0];
769            let manager = match workspace_name.as_str() {
770                "bun" => PackageManager::Bun,
771                "npm" => PackageManager::Npm,
772                "pnpm" => PackageManager::Pnpm,
773                "yarn" => PackageManager::YarnModern,
774                "cargo" => PackageManager::Cargo,
775                _ => PackageManager::Npm, // fallback
776            };
777            find_workspace_root(manager, &self.config.project_root)
778        } else {
779            self.config.project_root.clone()
780        };
781
782        tracing::info!(
783            task = %name,
784            workdir = %workdir.display(),
785            hermetic = false,
786            "Executing non-hermetic task"
787        );
788
789        // Emit command being run
790        let cmd_str = if let Some(script) = &task.script {
791            format!("[script: {} bytes]", script.len())
792        } else if task.command.is_empty() {
793            task.args.join(" ")
794        } else {
795            format!("{} {}", task.command, task.args.join(" "))
796        };
797        if !self.config.capture_output {
798            cuenv_events::emit_task_started!(name, cmd_str, false);
799        }
800
801        // Build command - handle script mode vs command mode
802        let mut cmd = if let Some(script) = &task.script {
803            // Script mode: use shell to execute the script
804            let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
805                (
806                    shell.command.clone().unwrap_or_else(|| "bash".to_string()),
807                    shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
808                )
809            } else {
810                // Default to bash for scripts
811                ("bash".to_string(), "-c".to_string())
812            };
813
814            let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
815            let mut cmd = Command::new(&resolved_shell);
816            cmd.arg(&shell_flag);
817            cmd.arg(script);
818            cmd
819        } else {
820            // Command mode: existing behavior
821            let resolved_command = self.config.environment.resolve_command(&task.command);
822
823            if let Some(shell) = &task.shell {
824                if shell.command.is_some() && shell.flag.is_some() {
825                    let shell_command = shell.command.as_ref().unwrap();
826                    let shell_flag = shell.flag.as_ref().unwrap();
827                    let resolved_shell = self.config.environment.resolve_command(shell_command);
828                    let mut cmd = Command::new(&resolved_shell);
829                    cmd.arg(shell_flag);
830                    if task.args.is_empty() {
831                        cmd.arg(&resolved_command);
832                    } else {
833                        let full_command = if task.command.is_empty() {
834                            task.args.join(" ")
835                        } else {
836                            format!("{} {}", resolved_command, task.args.join(" "))
837                        };
838                        cmd.arg(full_command);
839                    }
840                    cmd
841                } else {
842                    let mut cmd = Command::new(&resolved_command);
843                    for arg in &task.args {
844                        cmd.arg(arg);
845                    }
846                    cmd
847                }
848            } else {
849                let mut cmd = Command::new(&resolved_command);
850                for arg in &task.args {
851                    cmd.arg(arg);
852                }
853                cmd
854            }
855        };
856
857        // Set working directory and environment
858        cmd.current_dir(&workdir);
859        let env_vars = self.config.environment.merge_with_system();
860        for (k, v) in &env_vars {
861            cmd.env(k, v);
862        }
863
864        // Execute - always capture output for consistent behavior
865        // If not in capture mode, stream output to terminal in real-time
866        if self.config.capture_output {
867            let output = cmd
868                .stdout(Stdio::piped())
869                .stderr(Stdio::piped())
870                .output()
871                .await
872                .map_err(|e| Error::Io {
873                    source: e,
874                    path: None,
875                    operation: format!("spawn task {}", name),
876                })?;
877
878            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
879            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
880            let exit_code = output.status.code().unwrap_or(-1);
881            let success = output.status.success();
882
883            if !success {
884                tracing::warn!(task = %name, exit = exit_code, "Task failed");
885                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
886                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
887            }
888
889            Ok(TaskResult {
890                name: name.to_string(),
891                exit_code: Some(exit_code),
892                stdout,
893                stderr,
894                success,
895            })
896        } else {
897            // Stream output directly to terminal (interactive mode)
898            let status = cmd
899                .stdout(Stdio::inherit())
900                .stderr(Stdio::inherit())
901                .status()
902                .await
903                .map_err(|e| Error::Io {
904                    source: e,
905                    path: None,
906                    operation: format!("spawn task {}", name),
907                })?;
908
909            let exit_code = status.code().unwrap_or(-1);
910            let success = status.success();
911
912            if !success {
913                tracing::warn!(task = %name, exit = exit_code, "Task failed");
914            }
915
916            Ok(TaskResult {
917                name: name.to_string(),
918                exit_code: Some(exit_code),
919                stdout: String::new(), // Output went to terminal
920                stderr: String::new(),
921                success,
922            })
923        }
924    }
925
926    /// Execute a task definition (single task or group)
927    #[async_recursion]
928    pub async fn execute_definition(
929        &self,
930        name: &str,
931        definition: &TaskDefinition,
932        all_tasks: &Tasks,
933    ) -> Result<Vec<TaskResult>> {
934        match definition {
935            TaskDefinition::Single(task) => {
936                let result = self.execute_task(name, task.as_ref()).await?;
937                Ok(vec![result])
938            }
939            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
940        }
941    }
942
943    async fn execute_group(
944        &self,
945        prefix: &str,
946        group: &TaskGroup,
947        all_tasks: &Tasks,
948    ) -> Result<Vec<TaskResult>> {
949        match group {
950            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
951            TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
952        }
953    }
954
955    async fn execute_sequential(
956        &self,
957        prefix: &str,
958        tasks: &[TaskDefinition],
959        all_tasks: &Tasks,
960    ) -> Result<Vec<TaskResult>> {
961        if !self.config.capture_output {
962            cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
963        }
964        let mut results = Vec::new();
965        for (i, task_def) in tasks.iter().enumerate() {
966            let task_name = format!("{}[{}]", prefix, i);
967            let task_results = self
968                .execute_definition(&task_name, task_def, all_tasks)
969                .await?;
970            for result in &task_results {
971                if !result.success {
972                    let message = format!(
973                        "Sequential task group '{prefix}' halted.\n\n{}",
974                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
975                    );
976                    return Err(Error::configuration(message));
977                }
978            }
979            results.extend(task_results);
980        }
981        Ok(results)
982    }
983
984    async fn execute_parallel(
985        &self,
986        prefix: &str,
987        group: &ParallelGroup,
988        all_tasks: &Tasks,
989    ) -> Result<Vec<TaskResult>> {
990        // Check for "default" task to override parallel execution
991        if let Some(default_task) = group.tasks.get("default") {
992            if !self.config.capture_output {
993                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
994            }
995            // Execute only the default task, using the group prefix directly
996            // since "default" is implicit when invoking the group name
997            let task_name = format!("{}.default", prefix);
998            return self
999                .execute_definition(&task_name, default_task, all_tasks)
1000                .await;
1001        }
1002
1003        if !self.config.capture_output {
1004            cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
1005        }
1006        let mut join_set = JoinSet::new();
1007        let all_tasks = Arc::new(all_tasks.clone());
1008        let mut all_results = Vec::new();
1009        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
1010            if let Some(failed) = results.iter().find(|r| !r.success) {
1011                let message = format!(
1012                    "Parallel task group '{prefix}' halted.\n\n{}",
1013                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
1014                );
1015                return Err(Error::configuration(message));
1016            }
1017            all_results.extend(results);
1018            Ok(())
1019        };
1020        for (name, task_def) in &group.tasks {
1021            let task_name = format!("{}.{}", prefix, name);
1022            let task_def = task_def.clone();
1023            let all_tasks = Arc::clone(&all_tasks);
1024            let executor = self.clone_with_config();
1025            join_set.spawn(async move {
1026                executor
1027                    .execute_definition(&task_name, &task_def, &all_tasks)
1028                    .await
1029            });
1030            if self.config.max_parallel > 0
1031                && join_set.len() >= self.config.max_parallel
1032                && let Some(result) = join_set.join_next().await
1033            {
1034                match result {
1035                    Ok(Ok(results)) => merge_results(results)?,
1036                    Ok(Err(e)) => return Err(e),
1037                    Err(e) => {
1038                        return Err(Error::configuration(format!(
1039                            "Task execution panicked: {}",
1040                            e
1041                        )));
1042                    }
1043                }
1044            }
1045        }
1046        while let Some(result) = join_set.join_next().await {
1047            match result {
1048                Ok(Ok(results)) => merge_results(results)?,
1049                Ok(Err(e)) => return Err(e),
1050                Err(e) => {
1051                    return Err(Error::configuration(format!(
1052                        "Task execution panicked: {}",
1053                        e
1054                    )));
1055                }
1056            }
1057        }
1058        Ok(all_results)
1059    }
1060
1061    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
1062        let parallel_groups = graph.get_parallel_groups()?;
1063        let mut all_results = Vec::new();
1064
1065        // IMPORTANT:
1066        // Each parallel group represents a dependency "level". We must not start tasks from the
1067        // next group until *all* tasks from the current group have completed successfully.
1068        //
1069        // The previous implementation pipelined groups (starting the next group as soon as all
1070        // tasks from the current group were spawned), which allowed dependent tasks to run before
1071        // their dependencies finished (especially visible with long-running tasks like dev servers).
1072        for mut group in parallel_groups {
1073            let mut join_set = JoinSet::new();
1074
1075            while !group.is_empty() || !join_set.is_empty() {
1076                // Fill the concurrency window for this group
1077                while let Some(node) = group.pop() {
1078                    let task = node.task.clone();
1079                    let name = node.name.clone();
1080                    let executor = self.clone_with_config();
1081                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
1082
1083                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
1084                        break;
1085                    }
1086                }
1087
1088                if let Some(result) = join_set.join_next().await {
1089                    match result {
1090                        Ok(Ok(task_result)) => {
1091                            if !task_result.success {
1092                                join_set.abort_all();
1093                                let message = format!(
1094                                    "Task graph execution halted.\n\n{}",
1095                                    summarize_task_failure(
1096                                        &task_result,
1097                                        TASK_FAILURE_SNIPPET_LINES,
1098                                    )
1099                                );
1100                                return Err(Error::configuration(message));
1101                            }
1102                            all_results.push(task_result);
1103                        }
1104                        Ok(Err(e)) => {
1105                            join_set.abort_all();
1106                            return Err(e);
1107                        }
1108                        Err(e) => {
1109                            join_set.abort_all();
1110                            return Err(Error::configuration(format!(
1111                                "Task execution panicked: {}",
1112                                e
1113                            )));
1114                        }
1115                    }
1116                }
1117            }
1118        }
1119
1120        Ok(all_results)
1121    }
1122
1123    async fn resolve_workspace(
1124        &self,
1125        _task_name: &str,
1126        task: &Task,
1127        workspace_name: &str,
1128        config: &WorkspaceConfig,
1129    ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1130        let root = self.config.project_root.clone();
1131        let task_label = _task_name.to_string();
1132        let command = task.command.clone();
1133        let config_pm = config.package_manager.clone();
1134        let config_root = config.root.clone();
1135        // WorkspaceConfig doesn't support 'packages' filtering yet, so assume full workspace deps for now,
1136        // or imply it from context. If we want package filtering, we need to add it to WorkspaceConfig
1137        // or assume all deps.
1138        // However, the previous logic used `packages` from WorkspaceInputs.
1139        // For now, let's assume traversing all dependencies if not specified.
1140        // But wait, `WorkspaceConfig` in schema doesn't have `packages`.
1141        // So we probably default to "all relevant" or auto-infer from current directory if we are inside a package.
1142        let mut packages = Vec::new();
1143
1144        // If we are in a subdirectory that matches a workspace member, we might want to infer that package.
1145        // The previous logic did that if `packages` was empty.
1146
1147        let lockfile_override: Option<String> = None; // WorkspaceConfig doesn't have lockfile override yet.
1148        // If we need it, we should add it to WorkspaceConfig. For now assume standard discovery.
1149
1150        let mut traverse_workspace_deps = true;
1151
1152        // Use spawn_blocking for heavy lifting
1153        let workspace_name_owned = workspace_name.to_string();
1154        tokio::task::spawn_blocking(move || {
1155            let override_for_detection = lockfile_override.as_ref().map(|lock| {
1156                let candidate = PathBuf::from(lock);
1157                if candidate.is_absolute() {
1158                    candidate
1159                } else {
1160                    root.join(lock)
1161                }
1162            });
1163
1164            // 1. Detect Package Manager
1165            // Priority: 1. explicit config, 2. workspace name (if it matches a PM), 3. auto-detect
1166            let manager = if let Some(pm_str) = config_pm {
1167                match pm_str.as_str() {
1168                    "npm" => PackageManager::Npm,
1169                    "bun" => PackageManager::Bun,
1170                    "pnpm" => PackageManager::Pnpm,
1171                    "yarn" => PackageManager::YarnModern,
1172                    "yarn-classic" => PackageManager::YarnClassic,
1173                    "cargo" => PackageManager::Cargo,
1174                    _ => {
1175                        return Err(Error::configuration(format!(
1176                            "Unknown package manager: {}",
1177                            pm_str
1178                        )));
1179                    }
1180                }
1181            } else {
1182                // Try to match workspace name to package manager
1183                match workspace_name_owned.as_str() {
1184                    "npm" => PackageManager::Npm,
1185                    "bun" => PackageManager::Bun,
1186                    "pnpm" => PackageManager::Pnpm,
1187                    "yarn" => PackageManager::YarnModern,
1188                    "cargo" => PackageManager::Cargo,
1189                    _ => {
1190                         // Auto-detect if name doesn't match
1191                        // Priority: command hint -> lockfiles
1192                        let hint = detect_from_command(&command);
1193                        let detected = match detect_package_managers(&root) {
1194                            Ok(list) => list,
1195                            Err(e) => {
1196                                if override_for_detection.is_some() {
1197                                    Vec::new()
1198                                } else {
1199                                    return Err(Error::configuration(format!(
1200                                        "Failed to detect package managers: {}",
1201                                        e
1202                                    )));
1203                                }
1204                            }
1205                        };
1206
1207                        if let Some(h) = hint {
1208                            if detected.contains(&h) {
1209                                h
1210                            } else if !detected.is_empty() {
1211                                // Prefer detected files
1212                                detected[0]
1213                            } else if let Some(ref override_path) = override_for_detection {
1214                                infer_manager_from_lockfile(override_path).ok_or_else(|| {
1215                                    Error::configuration(
1216                                        "Unable to infer package manager from lockfile override",
1217                                    )
1218                                })?
1219                            } else {
1220                                return Err(Error::configuration(
1221                                    format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1222                                ));
1223                            }
1224                        } else if !detected.is_empty() {
1225                            detected[0]
1226                        } else if let Some(ref override_path) = override_for_detection {
1227                            infer_manager_from_lockfile(override_path).ok_or_else(|| {
1228                                Error::configuration(
1229                                    "Unable to infer package manager from lockfile override",
1230                                )
1231                            })?
1232                        } else {
1233                            return Err(Error::configuration(
1234                                "Could not detect package manager for workspace resolution",
1235                            ));
1236                        }
1237                    }
1238                }
1239            };
1240
1241            // Resolve the actual workspace root by walking up until we find a
1242            // directory that declares a workspace for this package manager.
1243            // If config.root is set, use that.
1244            let workspace_root = if let Some(root_override) = &config_root {
1245                root.join(root_override)
1246            } else {
1247                find_workspace_root(manager, &root)
1248            };
1249
1250            if task_trace_enabled() {
1251                tracing::info!(
1252                    task = %task_label,
1253                    manager = %manager,
1254                    project_root = %root.display(),
1255                    workspace_root = %workspace_root.display(),
1256                    "Resolved workspace root for package manager"
1257                );
1258            }
1259
1260            let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1261                let candidate = PathBuf::from(lock);
1262                if candidate.is_absolute() {
1263                    candidate
1264                } else {
1265                    workspace_root.join(lock)
1266                }
1267            });
1268
1269            // 2. Discover Workspace
1270            let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1271                PackageManager::Npm
1272                | PackageManager::Bun
1273                | PackageManager::YarnClassic
1274                | PackageManager::YarnModern => Box::new(PackageJsonDiscovery),
1275                PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1276                PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1277            };
1278
1279            let workspace = discovery.discover(&workspace_root).map_err(|e| {
1280                Error::configuration(format!("Failed to discover workspace: {}", e))
1281            })?;
1282
1283            // 3. Parse Lockfile
1284            let lockfile_path = if let Some(path) = lockfile_override_path {
1285                if !path.exists() {
1286                    return Err(Error::configuration(format!(
1287                        "Workspace lockfile override does not exist: {}",
1288                        path.display()
1289                    )));
1290                }
1291                path
1292            } else {
1293                workspace.lockfile.clone().ok_or_else(|| {
1294                    Error::configuration("Workspace resolution requires a lockfile")
1295                })?
1296            };
1297
1298            let parser: Box<dyn LockfileParser> = match manager {
1299                PackageManager::Npm => Box::new(NpmLockfileParser),
1300                PackageManager::Bun => Box::new(BunLockfileParser),
1301                PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1302                PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1303                PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1304                PackageManager::Cargo => Box::new(CargoLockfileParser),
1305            };
1306
1307            let entries = parser
1308                .parse(&lockfile_path)
1309                .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1310            if task_trace_enabled() {
1311                tracing::info!(
1312                    task = %task_label,
1313                    lockfile = %lockfile_path.display(),
1314                    members = entries.len(),
1315                    "Parsed workspace lockfile"
1316                );
1317            }
1318
1319            // Compute lockfile hash
1320            let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1321
1322            // Infer packages when none explicitly provided by scoping to the
1323            // current workspace member. (We intentionally avoid pulling all
1324            // transitive deps here to keep hashing fast for large monorepos.)
1325            if packages.is_empty() {
1326                let current_member = workspace
1327                    .members
1328                    .iter()
1329                    .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1330                if let Some(member) = current_member {
1331                    let inferred = vec![member.name.clone()];
1332                    if task_trace_enabled() {
1333                        tracing::info!(
1334                            task = %task_label,
1335                            inferred_packages = ?inferred,
1336                            "Inferred workspace packages from current project"
1337                        );
1338                    }
1339                    packages = inferred;
1340                    traverse_workspace_deps = true;
1341                }
1342            }
1343
1344            // 4. Collect Inputs
1345            let mut member_paths = Vec::new();
1346
1347            // Always include workspace configuration files
1348            member_paths.push(manager.workspace_config_name().to_string());
1349            if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1350                member_paths.push(rel.to_string_lossy().to_string());
1351            } else {
1352                member_paths.push(lockfile_path.to_string_lossy().to_string());
1353            }
1354
1355            if packages.is_empty() {
1356                for member in &workspace.members {
1357                    let manifest_rel = member
1358                        .path
1359                        .join(manager.workspace_config_name());
1360                    member_paths.push(manifest_rel.to_string_lossy().to_string());
1361                }
1362            } else {
1363                let mut to_visit: Vec<String> = packages.clone();
1364                let mut visited = HashSet::new();
1365
1366                while let Some(pkg_name) = to_visit.pop() {
1367                    if visited.contains(&pkg_name) {
1368                        continue;
1369                    }
1370                    visited.insert(pkg_name.clone());
1371
1372                    if let Some(member) = workspace.find_member(&pkg_name) {
1373                        let manifest_rel = member
1374                            .path
1375                            .join(manager.workspace_config_name());
1376                        member_paths.push(manifest_rel.to_string_lossy().to_string());
1377
1378                        // Add dependencies when explicitly requested
1379                        if traverse_workspace_deps {
1380                            let mut dependency_candidates: HashSet<String> = HashSet::new();
1381
1382                            if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1383                                for dep in &entry.dependencies {
1384                                    if entries
1385                                        .iter()
1386                                        .any(|e| e.name == dep.name && e.is_workspace_member)
1387                                    {
1388                                        dependency_candidates.insert(dep.name.clone());
1389                                    }
1390                                }
1391                            }
1392
1393                            for dep_name in &member.dependencies {
1394                                if workspace.find_member(dep_name).is_some() {
1395                                    dependency_candidates.insert(dep_name.clone());
1396                                }
1397                            }
1398
1399                            for dep_name in dependency_candidates {
1400                                to_visit.push(dep_name);
1401                            }
1402                        }
1403                    }
1404                }
1405            }
1406
1407            if task_trace_enabled() {
1408                tracing::info!(
1409                    task = %task_label,
1410                    members = ?member_paths,
1411                    "Workspace input member paths selected"
1412                );
1413            }
1414
1415            Ok((workspace, entries, member_paths, Some(hash)))
1416        })
1417        .await
1418        .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1419    }
1420
1421    async fn materialize_workspace(
1422        &self,
1423        workspace: &Workspace,
1424        entries: &[LockfileEntry],
1425        target_dir: &Path,
1426    ) -> Result<()> {
1427        // Dispatch to appropriate materializer
1428        let materializer: Box<dyn Materializer> = match workspace.manager {
1429            PackageManager::Npm
1430            | PackageManager::Bun
1431            | PackageManager::Pnpm
1432            | PackageManager::YarnClassic
1433            | PackageManager::YarnModern => Box::new(NodeModulesMaterializer),
1434            PackageManager::Cargo => Box::new(CargoMaterializer),
1435        };
1436
1437        materializer
1438            .materialize(workspace, entries, target_dir)
1439            .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1440    }
1441
1442    fn clone_with_config(&self) -> Self {
1443        // Share the backend across clones to preserve container cache for Dagger chaining
1444        Self::with_shared_backend(self.config.clone(), self.backend.clone())
1445    }
1446}
1447
1448fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1449    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1450
1451    loop {
1452        let is_root = match manager {
1453            PackageManager::Npm
1454            | PackageManager::Bun
1455            | PackageManager::YarnClassic
1456            | PackageManager::YarnModern => package_json_has_workspaces(&current),
1457            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1458            PackageManager::Cargo => cargo_toml_has_workspace(&current),
1459        };
1460
1461        if is_root {
1462            return current;
1463        }
1464
1465        if let Some(parent) = current.parent() {
1466            current = parent.to_path_buf();
1467        } else {
1468            return start.to_path_buf();
1469        }
1470    }
1471}
1472
1473fn package_json_has_workspaces(dir: &Path) -> bool {
1474    let path = dir.join("package.json");
1475    let content = std::fs::read_to_string(&path);
1476    let Ok(json) = content.and_then(|s| {
1477        serde_json::from_str::<serde_json::Value>(&s)
1478            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1479    }) else {
1480        return false;
1481    };
1482
1483    match json.get("workspaces") {
1484        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1485        Some(serde_json::Value::Object(map)) => map
1486            .get("packages")
1487            .and_then(|packages| packages.as_array())
1488            .map(|arr| !arr.is_empty())
1489            .unwrap_or(false),
1490        _ => false,
1491    }
1492}
1493
/// Report whether `dir/Cargo.toml` declares a Cargo workspace.
///
/// The manifest is scanned line by line for a TOML table header naming the
/// `workspace` table: either `[workspace]` itself or a sub-table such as
/// `[workspace.dependencies]`, with optional whitespace inside the brackets.
/// Scanning headers instead of searching for the raw substring `[workspace]`
/// means that commented-out headers (`# [workspace]`) and string values that
/// merely mention `[workspace]` no longer count, while sub-table-only
/// manifests and `[ workspace ]` spacing now do.
///
/// This remains a heuristic rather than a TOML parse: a line inside a
/// multi-line string that looks like a header would still match, and quoted
/// or inline-table spellings of `workspace` are not recognised.
///
/// Returns `false` when the manifest is missing or cannot be read as UTF-8.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };

    content.lines().any(|line| {
        // A table header is a line whose first non-blank character is `[`.
        let Some(header) = line.trim_start().strip_prefix('[') else {
            return false;
        };
        let Some(rest) = header.trim_start().strip_prefix("workspace") else {
            return false;
        };
        // `]` closes `[workspace]`; `.` introduces a `[workspace.*]` sub-table.
        // Anything else (e.g. `[workspaces]`) is a different table.
        matches!(rest.trim_start().chars().next(), Some(']' | '.'))
    })
}
1502
/// Report whether verbose task tracing was requested via `CUENV_TRACE_TASKS`.
///
/// The variable is read once, on the first call, and the answer is cached for
/// the rest of the process. Tracing is on when the value, after trimming
/// whitespace and ignoring ASCII case, is one of `1`, `true`, `yes` or `on`.
/// An unset or unreadable variable counts as off.
fn task_trace_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();

    *ENABLED.get_or_init(|| {
        let raw = std::env::var("CUENV_TRACE_TASKS").unwrap_or_default();
        let flag = raw.trim().to_ascii_lowercase();
        ["1", "true", "yes", "on"].contains(&flag.as_str())
    })
}
1516
1517/// Build a compact, user-friendly summary for a failed task, including the
1518/// exit code and the tail of stdout/stderr to help with diagnostics.
1519pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1520    let exit_code = result
1521        .exit_code
1522        .map(|c| c.to_string())
1523        .unwrap_or_else(|| "unknown".to_string());
1524
1525    let mut sections = Vec::new();
1526    sections.push(format!(
1527        "Task '{}' failed with exit code {}.",
1528        result.name, exit_code
1529    ));
1530
1531    let output = format_failure_streams(result, max_output_lines);
1532    if output.is_empty() {
1533        sections.push(
1534            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1535                .to_string(),
1536        );
1537    } else {
1538        sections.push(output);
1539    }
1540
1541    sections.join("\n\n")
1542}
1543
1544fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1545    let mut streams = Vec::new();
1546
1547    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1548        streams.push(stdout);
1549    }
1550
1551    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1552        streams.push(stderr);
1553    }
1554
1555    streams.join("\n\n")
1556}
1557
/// Render the last `max_output_lines` lines of one output stream under a
/// labelled header.
///
/// Trailing whitespace is ignored; a stream that is empty after trimming
/// yields `None`. The header is `"<label>:"` when every line is shown and
/// `"<label> (last N of TOTAL lines):"` when earlier lines were dropped.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    // Number of leading lines that do not fit into the snippet.
    let skipped = total.saturating_sub(max_output_lines);

    let mut summary = if skipped > 0 {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };
    summary.push('\n');

    let tail: Vec<&str> = trimmed.lines().skip(skipped).collect();
    summary.push_str(&tail.join("\n"));

    Some(summary)
}
1577
1578fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1579    match path.file_name().and_then(|n| n.to_str())? {
1580        "package-lock.json" => Some(PackageManager::Npm),
1581        "bun.lock" => Some(PackageManager::Bun),
1582        "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1583        "yarn.lock" => Some(PackageManager::YarnModern),
1584        "Cargo.lock" => Some(PackageManager::Cargo),
1585        _ => None,
1586    }
1587}
1588
1589fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1590    // Use OS temp dir; name scoped by task and cache key prefix.
1591    // IMPORTANT: Ensure the workdir is clean on every run to preserve hermeticity.
1592    let sanitized_task = task_name
1593        .chars()
1594        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1595        .collect::<String>();
1596
1597    let base = std::env::temp_dir().join(format!(
1598        "cuenv-work-{}-{}",
1599        sanitized_task,
1600        &key[..12.min(key.len())]
1601    ));
1602
1603    // If a directory from a previous run exists, remove it before reuse.
1604    // This avoids contamination from artifacts left by failed runs where no cache was saved.
1605    if base.exists()
1606        && let Err(e) = std::fs::remove_dir_all(&base)
1607    {
1608        // If we cannot remove the previous directory (e.g. in-use on Windows),
1609        // fall back to a unique, fresh directory to maintain hermetic execution.
1610        let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1611        let fallback = std::env::temp_dir().join(format!(
1612            "cuenv-work-{}-{}-{}",
1613            sanitized_task,
1614            &key[..12.min(key.len())],
1615            ts
1616        ));
1617        tracing::warn!(
1618            previous = %base.display(),
1619            fallback = %fallback.display(),
1620            error = %e,
1621            "Failed to clean previous hermetic workdir; using fresh fallback directory"
1622        );
1623        std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1624            source: e,
1625            path: Some(fallback.clone().into()),
1626            operation: "create_dir_all".into(),
1627        })?;
1628        return Ok(fallback);
1629    }
1630
1631    std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1632        source: e,
1633        path: Some(base.clone().into()),
1634        operation: "create_dir_all".into(),
1635    })?;
1636    Ok(base)
1637}
1638
1639/// Execute an arbitrary command with the cuenv environment
1640pub async fn execute_command(
1641    command: &str,
1642    args: &[String],
1643    environment: &Environment,
1644) -> Result<i32> {
1645    tracing::info!("Executing command: {} {:?}", command, args);
1646    let mut cmd = Command::new(command);
1647    cmd.args(args);
1648    let env_vars = environment.merge_with_system();
1649    for (key, value) in env_vars {
1650        cmd.env(key, value);
1651    }
1652    cmd.stdout(Stdio::inherit());
1653    cmd.stderr(Stdio::inherit());
1654    cmd.stdin(Stdio::inherit());
1655    let status = cmd.status().await.map_err(|e| {
1656        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1657    })?;
1658    Ok(status.code().unwrap_or(1))
1659}
1660
#[cfg(test)]
mod tests {
    // Tests for the task executor. Most of them spawn real processes
    // (`echo`, `printenv`, `sh`, `false`) through `TaskExecutor` with
    // `capture_output` enabled and assert on the captured result.
    use super::*;
    use crate::tasks::Input;
    use std::fs;
    use tempfile::TempDir;

    // The default configuration does not capture output, sets `max_parallel`
    // to 0 and starts with an empty environment.
    #[tokio::test]
    async fn test_executor_config_default() {
        let config = ExecutorConfig::default();
        assert!(!config.capture_output);
        assert_eq!(config.max_parallel, 0);
        assert!(config.environment.is_empty());
    }

    // `TaskResult` is a plain data holder: fields read back as written.
    #[tokio::test]
    async fn test_task_result() {
        let result = TaskResult {
            name: "test".to_string(),
            exit_code: Some(0),
            stdout: "output".to_string(),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(result.name, "test");
        assert_eq!(result.exit_code, Some(0));
        assert!(result.success);
        assert_eq!(result.stdout, "output");
    }

    // A single successful task reports success, exit code 0 and its stdout.
    #[tokio::test]
    async fn test_execute_simple_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            description: Some("Hello task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello"));
    }

    // Variables set on the executor's environment are visible to the task.
    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["TEST_VAR".to_string()],
            description: Some("Print env task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("test_value"));
    }

    // When the project root (`apps/site`) is nested inside a Bun workspace,
    // the task's working tree must still contain the workspace member it
    // depends on (`packages/content-technologies`), which lives under the
    // workspace root rather than under the project root.
    #[tokio::test]
    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        // Workspace root with workspaces + lockfile
        fs::write(
            root.join("package.json"),
            r#"{
  "name": "root-app",
  "version": "0.0.0",
  "workspaces": ["packages/*", "apps/*"],
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#,
        )
        .unwrap();
        // Deliberately omit the workspace member name for apps/site to mimic lockfiles
        // that only record member paths, ensuring we can still discover dependencies.
        fs::write(
            root.join("bun.lock"),
            r#"{
  "lockfileVersion": 1,
  "workspaces": {
    "": {
      "name": "root-app",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    },
    "packages/content-technologies": {
      "name": "@rawkodeacademy/content-technologies",
      "version": "0.0.1"
    },
    "apps/site": {
      "version": "0.0.0",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    }
  },
  "packages": {}
}"#,
        )
        .unwrap();

        // Workspace member packages
        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
        fs::write(
            root.join("packages/content-technologies/package.json"),
            r#"{
  "name": "@rawkodeacademy/content-technologies",
  "version": "0.0.1"
}"#,
        )
        .unwrap();

        fs::create_dir_all(root.join("apps/site")).unwrap();
        fs::write(
            root.join("apps/site/package.json"),
            r#"{
  "name": "site",
  "version": "0.0.0",
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#,
        )
        .unwrap();

        // Enable the "bun" workspace without an explicit root.
        let mut workspaces = HashMap::new();
        workspaces.insert(
            "bun".to_string(),
            WorkspaceConfig {
                enabled: true,
                package_manager: Some("bun".to_string()),
                root: None,
                hooks: None,
            },
        );

        let config = ExecutorConfig {
            capture_output: true,
            project_root: root.join("apps/site"),
            workspaces: Some(workspaces),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        // List directories starting two levels above the task's working
        // directory so the assertion can look for the sibling member.
        let task = Task {
            command: "sh".to_string(),
            args: vec![
                "-c".to_string(),
                "find ../.. -maxdepth 4 -type d | sort".to_string(),
            ],
            inputs: vec![Input::Path("package.json".to_string())],
            workspaces: vec!["bun".to_string()],
            ..Default::default()
        };

        let result = executor.execute_task("install", &task).await.unwrap();
        assert!(
            result.success,
            "command failed stdout='{}' stderr='{}'",
            result.stdout, result.stderr
        );
        assert!(
            result
                .stdout
                .split_whitespace()
                .any(|line| line.ends_with("packages/content-technologies")),
            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
            result.stdout,
            result.stderr
        );
    }

    // A failing command is reported as a result (not an `Err`) with
    // `success == false` and the command's exit code.
    #[tokio::test]
    async fn test_execute_failing_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "false".to_string(),
            description: Some("Failing task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
    }

    // A sequential group returns one result per task, in declaration order.
    #[tokio::test]
    async fn test_execute_sequential_group() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task1 = Task {
            command: "echo".to_string(),
            args: vec!["first".to_string()],
            description: Some("First task".to_string()),
            ..Default::default()
        };
        let task2 = Task {
            command: "echo".to_string(),
            args: vec!["second".to_string()],
            description: Some("Second task".to_string()),
            ..Default::default()
        };
        let group = TaskGroup::Sequential(vec![
            TaskDefinition::Single(Box::new(task1)),
            TaskDefinition::Single(Box::new(task2)),
        ]);
        let all_tasks = Tasks::new();
        let results = executor
            .execute_group("seq", &group, &all_tasks)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].stdout.contains("first"));
        assert!(results[1].stdout.contains("second"));
    }

    // Shell metacharacters in an argument are echoed back literally: the
    // output contains "; rm -rf /" as text instead of it being executed.
    #[tokio::test]
    async fn test_command_injection_prevention() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let malicious_task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
            description: Some("Malicious task test".to_string()),
            ..Default::default()
        };
        let result = executor
            .execute_task("malicious", &malicious_task)
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("hello ; rm -rf /"));
    }

    // Variable references, command substitution, chaining, redirection and
    // pipes in arguments must all reach the command unexpanded.
    #[tokio::test]
    async fn test_special_characters_in_args() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let special_chars = vec![
            "$USER",
            "$(whoami)",
            "`whoami`",
            "&& echo hacked",
            "|| echo failed",
            "> /tmp/hack",
            "| cat",
        ];
        for special_arg in special_chars {
            let task = Task {
                command: "echo".to_string(),
                args: vec!["safe".to_string(), special_arg.to_string()],
                description: Some("Special character test".to_string()),
                ..Default::default()
            };
            let result = executor.execute_task("special", &task).await.unwrap();
            assert!(result.success);
            assert!(result.stdout.contains("safe"));
            // The raw argument text must appear verbatim in the output.
            assert!(result.stdout.contains(special_arg));
        }
    }

    // Environment values containing shell metacharacters are passed through
    // unchanged and printed back verbatim.
    #[tokio::test]
    async fn test_environment_variable_safety() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["DANGEROUS_VAR".to_string()],
            description: Some("Environment variable safety test".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("env_test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("; rm -rf /"));
    }

    // Two tasks without dependencies both run and both produce a result.
    #[tokio::test]
    async fn test_execute_graph_parallel_groups() {
        // two independent tasks -> can run in same parallel group
        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let mut graph = TaskGraph::new();

        let t1 = Task {
            command: "echo".into(),
            args: vec!["A".into()],
            ..Default::default()
        };
        let t2 = Task {
            command: "echo".into(),
            args: vec!["B".into()],
            ..Default::default()
        };

        graph.add_task("t1", t1).unwrap();
        graph.add_task("t2", t2).unwrap();
        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);
        // Result order is not asserted; only that both outputs are present.
        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
        assert!(joined.contains("A") && joined.contains("B"));
    }

    // `consumer` depends on `dep`, which writes `marker.txt` after a short
    // delay. `consumer` can only print "ok" if it ran after `dep` finished.
    #[tokio::test]
    async fn test_execute_graph_respects_dependency_levels() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            project_root: root.to_path_buf(),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        let mut tasks = Tasks::new();
        tasks.tasks.insert(
            "dep".into(),
            TaskDefinition::Single(Box::new(Task {
                command: "sh".into(),
                args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
                ..Default::default()
            })),
        );
        tasks.tasks.insert(
            "consumer".into(),
            TaskDefinition::Single(Box::new(Task {
                command: "sh".into(),
                args: vec!["-c".into(), "cat marker.txt".into()],
                depends_on: vec!["dep".into()],
                ..Default::default()
            })),
        );

        let mut graph = TaskGraph::new();
        graph.build_for_task("consumer", &tasks).unwrap();

        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);

        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
        assert!(consumer.success);
        assert!(consumer.stdout.contains("ok"));
    }
}