cuenv_core/tasks/
executor.rs

1//! Task executor for running tasks with environment support and hermetic, input-addressed execution
2//!
3//! - Environment variable propagation
4//! - Parallel and sequential execution
5//! - Hermetic workdir populated from declared inputs (files/dirs/globs)
6//! - Persistent task result cache keyed by inputs + command + env + cuenv version + platform
7
8use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19    BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20    NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21    PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22    YarnModernLockfileParser, detect_from_command, detect_package_managers,
23    materializer::{
24        Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25    },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
34/// Task execution result
35#[derive(Debug, Clone)]
36pub struct TaskResult {
37    pub name: String,
38    pub exit_code: Option<i32>,
39    pub stdout: String,
40    pub stderr: String,
41    pub success: bool,
42}
43
44/// Number of lines from stdout/stderr to include when summarizing failures
45pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47/// Task executor configuration
48#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50    /// Whether to capture output (vs streaming to stdout/stderr)
51    pub capture_output: bool,
52    /// Maximum parallel tasks (0 = unlimited)
53    pub max_parallel: usize,
54    /// Environment variables to propagate (resolved via policies)
55    pub environment: Environment,
56    /// Optional working directory override (for hermetic execution)
57    pub working_dir: Option<PathBuf>,
58    /// Project root for resolving inputs/outputs (env.cue root)
59    pub project_root: PathBuf,
60    /// Optional: materialize cached outputs on cache hit
61    pub materialize_outputs: Option<PathBuf>,
62    /// Optional: cache directory override
63    pub cache_dir: Option<PathBuf>,
64    /// Optional: print cache path on hits/misses
65    pub show_cache_path: bool,
66    /// Global workspace configuration
67    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
68    /// Backend configuration
69    pub backend_config: Option<BackendConfig>,
70    /// CLI backend selection override
71    pub cli_backend: Option<String>,
72}
73
74impl Default for ExecutorConfig {
75    fn default() -> Self {
76        Self {
77            capture_output: false,
78            max_parallel: 0,
79            environment: Environment::new(),
80            working_dir: None,
81            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
82            materialize_outputs: None,
83            cache_dir: None,
84            show_cache_path: false,
85            workspaces: None,
86            backend_config: None,
87            cli_backend: None,
88        }
89    }
90}
91
92/// Task executor
93pub struct TaskExecutor {
94    config: ExecutorConfig,
95    backend: Arc<dyn TaskBackend>,
96}
97impl TaskExecutor {
98    /// Create a new executor with host backend only
99    pub fn new(config: ExecutorConfig) -> Self {
100        Self::with_dagger_factory(config, None)
101    }
102
103    /// Create a new executor with optional dagger backend support.
104    ///
105    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable dagger backend.
106    pub fn with_dagger_factory(
107        config: ExecutorConfig,
108        dagger_factory: Option<BackendFactory>,
109    ) -> Self {
110        let backend = create_backend_with_factory(
111            config.backend_config.as_ref(),
112            config.project_root.clone(),
113            config.cli_backend.as_deref(),
114            dagger_factory,
115        );
116        Self { config, backend }
117    }
118
119    /// Create a new executor with the given config but sharing the backend
120    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
121        Self { config, backend }
122    }
123
124    /// Execute a single task
125    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
126        // Delegate execution to the configured backend
127        // For now, we pass the project root as the execution context.
128        // Hermetic execution logic is currently bypassed in favor of direct execution
129        // as per the current codebase state (see original TODOs).
130        // The backend implementation handles the specific execution details.
131
132        // If using Dagger backend, we skip the workspace resolution and hermetic setup for now
133        // as it operates in a container.
134        if self.backend.name() == "dagger" {
135            return self
136                .backend
137                .execute(
138                    name,
139                    task,
140                    &self.config.environment,
141                    &self.config.project_root,
142                    self.config.capture_output,
143                )
144                .await;
145        }
146
147        // All tasks run directly in workspace/project root (hermetic sandbox disabled)
148        return self.execute_task_non_hermetic(name, task).await;
149
150        // TODO: Re-enable hermetic execution when sandbox properly preserves monorepo structure
151        #[allow(unreachable_code)]
152        {
153            // Resolve workspace dependencies if enabled
154            let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
155            let mut workspace_input_patterns = Vec::new();
156            let mut workspace_lockfile_hashes = BTreeMap::new();
157
158            for workspace_name in &task.workspaces {
159                if let Some(global_workspaces) = &self.config.workspaces {
160                    if let Some(ws_config) = global_workspaces.get(workspace_name) {
161                        if ws_config.enabled {
162                            let (ws, entries, paths, hash) = self
163                                .resolve_workspace(name, task, workspace_name, ws_config)
164                                .await?;
165                            workspace_ctxs.push((ws, entries));
166                            workspace_input_patterns.extend(paths);
167                            if let Some(h) = hash {
168                                workspace_lockfile_hashes.insert(workspace_name.clone(), h);
169                            }
170                        }
171                    } else {
172                        tracing::warn!(
173                            task = %name,
174                            workspace = %workspace_name,
175                            "Workspace not found in global configuration"
176                        );
177                    }
178                }
179            }
180
181            // Resolve inputs relative to the workspace root (if any) while keeping
182            // project-scoped inputs anchored to the original project path.
183            // Use the first workspace as the primary root if multiple are present,
184            // or fallback to project root. This might need refinement for multi-workspace tasks.
185            let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
186
187            let project_prefix = primary_workspace_root
188                .as_ref()
189                .and_then(|root| self.config.project_root.strip_prefix(root).ok())
190                .map(|p| p.to_path_buf());
191            let input_root = primary_workspace_root
192                .clone()
193                .unwrap_or_else(|| self.config.project_root.clone());
194
195            let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
196            let resolved_inputs = {
197                let _g = span_inputs.enter();
198                let resolver = InputResolver::new(&input_root);
199                let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
200                all_inputs.extend(workspace_input_patterns.iter().cloned());
201                resolver.resolve(&all_inputs)?
202            };
203            if task_trace_enabled() {
204                tracing::info!(
205                    task = %name,
206                    input_root = %input_root.display(),
207                    project_root = %self.config.project_root.display(),
208                    inputs_count = resolved_inputs.files.len(),
209                    workspace_inputs = workspace_input_patterns.len(),
210                    "Resolved task inputs"
211                );
212            }
213
214            // Build cache key envelope
215            let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
216            // Ensure deterministic order is already guaranteed by BTreeMap
217            let env_summary: BTreeMap<String, String> = self
218                .config
219                .environment
220                .vars
221                .iter()
222                .map(|(k, v)| (k.clone(), v.clone()))
223                .collect();
224            let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
225            let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
226            let shell_json = serde_json::to_value(&task.shell).ok();
227            let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
228                None
229            } else {
230                Some(workspace_lockfile_hashes)
231            };
232
233            let envelope = task_cache::CacheKeyEnvelope {
234                inputs: inputs_summary.clone(),
235                command: task.command.clone(),
236                args: task.args.clone(),
237                shell: shell_json,
238                env: env_summary.clone(),
239                cuenv_version: cuenv_version.clone(),
240                platform: platform.clone(),
241                workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
242                // Package hashes are implicitly included in inputs_summary because we added member paths to inputs
243                workspace_package_hashes: None,
244            };
245            let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
246
247            // Cache lookup
248            let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
249            let cache_hit = {
250                let _g = span_cache.enter();
251                task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
252            };
253
254            if let Some(hit) = cache_hit {
255                tracing::info!(
256                    task = %name,
257                    key = %cache_key,
258                    path = %hit.path.display(),
259                    "Task {} cache hit: {}. Skipping execution.",
260                    name,
261                    cache_key
262                );
263                if self.config.show_cache_path {
264                    tracing::info!(cache_path = %hit.path.display(), "Cache path");
265                }
266                if let Some(dest) = &self.config.materialize_outputs {
267                    let count = task_cache::materialize_outputs(
268                        &cache_key,
269                        dest,
270                        self.config.cache_dir.as_deref(),
271                    )?;
272                    tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
273                }
274                // On cache hit, surface cached logs so behavior matches a fresh
275                // execution from the caller's perspective. We return them in the
276                // TaskResult even if `capture_output` is false, allowing callers
277                // (like the CLI) to print cached logs explicitly.
278                let stdout_path = hit.path.join("logs").join("stdout.log");
279                let stderr_path = hit.path.join("logs").join("stderr.log");
280                let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
281                let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
282                // If logs are present, we can return immediately. If logs are
283                // missing (older cache format), fall through to execute to
284                // backfill logs for future hits.
285                if !(stdout.is_empty() && stderr.is_empty()) {
286                    if !self.config.capture_output {
287                        cuenv_events::emit_task_cache_hit!(name, cache_key);
288                        // Emit cached logs as output events
289                        if !stdout.is_empty() {
290                            cuenv_events::emit_task_output!(name, "stdout", stdout);
291                        }
292                        if !stderr.is_empty() {
293                            cuenv_events::emit_task_output!(name, "stderr", stderr);
294                        }
295                    }
296                    return Ok(TaskResult {
297                        name: name.to_string(),
298                        exit_code: Some(0),
299                        stdout,
300                        stderr,
301                        success: true,
302                    });
303                } else {
304                    tracing::info!(
305                        task = %name,
306                        key = %cache_key,
307                        "Cache entry lacks logs; executing to backfill logs"
308                    );
309                }
310            }
311
312            tracing::info!(
313                task = %name,
314                key = %cache_key,
315                "Task {} executing hermetically… key {}",
316                name,
317                cache_key
318            );
319
320            let hermetic_root = create_hermetic_dir(name, &cache_key)?;
321            if self.config.show_cache_path {
322                tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
323            }
324
325            // Seed working directory with inputs
326            let span_populate =
327                tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
328            {
329                let _g = span_populate.enter();
330                populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
331            }
332
333            // Materialize workspace artifacts (node_modules, target)
334            for (ws, entries) in workspace_ctxs {
335                self.materialize_workspace(&ws, &entries, &hermetic_root)
336                    .await?;
337            }
338
339            // Materialize outputs from inputsFrom tasks
340            if let Some(ref inputs_from) = task.inputs_from {
341                for task_output in inputs_from {
342                    let source_task = &task_output.task;
343                    let source_cache_key = task_cache::lookup_latest(
344                        &self.config.project_root,
345                        source_task,
346                        self.config.cache_dir.as_deref(),
347                    )
348                    .ok_or_else(|| {
349                        Error::configuration(format!(
350                            "Task '{}' depends on outputs from '{}' but no cached result found. \
351                         Ensure '{}' runs before this task (add it to dependsOn).",
352                            name, source_task, source_task
353                        ))
354                    })?;
355
356                    // Materialize outputs into the hermetic directory
357                    let materialized = task_cache::materialize_outputs(
358                        &source_cache_key,
359                        &hermetic_root,
360                        self.config.cache_dir.as_deref(),
361                    )?;
362                    tracing::info!(
363                        task = %name,
364                        source_task = %source_task,
365                        materialized = materialized,
366                        "Materialized outputs from dependent task"
367                    );
368                }
369            }
370
371            // Initial snapshot to detect undeclared writes
372            let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
373
374            // Resolve command path using the environment's PATH.
375            // This is necessary because when spawning a process, the OS looks up
376            // the executable in the current process's PATH, not the environment
377            // that will be set on the child process.
378            let resolved_command = self.config.environment.resolve_command(&task.command);
379
380            // Build command
381            let mut cmd = if let Some(shell) = &task.shell {
382                if shell.command.is_some() && shell.flag.is_some() {
383                    let shell_command = shell.command.as_ref().unwrap();
384                    let shell_flag = shell.flag.as_ref().unwrap();
385                    // Resolve shell command too
386                    let resolved_shell = self.config.environment.resolve_command(shell_command);
387                    let mut cmd = Command::new(&resolved_shell);
388                    cmd.arg(shell_flag);
389                    if task.args.is_empty() {
390                        cmd.arg(&resolved_command);
391                    } else {
392                        let full_command = if task.command.is_empty() {
393                            task.args.join(" ")
394                        } else {
395                            format!("{} {}", resolved_command, task.args.join(" "))
396                        };
397                        cmd.arg(full_command);
398                    }
399                    cmd
400                } else {
401                    let mut cmd = Command::new(&resolved_command);
402                    for arg in &task.args {
403                        cmd.arg(arg);
404                    }
405                    cmd
406                }
407            } else {
408                let mut cmd = Command::new(&resolved_command);
409                for arg in &task.args {
410                    cmd.arg(arg);
411                }
412                cmd
413            };
414
415            let workdir = if let Some(dir) = &self.config.working_dir {
416                dir.clone()
417            } else if let Some(prefix) = project_prefix.as_ref() {
418                hermetic_root.join(prefix)
419            } else {
420                hermetic_root.clone()
421            };
422            std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
423                source: e,
424                path: Some(workdir.clone().into()),
425                operation: "create_dir_all".into(),
426            })?;
427            cmd.current_dir(&workdir);
428            // Set environment variables (resolved + system), set CWD
429            let env_vars = self.config.environment.merge_with_system();
430            if task_trace_enabled() {
431                tracing::info!(
432                    task = %name,
433                    hermetic_root = %hermetic_root.display(),
434                    workdir = %workdir.display(),
435                    command = %task.command,
436                    args = ?task.args,
437                    env_count = env_vars.len(),
438                    "Launching task command"
439                );
440            }
441            for (k, v) in env_vars {
442                cmd.env(k, v);
443            }
444
445            // Configure output: always capture to ensure consistent behavior on
446            // cache hits and allow callers to decide what to print.
447            cmd.stdout(Stdio::piped());
448            cmd.stderr(Stdio::piped());
449
450            let stream_logs = !self.config.capture_output;
451            if stream_logs {
452                let cmd_str = if task.command.is_empty() {
453                    task.args.join(" ")
454                } else {
455                    format!("{} {}", task.command, task.args.join(" "))
456                };
457                cuenv_events::emit_task_started!(name, cmd_str, true);
458            }
459
460            let start = std::time::Instant::now();
461            let mut child = cmd.spawn().map_err(|e| {
462                Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
463            })?;
464
465            let stdout_handle = child.stdout.take();
466            let stderr_handle = child.stderr.take();
467
468            let stdout_task = async move {
469                if let Some(mut stdout) = stdout_handle {
470                    let mut output = Vec::new();
471                    let mut buf = [0u8; 4096];
472                    use std::io::Write;
473                    use tokio::io::AsyncReadExt;
474
475                    loop {
476                        match stdout.read(&mut buf).await {
477                            Ok(0) => break, // EOF
478                            Ok(n) => {
479                                let chunk = &buf[0..n];
480                                if stream_logs {
481                                    let mut handle = std::io::stdout().lock();
482                                    let _ = handle.write_all(chunk);
483                                    let _ = handle.flush();
484                                }
485                                output.extend_from_slice(chunk);
486                            }
487                            Err(_) => break,
488                        }
489                    }
490                    String::from_utf8_lossy(&output).to_string()
491                } else {
492                    String::new()
493                }
494            };
495
496            let stderr_task = async move {
497                if let Some(mut stderr) = stderr_handle {
498                    let mut output = Vec::new();
499                    let mut buf = [0u8; 4096];
500                    use std::io::Write;
501                    use tokio::io::AsyncReadExt;
502
503                    loop {
504                        match stderr.read(&mut buf).await {
505                            Ok(0) => break, // EOF
506                            Ok(n) => {
507                                let chunk = &buf[0..n];
508                                if stream_logs {
509                                    let mut handle = std::io::stderr().lock();
510                                    let _ = handle.write_all(chunk);
511                                    let _ = handle.flush();
512                                }
513                                output.extend_from_slice(chunk);
514                            }
515                            Err(_) => break,
516                        }
517                    }
518                    String::from_utf8_lossy(&output).to_string()
519                } else {
520                    String::new()
521                }
522            };
523
524            let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
525
526            let status = child.wait().await.map_err(|e| {
527                Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
528            })?;
529            let duration = start.elapsed();
530
531            let exit_code = status.code().unwrap_or(1);
532            let success = status.success();
533            if !success {
534                tracing::warn!(task = %name, exit = exit_code, "Task failed");
535                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
536                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
537            } else {
538                tracing::info!(task = %name, "Task completed successfully");
539            }
540
541            // Collect declared outputs and warn on undeclared writes
542            let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
543                task.outputs
544                    .iter()
545                    .map(|o| prefix.join(o).to_string_lossy().to_string())
546                    .collect()
547            } else {
548                task.outputs.clone()
549            };
550            let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
551            let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
552            let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
553
554            // Stage outputs into a temp dir for cache persistence
555            let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
556            if outputs_stage.exists() {
557                let _ = std::fs::remove_dir_all(&outputs_stage);
558            }
559            std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
560                source: e,
561                path: Some(outputs_stage.clone().into()),
562                operation: "create_dir_all".into(),
563            })?;
564
565            for rel in &outputs {
566                let rel_for_project = project_prefix
567                    .as_ref()
568                    .and_then(|prefix| rel.strip_prefix(prefix).ok())
569                    .unwrap_or(rel)
570                    .to_path_buf();
571                let src = hermetic_root.join(rel);
572                // Intentionally avoid `let`-chains here to preserve readability
573                // and to align with review guidance; allow clippy's collapsible-if.
574                #[allow(clippy::collapsible_if)]
575                if let Ok(meta) = std::fs::metadata(&src) {
576                    if meta.is_file() {
577                        let dst = outputs_stage.join(&rel_for_project);
578                        if let Some(parent) = dst.parent() {
579                            std::fs::create_dir_all(parent).map_err(|e| Error::Io {
580                                source: e,
581                                path: Some(parent.into()),
582                                operation: "create_dir_all".into(),
583                            })?;
584                        }
585                        std::fs::copy(&src, &dst).map_err(|e| Error::Io {
586                            source: e,
587                            path: Some(dst.into()),
588                            operation: "copy".into(),
589                        })?;
590                        let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
591                        output_index.push(task_cache::OutputIndexEntry {
592                            rel_path: rel_for_project.to_string_lossy().to_string(),
593                            size: meta.len(),
594                            sha256: sha,
595                        });
596                    }
597                }
598            }
599
600            // Detect undeclared writes
601            let mut warned = false;
602            for entry in walkdir::WalkDir::new(&hermetic_root)
603                .into_iter()
604                .filter_map(|e| e.ok())
605            {
606                let p = entry.path();
607                if p.is_dir() {
608                    continue;
609                }
610                let rel = match p.strip_prefix(&hermetic_root) {
611                    Ok(r) => r.to_path_buf(),
612                    Err(_) => continue,
613                };
614                let rel_str = rel.to_string_lossy().to_string();
615                let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
616                let initial = initial_hashes.get(&rel_str);
617                let changed = match initial {
618                    None => true,
619                    Some(prev) => prev != &sha,
620                };
621                if changed && !outputs_set.contains(&rel) {
622                    if !warned {
623                        tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
624                        warned = true;
625                    }
626                    tracing::debug!(path = %rel_str, "Undeclared write");
627                }
628            }
629
630            // Persist cache entry on success
631            if success {
632                let meta = task_cache::TaskResultMeta {
633                    task_name: name.to_string(),
634                    command: task.command.clone(),
635                    args: task.args.clone(),
636                    env_summary,
637                    inputs_summary: inputs_summary.clone(),
638                    created_at: Utc::now(),
639                    cuenv_version,
640                    platform,
641                    duration_ms: duration.as_millis(),
642                    exit_code,
643                    cache_key_envelope: envelope_json.clone(),
644                    output_index,
645                };
646                let logs = task_cache::TaskLogs {
647                    // Persist logs regardless of capture setting so cache hits can
648                    // reproduce output faithfully.
649                    stdout: Some(stdout.clone()),
650                    stderr: Some(stderr.clone()),
651                };
652                let cache_span = tracing::info_span!("cache.save", key = %cache_key);
653                {
654                    let _g = cache_span.enter();
655                    task_cache::save_result(
656                        &cache_key,
657                        &meta,
658                        &outputs_stage,
659                        &hermetic_root,
660                        logs,
661                        self.config.cache_dir.as_deref(),
662                    )?;
663                }
664
665                // Record this task's cache key as the latest for this project
666                task_cache::record_latest(
667                    &self.config.project_root,
668                    name,
669                    &cache_key,
670                    self.config.cache_dir.as_deref(),
671                )?;
672
673                // Outputs remain in cache only - dependent tasks use inputsFrom to consume them
674            } else {
675                // Optionally persist logs in a failure/ subdir: not implemented for brevity
676            }
677
678            Ok(TaskResult {
679                name: name.to_string(),
680                exit_code: Some(exit_code),
681                stdout,
682                stderr,
683                success,
684            })
685        }
686    }
687
688    /// Execute a task non-hermetically (directly in workspace/project root)
689    ///
690    /// Used for tasks like `bun install` that need to write to the real filesystem.
691    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
692        // Determine working directory:
693        // - Install tasks (hermetic: false with workspaces) run from workspace root
694        // - All other tasks run from project root
695        let workdir = if !task.hermetic && !task.workspaces.is_empty() {
696            // Find workspace root for install tasks
697            let workspace_name = &task.workspaces[0];
698            let manager = match workspace_name.as_str() {
699                "bun" => PackageManager::Bun,
700                "npm" => PackageManager::Npm,
701                "pnpm" => PackageManager::Pnpm,
702                "yarn" => PackageManager::YarnModern,
703                "cargo" => PackageManager::Cargo,
704                _ => PackageManager::Npm, // fallback
705            };
706            find_workspace_root(manager, &self.config.project_root)
707        } else {
708            self.config.project_root.clone()
709        };
710
711        tracing::info!(
712            task = %name,
713            workdir = %workdir.display(),
714            hermetic = false,
715            "Executing non-hermetic task"
716        );
717
718        // Emit command being run
719        let cmd_str = if task.command.is_empty() {
720            task.args.join(" ")
721        } else {
722            format!("{} {}", task.command, task.args.join(" "))
723        };
724        if !self.config.capture_output {
725            cuenv_events::emit_task_started!(name, cmd_str, false);
726        }
727
728        // Resolve command path
729        let resolved_command = self.config.environment.resolve_command(&task.command);
730
731        // Build command
732        let mut cmd = if let Some(shell) = &task.shell {
733            if shell.command.is_some() && shell.flag.is_some() {
734                let shell_command = shell.command.as_ref().unwrap();
735                let shell_flag = shell.flag.as_ref().unwrap();
736                let resolved_shell = self.config.environment.resolve_command(shell_command);
737                let mut cmd = Command::new(&resolved_shell);
738                cmd.arg(shell_flag);
739                if task.args.is_empty() {
740                    cmd.arg(&resolved_command);
741                } else {
742                    let full_command = if task.command.is_empty() {
743                        task.args.join(" ")
744                    } else {
745                        format!("{} {}", resolved_command, task.args.join(" "))
746                    };
747                    cmd.arg(full_command);
748                }
749                cmd
750            } else {
751                let mut cmd = Command::new(&resolved_command);
752                for arg in &task.args {
753                    cmd.arg(arg);
754                }
755                cmd
756            }
757        } else {
758            let mut cmd = Command::new(&resolved_command);
759            for arg in &task.args {
760                cmd.arg(arg);
761            }
762            cmd
763        };
764
765        // Set working directory and environment
766        cmd.current_dir(&workdir);
767        let env_vars = self.config.environment.merge_with_system();
768        for (k, v) in &env_vars {
769            cmd.env(k, v);
770        }
771
772        // Execute - always capture output for consistent behavior
773        // If not in capture mode, stream output to terminal in real-time
774        if self.config.capture_output {
775            let output = cmd
776                .stdout(Stdio::piped())
777                .stderr(Stdio::piped())
778                .output()
779                .await
780                .map_err(|e| Error::Io {
781                    source: e,
782                    path: None,
783                    operation: format!("spawn task {}", name),
784                })?;
785
786            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
787            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
788            let exit_code = output.status.code().unwrap_or(-1);
789            let success = output.status.success();
790
791            if !success {
792                tracing::warn!(task = %name, exit = exit_code, "Task failed");
793                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
794                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
795            }
796
797            Ok(TaskResult {
798                name: name.to_string(),
799                exit_code: Some(exit_code),
800                stdout,
801                stderr,
802                success,
803            })
804        } else {
805            // Stream output directly to terminal (interactive mode)
806            let status = cmd
807                .stdout(Stdio::inherit())
808                .stderr(Stdio::inherit())
809                .status()
810                .await
811                .map_err(|e| Error::Io {
812                    source: e,
813                    path: None,
814                    operation: format!("spawn task {}", name),
815                })?;
816
817            let exit_code = status.code().unwrap_or(-1);
818            let success = status.success();
819
820            if !success {
821                tracing::warn!(task = %name, exit = exit_code, "Task failed");
822            }
823
824            Ok(TaskResult {
825                name: name.to_string(),
826                exit_code: Some(exit_code),
827                stdout: String::new(), // Output went to terminal
828                stderr: String::new(),
829                success,
830            })
831        }
832    }
833
834    /// Execute a task definition (single task or group)
835    #[async_recursion]
836    pub async fn execute_definition(
837        &self,
838        name: &str,
839        definition: &TaskDefinition,
840        all_tasks: &Tasks,
841    ) -> Result<Vec<TaskResult>> {
842        match definition {
843            TaskDefinition::Single(task) => {
844                let result = self.execute_task(name, task.as_ref()).await?;
845                Ok(vec![result])
846            }
847            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
848        }
849    }
850
851    async fn execute_group(
852        &self,
853        prefix: &str,
854        group: &TaskGroup,
855        all_tasks: &Tasks,
856    ) -> Result<Vec<TaskResult>> {
857        match group {
858            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
859            TaskGroup::Parallel(tasks) => self.execute_parallel(prefix, tasks, all_tasks).await,
860        }
861    }
862
863    async fn execute_sequential(
864        &self,
865        prefix: &str,
866        tasks: &[TaskDefinition],
867        all_tasks: &Tasks,
868    ) -> Result<Vec<TaskResult>> {
869        if !self.config.capture_output {
870            cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
871        }
872        let mut results = Vec::new();
873        for (i, task_def) in tasks.iter().enumerate() {
874            let task_name = format!("{}[{}]", prefix, i);
875            let task_results = self
876                .execute_definition(&task_name, task_def, all_tasks)
877                .await?;
878            for result in &task_results {
879                if !result.success {
880                    let message = format!(
881                        "Sequential task group '{prefix}' halted.\n\n{}",
882                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
883                    );
884                    return Err(Error::configuration(message));
885                }
886            }
887            results.extend(task_results);
888        }
889        Ok(results)
890    }
891
892    async fn execute_parallel(
893        &self,
894        prefix: &str,
895        tasks: &HashMap<String, TaskDefinition>,
896        all_tasks: &Tasks,
897    ) -> Result<Vec<TaskResult>> {
898        // Check for "default" task to override parallel execution
899        if let Some(default_task) = tasks.get("default") {
900            if !self.config.capture_output {
901                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
902            }
903            // Execute only the default task, using the group prefix directly
904            // since "default" is implicit when invoking the group name
905            let task_name = format!("{}.default", prefix);
906            return self
907                .execute_definition(&task_name, default_task, all_tasks)
908                .await;
909        }
910
911        if !self.config.capture_output {
912            cuenv_events::emit_task_group_started!(prefix, false, tasks.len());
913        }
914        let mut join_set = JoinSet::new();
915        let all_tasks = Arc::new(all_tasks.clone());
916        let mut all_results = Vec::new();
917        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
918            if let Some(failed) = results.iter().find(|r| !r.success) {
919                let message = format!(
920                    "Parallel task group '{prefix}' halted.\n\n{}",
921                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
922                );
923                return Err(Error::configuration(message));
924            }
925            all_results.extend(results);
926            Ok(())
927        };
928        for (name, task_def) in tasks {
929            let task_name = format!("{}.{}", prefix, name);
930            let task_def = task_def.clone();
931            let all_tasks = Arc::clone(&all_tasks);
932            let executor = self.clone_with_config();
933            join_set.spawn(async move {
934                executor
935                    .execute_definition(&task_name, &task_def, &all_tasks)
936                    .await
937            });
938            if self.config.max_parallel > 0
939                && join_set.len() >= self.config.max_parallel
940                && let Some(result) = join_set.join_next().await
941            {
942                match result {
943                    Ok(Ok(results)) => merge_results(results)?,
944                    Ok(Err(e)) => return Err(e),
945                    Err(e) => {
946                        return Err(Error::configuration(format!(
947                            "Task execution panicked: {}",
948                            e
949                        )));
950                    }
951                }
952            }
953        }
954        while let Some(result) = join_set.join_next().await {
955            match result {
956                Ok(Ok(results)) => merge_results(results)?,
957                Ok(Err(e)) => return Err(e),
958                Err(e) => {
959                    return Err(Error::configuration(format!(
960                        "Task execution panicked: {}",
961                        e
962                    )));
963                }
964            }
965        }
966        Ok(all_results)
967    }
968
969    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
970        let parallel_groups = graph.get_parallel_groups()?;
971        let mut all_results = Vec::new();
972        let mut join_set = JoinSet::new();
973        let mut group_iter = parallel_groups.into_iter();
974        let mut current_group = group_iter.next();
975        while current_group.is_some() || !join_set.is_empty() {
976            if let Some(group) = current_group.as_mut() {
977                while let Some(node) = group.pop() {
978                    let task = node.task.clone();
979                    let name = node.name.clone();
980                    let executor = self.clone_with_config();
981                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
982                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
983                        break;
984                    }
985                }
986                if group.is_empty() {
987                    current_group = group_iter.next();
988                }
989            }
990            if let Some(result) = join_set.join_next().await {
991                match result {
992                    Ok(Ok(task_result)) => {
993                        if !task_result.success {
994                            let message = format!(
995                                "Task graph execution halted.\n\n{}",
996                                summarize_task_failure(&task_result, TASK_FAILURE_SNIPPET_LINES)
997                            );
998                            return Err(Error::configuration(message));
999                        }
1000                        all_results.push(task_result);
1001                    }
1002                    Ok(Err(e)) => return Err(e),
1003                    Err(e) => {
1004                        return Err(Error::configuration(format!(
1005                            "Task execution panicked: {}",
1006                            e
1007                        )));
1008                    }
1009                }
1010            }
1011        }
1012        Ok(all_results)
1013    }
1014
1015    async fn resolve_workspace(
1016        &self,
1017        _task_name: &str,
1018        task: &Task,
1019        workspace_name: &str,
1020        config: &WorkspaceConfig,
1021    ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1022        let root = self.config.project_root.clone();
1023        let task_label = _task_name.to_string();
1024        let command = task.command.clone();
1025        let config_pm = config.package_manager.clone();
1026        let config_root = config.root.clone();
1027        // WorkspaceConfig doesn't support 'packages' filtering yet, so assume full workspace deps for now,
1028        // or imply it from context. If we want package filtering, we need to add it to WorkspaceConfig
1029        // or assume all deps.
1030        // However, the previous logic used `packages` from WorkspaceInputs.
1031        // For now, let's assume traversing all dependencies if not specified.
1032        // But wait, `WorkspaceConfig` in schema doesn't have `packages`.
1033        // So we probably default to "all relevant" or auto-infer from current directory if we are inside a package.
1034        let mut packages = Vec::new();
1035
1036        // If we are in a subdirectory that matches a workspace member, we might want to infer that package.
1037        // The previous logic did that if `packages` was empty.
1038
1039        let lockfile_override: Option<String> = None; // WorkspaceConfig doesn't have lockfile override yet.
1040        // If we need it, we should add it to WorkspaceConfig. For now assume standard discovery.
1041
1042        let mut traverse_workspace_deps = true;
1043
1044        // Use spawn_blocking for heavy lifting
1045        let workspace_name_owned = workspace_name.to_string();
1046        tokio::task::spawn_blocking(move || {
1047            let override_for_detection = lockfile_override.as_ref().map(|lock| {
1048                let candidate = PathBuf::from(lock);
1049                if candidate.is_absolute() {
1050                    candidate
1051                } else {
1052                    root.join(lock)
1053                }
1054            });
1055
1056            // 1. Detect Package Manager
1057            // Priority: 1. explicit config, 2. workspace name (if it matches a PM), 3. auto-detect
1058            let manager = if let Some(pm_str) = config_pm {
1059                match pm_str.as_str() {
1060                    "npm" => PackageManager::Npm,
1061                    "bun" => PackageManager::Bun,
1062                    "pnpm" => PackageManager::Pnpm,
1063                    "yarn" => PackageManager::YarnModern,
1064                    "yarn-classic" => PackageManager::YarnClassic,
1065                    "cargo" => PackageManager::Cargo,
1066                    _ => {
1067                        return Err(Error::configuration(format!(
1068                            "Unknown package manager: {}",
1069                            pm_str
1070                        )));
1071                    }
1072                }
1073            } else {
1074                // Try to match workspace name to package manager
1075                match workspace_name_owned.as_str() {
1076                    "npm" => PackageManager::Npm,
1077                    "bun" => PackageManager::Bun,
1078                    "pnpm" => PackageManager::Pnpm,
1079                    "yarn" => PackageManager::YarnModern,
1080                    "cargo" => PackageManager::Cargo,
1081                    _ => {
1082                         // Auto-detect if name doesn't match
1083                        // Priority: command hint -> lockfiles
1084                        let hint = detect_from_command(&command);
1085                        let detected = match detect_package_managers(&root) {
1086                            Ok(list) => list,
1087                            Err(e) => {
1088                                if override_for_detection.is_some() {
1089                                    Vec::new()
1090                                } else {
1091                                    return Err(Error::configuration(format!(
1092                                        "Failed to detect package managers: {}",
1093                                        e
1094                                    )));
1095                                }
1096                            }
1097                        };
1098
1099                        if let Some(h) = hint {
1100                            if detected.contains(&h) {
1101                                h
1102                            } else if !detected.is_empty() {
1103                                // Prefer detected files
1104                                detected[0]
1105                            } else if let Some(ref override_path) = override_for_detection {
1106                                infer_manager_from_lockfile(override_path).ok_or_else(|| {
1107                                    Error::configuration(
1108                                        "Unable to infer package manager from lockfile override",
1109                                    )
1110                                })?
1111                            } else {
1112                                return Err(Error::configuration(
1113                                    format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1114                                ));
1115                            }
1116                        } else if !detected.is_empty() {
1117                            detected[0]
1118                        } else if let Some(ref override_path) = override_for_detection {
1119                            infer_manager_from_lockfile(override_path).ok_or_else(|| {
1120                                Error::configuration(
1121                                    "Unable to infer package manager from lockfile override",
1122                                )
1123                            })?
1124                        } else {
1125                            return Err(Error::configuration(
1126                                "Could not detect package manager for workspace resolution",
1127                            ));
1128                        }
1129                    }
1130                }
1131            };
1132
1133            // Resolve the actual workspace root by walking up until we find a
1134            // directory that declares a workspace for this package manager.
1135            // If config.root is set, use that.
1136            let workspace_root = if let Some(root_override) = &config_root {
1137                root.join(root_override)
1138            } else {
1139                find_workspace_root(manager, &root)
1140            };
1141
1142            if task_trace_enabled() {
1143                tracing::info!(
1144                    task = %task_label,
1145                    manager = %manager,
1146                    project_root = %root.display(),
1147                    workspace_root = %workspace_root.display(),
1148                    "Resolved workspace root for package manager"
1149                );
1150            }
1151
1152            let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1153                let candidate = PathBuf::from(lock);
1154                if candidate.is_absolute() {
1155                    candidate
1156                } else {
1157                    workspace_root.join(lock)
1158                }
1159            });
1160
1161            // 2. Discover Workspace
1162            let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1163                PackageManager::Npm
1164                | PackageManager::Bun
1165                | PackageManager::YarnClassic
1166                | PackageManager::YarnModern => Box::new(PackageJsonDiscovery),
1167                PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1168                PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1169            };
1170
1171            let workspace = discovery.discover(&workspace_root).map_err(|e| {
1172                Error::configuration(format!("Failed to discover workspace: {}", e))
1173            })?;
1174
1175            // 3. Parse Lockfile
1176            let lockfile_path = if let Some(path) = lockfile_override_path {
1177                if !path.exists() {
1178                    return Err(Error::configuration(format!(
1179                        "Workspace lockfile override does not exist: {}",
1180                        path.display()
1181                    )));
1182                }
1183                path
1184            } else {
1185                workspace.lockfile.clone().ok_or_else(|| {
1186                    Error::configuration("Workspace resolution requires a lockfile")
1187                })?
1188            };
1189
1190            let parser: Box<dyn LockfileParser> = match manager {
1191                PackageManager::Npm => Box::new(NpmLockfileParser),
1192                PackageManager::Bun => Box::new(BunLockfileParser),
1193                PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1194                PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1195                PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1196                PackageManager::Cargo => Box::new(CargoLockfileParser),
1197            };
1198
1199            let entries = parser
1200                .parse(&lockfile_path)
1201                .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1202            if task_trace_enabled() {
1203                tracing::info!(
1204                    task = %task_label,
1205                    lockfile = %lockfile_path.display(),
1206                    members = entries.len(),
1207                    "Parsed workspace lockfile"
1208                );
1209            }
1210
1211            // Compute lockfile hash
1212            let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1213
1214            // Infer packages when none explicitly provided by scoping to the
1215            // current workspace member. (We intentionally avoid pulling all
1216            // transitive deps here to keep hashing fast for large monorepos.)
1217            if packages.is_empty() {
1218                let current_member = workspace
1219                    .members
1220                    .iter()
1221                    .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1222                if let Some(member) = current_member {
1223                    let inferred = vec![member.name.clone()];
1224                    if task_trace_enabled() {
1225                        tracing::info!(
1226                            task = %task_label,
1227                            inferred_packages = ?inferred,
1228                            "Inferred workspace packages from current project"
1229                        );
1230                    }
1231                    packages = inferred;
1232                    traverse_workspace_deps = true;
1233                }
1234            }
1235
1236            // 4. Collect Inputs
1237            let mut member_paths = Vec::new();
1238
1239            // Always include workspace configuration files
1240            member_paths.push(manager.workspace_config_name().to_string());
1241            if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1242                member_paths.push(rel.to_string_lossy().to_string());
1243            } else {
1244                member_paths.push(lockfile_path.to_string_lossy().to_string());
1245            }
1246
1247            if packages.is_empty() {
1248                for member in &workspace.members {
1249                    let manifest_rel = member
1250                        .path
1251                        .join(manager.workspace_config_name());
1252                    member_paths.push(manifest_rel.to_string_lossy().to_string());
1253                }
1254            } else {
1255                let mut to_visit: Vec<String> = packages.clone();
1256                let mut visited = HashSet::new();
1257
1258                while let Some(pkg_name) = to_visit.pop() {
1259                    if visited.contains(&pkg_name) {
1260                        continue;
1261                    }
1262                    visited.insert(pkg_name.clone());
1263
1264                    if let Some(member) = workspace.find_member(&pkg_name) {
1265                        let manifest_rel = member
1266                            .path
1267                            .join(manager.workspace_config_name());
1268                        member_paths.push(manifest_rel.to_string_lossy().to_string());
1269
1270                        // Add dependencies when explicitly requested
1271                        if traverse_workspace_deps {
1272                            let mut dependency_candidates: HashSet<String> = HashSet::new();
1273
1274                            if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1275                                for dep in &entry.dependencies {
1276                                    if entries
1277                                        .iter()
1278                                        .any(|e| e.name == dep.name && e.is_workspace_member)
1279                                    {
1280                                        dependency_candidates.insert(dep.name.clone());
1281                                    }
1282                                }
1283                            }
1284
1285                            for dep_name in &member.dependencies {
1286                                if workspace.find_member(dep_name).is_some() {
1287                                    dependency_candidates.insert(dep_name.clone());
1288                                }
1289                            }
1290
1291                            for dep_name in dependency_candidates {
1292                                to_visit.push(dep_name);
1293                            }
1294                        }
1295                    }
1296                }
1297            }
1298
1299            if task_trace_enabled() {
1300                tracing::info!(
1301                    task = %task_label,
1302                    members = ?member_paths,
1303                    "Workspace input member paths selected"
1304                );
1305            }
1306
1307            Ok((workspace, entries, member_paths, Some(hash)))
1308        })
1309        .await
1310        .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1311    }
1312
1313    async fn materialize_workspace(
1314        &self,
1315        workspace: &Workspace,
1316        entries: &[LockfileEntry],
1317        target_dir: &Path,
1318    ) -> Result<()> {
1319        // Dispatch to appropriate materializer
1320        let materializer: Box<dyn Materializer> = match workspace.manager {
1321            PackageManager::Npm
1322            | PackageManager::Bun
1323            | PackageManager::Pnpm
1324            | PackageManager::YarnClassic
1325            | PackageManager::YarnModern => Box::new(NodeModulesMaterializer),
1326            PackageManager::Cargo => Box::new(CargoMaterializer),
1327        };
1328
1329        materializer
1330            .materialize(workspace, entries, target_dir)
1331            .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1332    }
1333
1334    fn clone_with_config(&self) -> Self {
1335        // Share the backend across clones to preserve container cache for Dagger chaining
1336        Self::with_shared_backend(self.config.clone(), self.backend.clone())
1337    }
1338}
1339
1340fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1341    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1342
1343    loop {
1344        let is_root = match manager {
1345            PackageManager::Npm
1346            | PackageManager::Bun
1347            | PackageManager::YarnClassic
1348            | PackageManager::YarnModern => package_json_has_workspaces(&current),
1349            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1350            PackageManager::Cargo => cargo_toml_has_workspace(&current),
1351        };
1352
1353        if is_root {
1354            return current;
1355        }
1356
1357        if let Some(parent) = current.parent() {
1358            current = parent.to_path_buf();
1359        } else {
1360            return start.to_path_buf();
1361        }
1362    }
1363}
1364
1365fn package_json_has_workspaces(dir: &Path) -> bool {
1366    let path = dir.join("package.json");
1367    let content = std::fs::read_to_string(&path);
1368    let Ok(json) = content.and_then(|s| {
1369        serde_json::from_str::<serde_json::Value>(&s)
1370            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1371    }) else {
1372        return false;
1373    };
1374
1375    match json.get("workspaces") {
1376        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1377        Some(serde_json::Value::Object(map)) => map
1378            .get("packages")
1379            .and_then(|packages| packages.as_array())
1380            .map(|arr| !arr.is_empty())
1381            .unwrap_or(false),
1382        _ => false,
1383    }
1384}
1385
1386fn cargo_toml_has_workspace(dir: &Path) -> bool {
1387    let path = dir.join("Cargo.toml");
1388    let Ok(content) = std::fs::read_to_string(&path) else {
1389        return false;
1390    };
1391
1392    content.contains("[workspace]")
1393}
1394
1395fn task_trace_enabled() -> bool {
1396    static ENABLED: OnceLock<bool> = OnceLock::new();
1397    *ENABLED.get_or_init(|| {
1398        matches!(
1399            std::env::var("CUENV_TRACE_TASKS")
1400                .unwrap_or_default()
1401                .trim()
1402                .to_ascii_lowercase()
1403                .as_str(),
1404            "1" | "true" | "yes" | "on"
1405        )
1406    })
1407}
1408
1409/// Build a compact, user-friendly summary for a failed task, including the
1410/// exit code and the tail of stdout/stderr to help with diagnostics.
1411pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1412    let exit_code = result
1413        .exit_code
1414        .map(|c| c.to_string())
1415        .unwrap_or_else(|| "unknown".to_string());
1416
1417    let mut sections = Vec::new();
1418    sections.push(format!(
1419        "Task '{}' failed with exit code {}.",
1420        result.name, exit_code
1421    ));
1422
1423    let output = format_failure_streams(result, max_output_lines);
1424    if output.is_empty() {
1425        sections.push(
1426            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1427                .to_string(),
1428        );
1429    } else {
1430        sections.push(output);
1431    }
1432
1433    sections.join("\n\n")
1434}
1435
1436fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1437    let mut streams = Vec::new();
1438
1439    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1440        streams.push(stdout);
1441    }
1442
1443    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1444        streams.push(stderr);
1445    }
1446
1447    streams.join("\n\n")
1448}
1449
1450fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
1451    let normalized = content.trim_end();
1452    if normalized.is_empty() {
1453        return None;
1454    }
1455
1456    let lines: Vec<&str> = normalized.lines().collect();
1457    let total = lines.len();
1458    let start = total.saturating_sub(max_output_lines);
1459    let snippet = lines[start..].join("\n");
1460
1461    let header = if total > max_output_lines {
1462        format!("{label} (last {max_output_lines} of {total} lines):")
1463    } else {
1464        format!("{label}:")
1465    };
1466
1467    Some(format!("{header}\n{snippet}"))
1468}
1469
1470fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1471    match path.file_name().and_then(|n| n.to_str())? {
1472        "package-lock.json" => Some(PackageManager::Npm),
1473        "bun.lock" => Some(PackageManager::Bun),
1474        "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1475        "yarn.lock" => Some(PackageManager::YarnModern),
1476        "Cargo.lock" => Some(PackageManager::Cargo),
1477        _ => None,
1478    }
1479}
1480
1481fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1482    // Use OS temp dir; name scoped by task and cache key prefix.
1483    // IMPORTANT: Ensure the workdir is clean on every run to preserve hermeticity.
1484    let sanitized_task = task_name
1485        .chars()
1486        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1487        .collect::<String>();
1488
1489    let base = std::env::temp_dir().join(format!(
1490        "cuenv-work-{}-{}",
1491        sanitized_task,
1492        &key[..12.min(key.len())]
1493    ));
1494
1495    // If a directory from a previous run exists, remove it before reuse.
1496    // This avoids contamination from artifacts left by failed runs where no cache was saved.
1497    if base.exists()
1498        && let Err(e) = std::fs::remove_dir_all(&base)
1499    {
1500        // If we cannot remove the previous directory (e.g. in-use on Windows),
1501        // fall back to a unique, fresh directory to maintain hermetic execution.
1502        let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1503        let fallback = std::env::temp_dir().join(format!(
1504            "cuenv-work-{}-{}-{}",
1505            sanitized_task,
1506            &key[..12.min(key.len())],
1507            ts
1508        ));
1509        tracing::warn!(
1510            previous = %base.display(),
1511            fallback = %fallback.display(),
1512            error = %e,
1513            "Failed to clean previous hermetic workdir; using fresh fallback directory"
1514        );
1515        std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1516            source: e,
1517            path: Some(fallback.clone().into()),
1518            operation: "create_dir_all".into(),
1519        })?;
1520        return Ok(fallback);
1521    }
1522
1523    std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1524        source: e,
1525        path: Some(base.clone().into()),
1526        operation: "create_dir_all".into(),
1527    })?;
1528    Ok(base)
1529}
1530
1531/// Execute an arbitrary command with the cuenv environment
1532pub async fn execute_command(
1533    command: &str,
1534    args: &[String],
1535    environment: &Environment,
1536) -> Result<i32> {
1537    tracing::info!("Executing command: {} {:?}", command, args);
1538    let mut cmd = Command::new(command);
1539    cmd.args(args);
1540    let env_vars = environment.merge_with_system();
1541    for (key, value) in env_vars {
1542        cmd.env(key, value);
1543    }
1544    cmd.stdout(Stdio::inherit());
1545    cmd.stderr(Stdio::inherit());
1546    cmd.stdin(Stdio::inherit());
1547    let status = cmd.status().await.map_err(|e| {
1548        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1549    })?;
1550    Ok(status.code().unwrap_or(1))
1551}
1552
1553#[cfg(test)]
1554mod tests {
1555    use super::*;
1556    use crate::tasks::Input;
1557    use std::fs;
1558    use tempfile::TempDir;
1559
1560    #[tokio::test]
1561    async fn test_executor_config_default() {
1562        let config = ExecutorConfig::default();
1563        assert!(!config.capture_output);
1564        assert_eq!(config.max_parallel, 0);
1565        assert!(config.environment.is_empty());
1566    }
1567
1568    #[tokio::test]
1569    async fn test_task_result() {
1570        let result = TaskResult {
1571            name: "test".to_string(),
1572            exit_code: Some(0),
1573            stdout: "output".to_string(),
1574            stderr: String::new(),
1575            success: true,
1576        };
1577        assert_eq!(result.name, "test");
1578        assert_eq!(result.exit_code, Some(0));
1579        assert!(result.success);
1580        assert_eq!(result.stdout, "output");
1581    }
1582
1583    #[tokio::test]
1584    async fn test_execute_simple_task() {
1585        let config = ExecutorConfig {
1586            capture_output: true,
1587            ..Default::default()
1588        };
1589        let executor = TaskExecutor::new(config);
1590        let task = Task {
1591            command: "echo".to_string(),
1592            args: vec!["hello".to_string()],
1593            description: Some("Hello task".to_string()),
1594            ..Default::default()
1595        };
1596        let result = executor.execute_task("test", &task).await.unwrap();
1597        assert!(result.success);
1598        assert_eq!(result.exit_code, Some(0));
1599        assert!(result.stdout.contains("hello"));
1600    }
1601
1602    #[tokio::test]
1603    async fn test_execute_with_environment() {
1604        let mut config = ExecutorConfig {
1605            capture_output: true,
1606            ..Default::default()
1607        };
1608        config
1609            .environment
1610            .set("TEST_VAR".to_string(), "test_value".to_string());
1611        let executor = TaskExecutor::new(config);
1612        let task = Task {
1613            command: "printenv".to_string(),
1614            args: vec!["TEST_VAR".to_string()],
1615            description: Some("Print env task".to_string()),
1616            ..Default::default()
1617        };
1618        let result = executor.execute_task("test", &task).await.unwrap();
1619        assert!(result.success);
1620        assert!(result.stdout.contains("test_value"));
1621    }
1622
1623    #[tokio::test]
1624    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
1625        let tmp = TempDir::new().unwrap();
1626        let root = tmp.path();
1627
1628        // Workspace root with workspaces + lockfile
1629        fs::write(
1630            root.join("package.json"),
1631            r#"{
1632  "name": "root-app",
1633  "version": "0.0.0",
1634  "workspaces": ["packages/*", "apps/*"],
1635  "dependencies": {
1636    "@rawkodeacademy/content-technologies": "workspace:*"
1637  }
1638}"#,
1639        )
1640        .unwrap();
1641        // Deliberately omit the workspace member name for apps/site to mimic lockfiles
1642        // that only record member paths, ensuring we can still discover dependencies.
1643        fs::write(
1644            root.join("bun.lock"),
1645            r#"{
1646  "lockfileVersion": 1,
1647  "workspaces": {
1648    "": {
1649      "name": "root-app",
1650      "dependencies": {
1651        "@rawkodeacademy/content-technologies": "workspace:*"
1652      }
1653    },
1654    "packages/content-technologies": {
1655      "name": "@rawkodeacademy/content-technologies",
1656      "version": "0.0.1"
1657    },
1658    "apps/site": {
1659      "version": "0.0.0",
1660      "dependencies": {
1661        "@rawkodeacademy/content-technologies": "workspace:*"
1662      }
1663    }
1664  },
1665  "packages": {}
1666}"#,
1667        )
1668        .unwrap();
1669
1670        // Workspace member packages
1671        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
1672        fs::write(
1673            root.join("packages/content-technologies/package.json"),
1674            r#"{
1675  "name": "@rawkodeacademy/content-technologies",
1676  "version": "0.0.1"
1677}"#,
1678        )
1679        .unwrap();
1680
1681        fs::create_dir_all(root.join("apps/site")).unwrap();
1682        fs::write(
1683            root.join("apps/site/package.json"),
1684            r#"{
1685  "name": "site",
1686  "version": "0.0.0",
1687  "dependencies": {
1688    "@rawkodeacademy/content-technologies": "workspace:*"
1689  }
1690}"#,
1691        )
1692        .unwrap();
1693
1694        let mut workspaces = HashMap::new();
1695        workspaces.insert(
1696            "bun".to_string(),
1697            WorkspaceConfig {
1698                enabled: true,
1699                package_manager: Some("bun".to_string()),
1700                root: None,
1701            },
1702        );
1703
1704        let config = ExecutorConfig {
1705            capture_output: true,
1706            project_root: root.join("apps/site"),
1707            workspaces: Some(workspaces),
1708            ..Default::default()
1709        };
1710        let executor = TaskExecutor::new(config);
1711
1712        let task = Task {
1713            command: "sh".to_string(),
1714            args: vec![
1715                "-c".to_string(),
1716                "find ../.. -maxdepth 4 -type d | sort".to_string(),
1717            ],
1718            inputs: vec![Input::Path("package.json".to_string())],
1719            workspaces: vec!["bun".to_string()],
1720            ..Default::default()
1721        };
1722
1723        let result = executor.execute_task("install", &task).await.unwrap();
1724        assert!(
1725            result.success,
1726            "command failed stdout='{}' stderr='{}'",
1727            result.stdout, result.stderr
1728        );
1729        assert!(
1730            result
1731                .stdout
1732                .split_whitespace()
1733                .any(|line| line.ends_with("packages/content-technologies")),
1734            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
1735            result.stdout,
1736            result.stderr
1737        );
1738    }
1739
1740    #[tokio::test]
1741    async fn test_execute_failing_task() {
1742        let config = ExecutorConfig {
1743            capture_output: true,
1744            ..Default::default()
1745        };
1746        let executor = TaskExecutor::new(config);
1747        let task = Task {
1748            command: "false".to_string(),
1749            description: Some("Failing task".to_string()),
1750            ..Default::default()
1751        };
1752        let result = executor.execute_task("test", &task).await.unwrap();
1753        assert!(!result.success);
1754        assert_eq!(result.exit_code, Some(1));
1755    }
1756
1757    #[tokio::test]
1758    async fn test_execute_sequential_group() {
1759        let config = ExecutorConfig {
1760            capture_output: true,
1761            ..Default::default()
1762        };
1763        let executor = TaskExecutor::new(config);
1764        let task1 = Task {
1765            command: "echo".to_string(),
1766            args: vec!["first".to_string()],
1767            description: Some("First task".to_string()),
1768            ..Default::default()
1769        };
1770        let task2 = Task {
1771            command: "echo".to_string(),
1772            args: vec!["second".to_string()],
1773            description: Some("Second task".to_string()),
1774            ..Default::default()
1775        };
1776        let group = TaskGroup::Sequential(vec![
1777            TaskDefinition::Single(Box::new(task1)),
1778            TaskDefinition::Single(Box::new(task2)),
1779        ]);
1780        let all_tasks = Tasks::new();
1781        let results = executor
1782            .execute_group("seq", &group, &all_tasks)
1783            .await
1784            .unwrap();
1785        assert_eq!(results.len(), 2);
1786        assert!(results[0].stdout.contains("first"));
1787        assert!(results[1].stdout.contains("second"));
1788    }
1789
1790    #[tokio::test]
1791    async fn test_command_injection_prevention() {
1792        let config = ExecutorConfig {
1793            capture_output: true,
1794            ..Default::default()
1795        };
1796        let executor = TaskExecutor::new(config);
1797        let malicious_task = Task {
1798            command: "echo".to_string(),
1799            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1800            description: Some("Malicious task test".to_string()),
1801            ..Default::default()
1802        };
1803        let result = executor
1804            .execute_task("malicious", &malicious_task)
1805            .await
1806            .unwrap();
1807        assert!(result.success);
1808        assert!(result.stdout.contains("hello ; rm -rf /"));
1809    }
1810
1811    #[tokio::test]
1812    async fn test_special_characters_in_args() {
1813        let config = ExecutorConfig {
1814            capture_output: true,
1815            ..Default::default()
1816        };
1817        let executor = TaskExecutor::new(config);
1818        let special_chars = vec![
1819            "$USER",
1820            "$(whoami)",
1821            "`whoami`",
1822            "&& echo hacked",
1823            "|| echo failed",
1824            "> /tmp/hack",
1825            "| cat",
1826        ];
1827        for special_arg in special_chars {
1828            let task = Task {
1829                command: "echo".to_string(),
1830                args: vec!["safe".to_string(), special_arg.to_string()],
1831                description: Some("Special character test".to_string()),
1832                ..Default::default()
1833            };
1834            let result = executor.execute_task("special", &task).await.unwrap();
1835            assert!(result.success);
1836            assert!(result.stdout.contains("safe"));
1837            assert!(result.stdout.contains(special_arg));
1838        }
1839    }
1840
1841    #[tokio::test]
1842    async fn test_environment_variable_safety() {
1843        let mut config = ExecutorConfig {
1844            capture_output: true,
1845            ..Default::default()
1846        };
1847        config
1848            .environment
1849            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1850        let executor = TaskExecutor::new(config);
1851        let task = Task {
1852            command: "printenv".to_string(),
1853            args: vec!["DANGEROUS_VAR".to_string()],
1854            description: Some("Environment variable safety test".to_string()),
1855            ..Default::default()
1856        };
1857        let result = executor.execute_task("env_test", &task).await.unwrap();
1858        assert!(result.success);
1859        assert!(result.stdout.contains("; rm -rf /"));
1860    }
1861
1862    #[tokio::test]
1863    async fn test_execute_graph_parallel_groups() {
1864        // two independent tasks -> can run in same parallel group
1865        let config = ExecutorConfig {
1866            capture_output: true,
1867            max_parallel: 2,
1868            ..Default::default()
1869        };
1870        let executor = TaskExecutor::new(config);
1871        let mut graph = TaskGraph::new();
1872
1873        let t1 = Task {
1874            command: "echo".into(),
1875            args: vec!["A".into()],
1876            ..Default::default()
1877        };
1878        let t2 = Task {
1879            command: "echo".into(),
1880            args: vec!["B".into()],
1881            ..Default::default()
1882        };
1883
1884        graph.add_task("t1", t1).unwrap();
1885        graph.add_task("t2", t2).unwrap();
1886        let results = executor.execute_graph(&graph).await.unwrap();
1887        assert_eq!(results.len(), 2);
1888        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1889        assert!(joined.contains("A") && joined.contains("B"));
1890    }
1891}