cuenv_core/tasks/
executor.rs

1//! Task executor for running tasks with environment support and hermetic, input-addressed execution
2//!
3//! - Environment variable propagation
4//! - Parallel and sequential execution
5//! - Hermetic workdir populated from declared inputs (files/dirs/globs)
6//! - Persistent task result cache keyed by inputs + command + env + cuenv version + platform
7
8use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
9use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
10use crate::cache::tasks as task_cache;
11use crate::config::BackendConfig;
12use crate::environment::Environment;
13use crate::manifest::WorkspaceConfig;
14use crate::tasks::io::{InputResolver, collect_outputs, populate_hermetic_dir};
15use crate::{Error, Result};
16use async_recursion::async_recursion;
17use chrono::Utc;
18use cuenv_workspaces::{
19    BunLockfileParser, CargoLockfileParser, CargoTomlDiscovery, LockfileEntry, LockfileParser,
20    NpmLockfileParser, PackageJsonDiscovery, PackageManager, PnpmLockfileParser,
21    PnpmWorkspaceDiscovery, Workspace, WorkspaceDiscovery, YarnClassicLockfileParser,
22    YarnModernLockfileParser, detect_from_command, detect_package_managers,
23    materializer::{
24        Materializer, cargo_deps::CargoMaterializer, node_modules::NodeModulesMaterializer,
25    },
26};
27use std::collections::{BTreeMap, HashMap, HashSet};
28use std::path::{Path, PathBuf};
29use std::process::Stdio;
30use std::sync::{Arc, OnceLock};
31use tokio::process::Command;
32use tokio::task::JoinSet;
33
34/// Task execution result
35#[derive(Debug, Clone)]
36pub struct TaskResult {
37    pub name: String,
38    pub exit_code: Option<i32>,
39    pub stdout: String,
40    pub stderr: String,
41    pub success: bool,
42}
43
44/// Number of lines from stdout/stderr to include when summarizing failures
45pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
46
47/// Task executor configuration
48#[derive(Debug, Clone)]
49pub struct ExecutorConfig {
50    /// Whether to capture output (vs streaming to stdout/stderr)
51    pub capture_output: bool,
52    /// Maximum parallel tasks (0 = unlimited)
53    pub max_parallel: usize,
54    /// Environment variables to propagate (resolved via policies)
55    pub environment: Environment,
56    /// Optional working directory override (for hermetic execution)
57    pub working_dir: Option<PathBuf>,
58    /// Project root for resolving inputs/outputs (env.cue root)
59    pub project_root: PathBuf,
60    /// Path to cue.mod root for resolving relative source paths
61    pub cue_module_root: Option<PathBuf>,
62    /// Optional: materialize cached outputs on cache hit
63    pub materialize_outputs: Option<PathBuf>,
64    /// Optional: cache directory override
65    pub cache_dir: Option<PathBuf>,
66    /// Optional: print cache path on hits/misses
67    pub show_cache_path: bool,
68    /// Global workspace configuration
69    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
70    /// Backend configuration
71    pub backend_config: Option<BackendConfig>,
72    /// CLI backend selection override
73    pub cli_backend: Option<String>,
74}
75
76impl Default for ExecutorConfig {
77    fn default() -> Self {
78        Self {
79            capture_output: false,
80            max_parallel: 0,
81            environment: Environment::new(),
82            working_dir: None,
83            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
84            cue_module_root: None,
85            materialize_outputs: None,
86            cache_dir: None,
87            show_cache_path: false,
88            workspaces: None,
89            backend_config: None,
90            cli_backend: None,
91        }
92    }
93}
94
95/// Task executor
96pub struct TaskExecutor {
97    config: ExecutorConfig,
98    backend: Arc<dyn TaskBackend>,
99}
100impl TaskExecutor {
101    /// Create a new executor with host backend only
102    pub fn new(config: ExecutorConfig) -> Self {
103        Self::with_dagger_factory(config, None)
104    }
105
106    /// Create a new executor with optional dagger backend support.
107    ///
108    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable dagger backend.
109    pub fn with_dagger_factory(
110        config: ExecutorConfig,
111        dagger_factory: Option<BackendFactory>,
112    ) -> Self {
113        let backend = create_backend_with_factory(
114            config.backend_config.as_ref(),
115            config.project_root.clone(),
116            config.cli_backend.as_deref(),
117            dagger_factory,
118        );
119        Self { config, backend }
120    }
121
122    /// Create a new executor with the given config but sharing the backend
123    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
124        Self { config, backend }
125    }
126
127    /// Execute a single task
128    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
129        // Delegate execution to the configured backend
130        // For now, we pass the project root as the execution context.
131        // Hermetic execution logic is currently bypassed in favor of direct execution
132        // as per the current codebase state (see original TODOs).
133        // The backend implementation handles the specific execution details.
134
135        // If using Dagger backend, we skip the workspace resolution and hermetic setup for now
136        // as it operates in a container.
137        if self.backend.name() == "dagger" {
138            return self
139                .backend
140                .execute(
141                    name,
142                    task,
143                    &self.config.environment,
144                    &self.config.project_root,
145                    self.config.capture_output,
146                )
147                .await;
148        }
149
150        // All tasks run directly in workspace/project root (hermetic sandbox disabled)
151        return self.execute_task_non_hermetic(name, task).await;
152
153        // TODO: Re-enable hermetic execution when sandbox properly preserves monorepo structure
154        #[allow(unreachable_code)]
155        {
156            // Resolve workspace dependencies if enabled
157            let mut workspace_ctxs: Vec<(Workspace, Vec<LockfileEntry>)> = Vec::new();
158            let mut workspace_input_patterns = Vec::new();
159            let mut workspace_lockfile_hashes = BTreeMap::new();
160
161            for workspace_name in &task.workspaces {
162                if let Some(global_workspaces) = &self.config.workspaces {
163                    if let Some(ws_config) = global_workspaces.get(workspace_name) {
164                        if ws_config.enabled {
165                            let (ws, entries, paths, hash) = self
166                                .resolve_workspace(name, task, workspace_name, ws_config)
167                                .await?;
168                            workspace_ctxs.push((ws, entries));
169                            workspace_input_patterns.extend(paths);
170                            if let Some(h) = hash {
171                                workspace_lockfile_hashes.insert(workspace_name.clone(), h);
172                            }
173                        }
174                    } else {
175                        tracing::warn!(
176                            task = %name,
177                            workspace = %workspace_name,
178                            "Workspace not found in global configuration"
179                        );
180                    }
181                }
182            }
183
184            // Resolve inputs relative to the workspace root (if any) while keeping
185            // project-scoped inputs anchored to the original project path.
186            // Use the first workspace as the primary root if multiple are present,
187            // or fallback to project root. This might need refinement for multi-workspace tasks.
188            let primary_workspace_root = workspace_ctxs.first().map(|(ws, _)| ws.root.clone());
189
190            let project_prefix = primary_workspace_root
191                .as_ref()
192                .and_then(|root| self.config.project_root.strip_prefix(root).ok())
193                .map(|p| p.to_path_buf());
194            let input_root = primary_workspace_root
195                .clone()
196                .unwrap_or_else(|| self.config.project_root.clone());
197
198            let span_inputs = tracing::info_span!("inputs.resolve", task = %name);
199            let resolved_inputs = {
200                let _g = span_inputs.enter();
201                let resolver = InputResolver::new(&input_root);
202                let mut all_inputs = task.collect_all_inputs_with_prefix(project_prefix.as_deref());
203                all_inputs.extend(workspace_input_patterns.iter().cloned());
204                resolver.resolve(&all_inputs)?
205            };
206            if task_trace_enabled() {
207                tracing::info!(
208                    task = %name,
209                    input_root = %input_root.display(),
210                    project_root = %self.config.project_root.display(),
211                    inputs_count = resolved_inputs.files.len(),
212                    workspace_inputs = workspace_input_patterns.len(),
213                    "Resolved task inputs"
214                );
215            }
216
217            // Build cache key envelope
218            let inputs_summary: BTreeMap<String, String> = resolved_inputs.to_summary_map();
219            // Ensure deterministic order is already guaranteed by BTreeMap
220            let env_summary: BTreeMap<String, String> = self
221                .config
222                .environment
223                .vars
224                .iter()
225                .map(|(k, v)| (k.clone(), v.clone()))
226                .collect();
227            let cuenv_version = env!("CARGO_PKG_VERSION").to_string();
228            let platform = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
229            let shell_json = serde_json::to_value(&task.shell).ok();
230            let workspace_lockfile_hashes_opt = if workspace_lockfile_hashes.is_empty() {
231                None
232            } else {
233                Some(workspace_lockfile_hashes)
234            };
235
236            let envelope = task_cache::CacheKeyEnvelope {
237                inputs: inputs_summary.clone(),
238                command: task.command.clone(),
239                args: task.args.clone(),
240                shell: shell_json,
241                env: env_summary.clone(),
242                cuenv_version: cuenv_version.clone(),
243                platform: platform.clone(),
244                workspace_lockfile_hashes: workspace_lockfile_hashes_opt,
245                // Package hashes are implicitly included in inputs_summary because we added member paths to inputs
246                workspace_package_hashes: None,
247            };
248            let (cache_key, envelope_json) = task_cache::compute_cache_key(&envelope)?;
249
250            // Cache lookup
251            let span_cache = tracing::info_span!("cache.lookup", task = %name, key = %cache_key);
252            let cache_hit = {
253                let _g = span_cache.enter();
254                task_cache::lookup(&cache_key, self.config.cache_dir.as_deref())
255            };
256
257            if let Some(hit) = cache_hit {
258                tracing::info!(
259                    task = %name,
260                    key = %cache_key,
261                    path = %hit.path.display(),
262                    "Task {} cache hit: {}. Skipping execution.",
263                    name,
264                    cache_key
265                );
266                if self.config.show_cache_path {
267                    tracing::info!(cache_path = %hit.path.display(), "Cache path");
268                }
269                if let Some(dest) = &self.config.materialize_outputs {
270                    let count = task_cache::materialize_outputs(
271                        &cache_key,
272                        dest,
273                        self.config.cache_dir.as_deref(),
274                    )?;
275                    tracing::info!(materialized = count, dest = %dest.display(), "Materialized cached outputs");
276                }
277                // On cache hit, surface cached logs so behavior matches a fresh
278                // execution from the caller's perspective. We return them in the
279                // TaskResult even if `capture_output` is false, allowing callers
280                // (like the CLI) to print cached logs explicitly.
281                let stdout_path = hit.path.join("logs").join("stdout.log");
282                let stderr_path = hit.path.join("logs").join("stderr.log");
283                let stdout = std::fs::read_to_string(&stdout_path).unwrap_or_default();
284                let stderr = std::fs::read_to_string(&stderr_path).unwrap_or_default();
285                // If logs are present, we can return immediately. If logs are
286                // missing (older cache format), fall through to execute to
287                // backfill logs for future hits.
288                if !(stdout.is_empty() && stderr.is_empty()) {
289                    if !self.config.capture_output {
290                        cuenv_events::emit_task_cache_hit!(name, cache_key);
291                        // Emit cached logs as output events
292                        if !stdout.is_empty() {
293                            cuenv_events::emit_task_output!(name, "stdout", stdout);
294                        }
295                        if !stderr.is_empty() {
296                            cuenv_events::emit_task_output!(name, "stderr", stderr);
297                        }
298                    }
299                    return Ok(TaskResult {
300                        name: name.to_string(),
301                        exit_code: Some(0),
302                        stdout,
303                        stderr,
304                        success: true,
305                    });
306                } else {
307                    tracing::info!(
308                        task = %name,
309                        key = %cache_key,
310                        "Cache entry lacks logs; executing to backfill logs"
311                    );
312                }
313            }
314
315            tracing::info!(
316                task = %name,
317                key = %cache_key,
318                "Task {} executing hermetically… key {}",
319                name,
320                cache_key
321            );
322
323            let hermetic_root = create_hermetic_dir(name, &cache_key)?;
324            if self.config.show_cache_path {
325                tracing::info!(hermetic_root = %hermetic_root.display(), "Hermetic working directory");
326            }
327
328            // Seed working directory with inputs
329            let span_populate =
330                tracing::info_span!("inputs.populate", files = resolved_inputs.files.len());
331            {
332                let _g = span_populate.enter();
333                populate_hermetic_dir(&resolved_inputs, &hermetic_root)?;
334            }
335
336            // Materialize workspace artifacts (node_modules, target)
337            for (ws, entries) in workspace_ctxs {
338                self.materialize_workspace(&ws, &entries, &hermetic_root)
339                    .await?;
340            }
341
342            // Materialize outputs from inputsFrom tasks
343            if let Some(ref inputs_from) = task.inputs_from {
344                for task_output in inputs_from {
345                    let source_task = &task_output.task;
346                    let source_cache_key = task_cache::lookup_latest(
347                        &self.config.project_root,
348                        source_task,
349                        self.config.cache_dir.as_deref(),
350                    )
351                    .ok_or_else(|| {
352                        Error::configuration(format!(
353                            "Task '{}' depends on outputs from '{}' but no cached result found. \
354                         Ensure '{}' runs before this task (add it to dependsOn).",
355                            name, source_task, source_task
356                        ))
357                    })?;
358
359                    // Materialize outputs into the hermetic directory
360                    let materialized = task_cache::materialize_outputs(
361                        &source_cache_key,
362                        &hermetic_root,
363                        self.config.cache_dir.as_deref(),
364                    )?;
365                    tracing::info!(
366                        task = %name,
367                        source_task = %source_task,
368                        materialized = materialized,
369                        "Materialized outputs from dependent task"
370                    );
371                }
372            }
373
374            // Initial snapshot to detect undeclared writes
375            let initial_hashes: BTreeMap<String, String> = inputs_summary.clone();
376
377            // Build command - handle script mode vs command mode
378            let mut cmd = if let Some(script) = &task.script {
379                // Script mode: use shell to execute the script
380                let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
381                    (
382                        shell.command.clone().unwrap_or_else(|| "bash".to_string()),
383                        shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
384                    )
385                } else {
386                    // Default to bash for scripts
387                    ("bash".to_string(), "-c".to_string())
388                };
389
390                let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
391                let mut cmd = Command::new(&resolved_shell);
392                cmd.arg(&shell_flag);
393                cmd.arg(script);
394                cmd
395            } else {
396                // Command mode: existing behavior
397                // Resolve command path using the environment's PATH.
398                // This is necessary because when spawning a process, the OS looks up
399                // the executable in the current process's PATH, not the environment
400                // that will be set on the child process.
401                let resolved_command = self.config.environment.resolve_command(&task.command);
402
403                if let Some(shell) = &task.shell {
404                    if shell.command.is_some() && shell.flag.is_some() {
405                        let shell_command = shell.command.as_ref().expect("checked is_some above");
406                        let shell_flag = shell.flag.as_ref().expect("checked is_some above");
407                        // Resolve shell command too
408                        let resolved_shell = self.config.environment.resolve_command(shell_command);
409                        let mut cmd = Command::new(&resolved_shell);
410                        cmd.arg(shell_flag);
411                        if task.args.is_empty() {
412                            cmd.arg(&resolved_command);
413                        } else {
414                            let full_command = if task.command.is_empty() {
415                                task.args.join(" ")
416                            } else {
417                                format!("{} {}", resolved_command, task.args.join(" "))
418                            };
419                            cmd.arg(full_command);
420                        }
421                        cmd
422                    } else {
423                        let mut cmd = Command::new(&resolved_command);
424                        for arg in &task.args {
425                            cmd.arg(arg);
426                        }
427                        cmd
428                    }
429                } else {
430                    let mut cmd = Command::new(&resolved_command);
431                    for arg in &task.args {
432                        cmd.arg(arg);
433                    }
434                    cmd
435                }
436            };
437
438            let workdir = if let Some(dir) = &self.config.working_dir {
439                dir.clone()
440            } else if let Some(prefix) = project_prefix.as_ref() {
441                hermetic_root.join(prefix)
442            } else {
443                hermetic_root.clone()
444            };
445            std::fs::create_dir_all(&workdir).map_err(|e| Error::Io {
446                source: e,
447                path: Some(workdir.clone().into()),
448                operation: "create_dir_all".into(),
449            })?;
450            cmd.current_dir(&workdir);
451            // Set environment variables (resolved + system), set CWD
452            let env_vars = self.config.environment.merge_with_system();
453            if task_trace_enabled() {
454                tracing::info!(
455                    task = %name,
456                    hermetic_root = %hermetic_root.display(),
457                    workdir = %workdir.display(),
458                    command = %task.command,
459                    args = ?task.args,
460                    env_count = env_vars.len(),
461                    "Launching task command"
462                );
463            }
464            for (k, v) in env_vars {
465                cmd.env(k, v);
466            }
467
468            // Configure output: always capture to ensure consistent behavior on
469            // cache hits and allow callers to decide what to print.
470            cmd.stdout(Stdio::piped());
471            cmd.stderr(Stdio::piped());
472
473            let stream_logs = !self.config.capture_output;
474            if stream_logs {
475                let cmd_str = if task.command.is_empty() {
476                    task.args.join(" ")
477                } else {
478                    format!("{} {}", task.command, task.args.join(" "))
479                };
480                cuenv_events::emit_task_started!(name, cmd_str, true);
481            }
482
483            let start = std::time::Instant::now();
484            let mut child = cmd.spawn().map_err(|e| {
485                Error::configuration(format!("Failed to spawn task '{}': {}", name, e))
486            })?;
487
488            let stdout_handle = child.stdout.take();
489            let stderr_handle = child.stderr.take();
490
491            let stdout_task = async move {
492                if let Some(mut stdout) = stdout_handle {
493                    let mut output = Vec::new();
494                    let mut buf = [0u8; 4096];
495                    use std::io::Write;
496                    use tokio::io::AsyncReadExt;
497
498                    loop {
499                        match stdout.read(&mut buf).await {
500                            Ok(0) => break, // EOF
501                            Ok(n) => {
502                                let chunk = &buf[0..n];
503                                if stream_logs {
504                                    let mut handle = std::io::stdout().lock();
505                                    let _ = handle.write_all(chunk);
506                                    let _ = handle.flush();
507                                }
508                                output.extend_from_slice(chunk);
509                            }
510                            Err(_) => break,
511                        }
512                    }
513                    String::from_utf8_lossy(&output).to_string()
514                } else {
515                    String::new()
516                }
517            };
518
519            let stderr_task = async move {
520                if let Some(mut stderr) = stderr_handle {
521                    let mut output = Vec::new();
522                    let mut buf = [0u8; 4096];
523                    use std::io::Write;
524                    use tokio::io::AsyncReadExt;
525
526                    loop {
527                        match stderr.read(&mut buf).await {
528                            Ok(0) => break, // EOF
529                            Ok(n) => {
530                                let chunk = &buf[0..n];
531                                if stream_logs {
532                                    let mut handle = std::io::stderr().lock();
533                                    let _ = handle.write_all(chunk);
534                                    let _ = handle.flush();
535                                }
536                                output.extend_from_slice(chunk);
537                            }
538                            Err(_) => break,
539                        }
540                    }
541                    String::from_utf8_lossy(&output).to_string()
542                } else {
543                    String::new()
544                }
545            };
546
547            let (stdout, stderr) = tokio::join!(stdout_task, stderr_task);
548
549            let status = child.wait().await.map_err(|e| {
550                Error::configuration(format!("Failed to wait for task '{}': {}", name, e))
551            })?;
552            let duration = start.elapsed();
553
554            let exit_code = status.code().unwrap_or(1);
555            let success = status.success();
556            if !success {
557                tracing::warn!(task = %name, exit = exit_code, "Task failed");
558                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
559                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
560            } else {
561                tracing::info!(task = %name, "Task completed successfully");
562            }
563
564            // Collect declared outputs and warn on undeclared writes
565            let output_patterns: Vec<String> = if let Some(prefix) = project_prefix.as_ref() {
566                task.outputs
567                    .iter()
568                    .map(|o| prefix.join(o).to_string_lossy().to_string())
569                    .collect()
570            } else {
571                task.outputs.clone()
572            };
573            let outputs = collect_outputs(&hermetic_root, &output_patterns)?;
574            let outputs_set: HashSet<PathBuf> = outputs.iter().cloned().collect();
575            let mut output_index: Vec<task_cache::OutputIndexEntry> = Vec::new();
576
577            // Stage outputs into a temp dir for cache persistence
578            let outputs_stage = std::env::temp_dir().join(format!("cuenv-outputs-{}", cache_key));
579            if outputs_stage.exists() {
580                let _ = std::fs::remove_dir_all(&outputs_stage);
581            }
582            std::fs::create_dir_all(&outputs_stage).map_err(|e| Error::Io {
583                source: e,
584                path: Some(outputs_stage.clone().into()),
585                operation: "create_dir_all".into(),
586            })?;
587
588            for rel in &outputs {
589                let rel_for_project = project_prefix
590                    .as_ref()
591                    .and_then(|prefix| rel.strip_prefix(prefix).ok())
592                    .unwrap_or(rel)
593                    .to_path_buf();
594                let src = hermetic_root.join(rel);
595                // Intentionally avoid `let`-chains here to preserve readability
596                // and to align with review guidance; allow clippy's collapsible-if.
597                #[allow(clippy::collapsible_if)]
598                if let Ok(meta) = std::fs::metadata(&src) {
599                    if meta.is_file() {
600                        let dst = outputs_stage.join(&rel_for_project);
601                        if let Some(parent) = dst.parent() {
602                            std::fs::create_dir_all(parent).map_err(|e| Error::Io {
603                                source: e,
604                                path: Some(parent.into()),
605                                operation: "create_dir_all".into(),
606                            })?;
607                        }
608                        std::fs::copy(&src, &dst).map_err(|e| Error::Io {
609                            source: e,
610                            path: Some(dst.into()),
611                            operation: "copy".into(),
612                        })?;
613                        let (sha, _size) = crate::tasks::io::sha256_file(&src).unwrap_or_default();
614                        output_index.push(task_cache::OutputIndexEntry {
615                            rel_path: rel_for_project.to_string_lossy().to_string(),
616                            size: meta.len(),
617                            sha256: sha,
618                        });
619                    }
620                }
621            }
622
623            // Detect undeclared writes
624            let mut warned = false;
625            for entry in walkdir::WalkDir::new(&hermetic_root)
626                .into_iter()
627                .filter_map(|e| e.ok())
628            {
629                let p = entry.path();
630                if p.is_dir() {
631                    continue;
632                }
633                let rel = match p.strip_prefix(&hermetic_root) {
634                    Ok(r) => r.to_path_buf(),
635                    Err(_) => continue,
636                };
637                let rel_str = rel.to_string_lossy().to_string();
638                let (sha, _size) = crate::tasks::io::sha256_file(p).unwrap_or_default();
639                let initial = initial_hashes.get(&rel_str);
640                let changed = match initial {
641                    None => true,
642                    Some(prev) => prev != &sha,
643                };
644                if changed && !outputs_set.contains(&rel) {
645                    if !warned {
646                        tracing::warn!(task = %name, "Detected writes to undeclared paths; these are not cached as outputs");
647                        warned = true;
648                    }
649                    tracing::debug!(path = %rel_str, "Undeclared write");
650                }
651            }
652
653            // Persist cache entry on success
654            if success {
655                let meta = task_cache::TaskResultMeta {
656                    task_name: name.to_string(),
657                    command: task.command.clone(),
658                    args: task.args.clone(),
659                    env_summary,
660                    inputs_summary: inputs_summary.clone(),
661                    created_at: Utc::now(),
662                    cuenv_version,
663                    platform,
664                    duration_ms: duration.as_millis(),
665                    exit_code,
666                    cache_key_envelope: envelope_json.clone(),
667                    output_index,
668                };
669                let logs = task_cache::TaskLogs {
670                    // Persist logs regardless of capture setting so cache hits can
671                    // reproduce output faithfully.
672                    stdout: Some(stdout.clone()),
673                    stderr: Some(stderr.clone()),
674                };
675                let cache_span = tracing::info_span!("cache.save", key = %cache_key);
676                {
677                    let _g = cache_span.enter();
678                    task_cache::save_result(
679                        &cache_key,
680                        &meta,
681                        &outputs_stage,
682                        &hermetic_root,
683                        logs,
684                        self.config.cache_dir.as_deref(),
685                    )?;
686                }
687
688                // Record this task's cache key as the latest for this project
689                task_cache::record_latest(
690                    &self.config.project_root,
691                    name,
692                    &cache_key,
693                    self.config.cache_dir.as_deref(),
694                )?;
695
696                // Outputs remain in cache only - dependent tasks use inputsFrom to consume them
697            } else {
698                // Optionally persist logs in a failure/ subdir: not implemented for brevity
699            }
700
701            Ok(TaskResult {
702                name: name.to_string(),
703                exit_code: Some(exit_code),
704                stdout,
705                stderr,
706                success,
707            })
708        }
709    }
710
711    /// Execute a task non-hermetically (directly in workspace/project root)
712    ///
713    /// Used for tasks like `bun install` that need to write to the real filesystem.
714    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
715        // Check if this is an unresolved TaskRef (should have been resolved before execution)
716        if task.is_task_ref() && task.project_root.is_none() {
717            return Err(Error::configuration(format!(
718                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
719                 This usually means:\n\
720                 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
721                 - The referenced task '{}' doesn't exist in that project\n\
722                 - There was an error loading the referenced project's env.cue\n\
723                 Run with RUST_LOG=debug for more details.",
724                name,
725                task.task_ref.as_deref().unwrap_or("unknown"),
726                task.task_ref
727                    .as_deref()
728                    .and_then(|r| r.split(':').next_back())
729                    .unwrap_or("unknown")
730            )));
731        }
732
733        // Determine working directory (in priority order):
734        // 1. Explicit directory field on task (relative to cue.mod root)
735        // 2. TaskRef project_root (from resolution)
736        // 3. Source file directory (from _source metadata)
737        // 4. Install tasks (hermetic: false with workspaces) run from workspace root
738        // 5. Default to project root
739        let workdir = if let Some(ref dir) = task.directory {
740            // Explicit directory override: resolve relative to cue.mod root or project root
741            self.config
742                .cue_module_root
743                .as_ref()
744                .unwrap_or(&self.config.project_root)
745                .join(dir)
746        } else if let Some(ref project_root) = task.project_root {
747            // TaskRef tasks run in their original project directory
748            project_root.clone()
749        } else if let Some(ref source) = task.source {
750            // Default: run in the directory of the source file
751            if let Some(dir) = source.directory() {
752                self.config
753                    .cue_module_root
754                    .as_ref()
755                    .unwrap_or(&self.config.project_root)
756                    .join(dir)
757            } else {
758                // Source is at root (e.g., "env.cue"), use cue_module_root if available
759                // This ensures tasks defined in root env.cue run from module root,
760                // even when invoked from a subdirectory
761                self.config
762                    .cue_module_root
763                    .clone()
764                    .unwrap_or_else(|| self.config.project_root.clone())
765            }
766        } else if !task.hermetic && !task.workspaces.is_empty() {
767            // Find workspace root for install tasks
768            let workspace_name = &task.workspaces[0];
769            let manager = match workspace_name.as_str() {
770                "bun" => PackageManager::Bun,
771                "npm" => PackageManager::Npm,
772                "pnpm" => PackageManager::Pnpm,
773                "yarn" => PackageManager::YarnModern,
774                "cargo" => PackageManager::Cargo,
775                _ => PackageManager::Npm, // fallback
776            };
777            find_workspace_root(manager, &self.config.project_root)
778        } else {
779            self.config.project_root.clone()
780        };
781
782        tracing::info!(
783            task = %name,
784            workdir = %workdir.display(),
785            hermetic = false,
786            "Executing non-hermetic task"
787        );
788
789        // Emit command being run - always emit task_started for all modes
790        // (TUI needs events even when capture_output is true)
791        let cmd_str = if let Some(script) = &task.script {
792            format!("[script: {} bytes]", script.len())
793        } else if task.command.is_empty() {
794            task.args.join(" ")
795        } else {
796            format!("{} {}", task.command, task.args.join(" "))
797        };
798
799        cuenv_events::emit_task_started!(name, cmd_str, false);
800
801        // Build command - handle script mode vs command mode
802        let mut cmd = if let Some(script) = &task.script {
803            // Script mode: use shell to execute the script
804            let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
805                (
806                    shell.command.clone().unwrap_or_else(|| "bash".to_string()),
807                    shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
808                )
809            } else {
810                // Default to bash for scripts
811                ("bash".to_string(), "-c".to_string())
812            };
813
814            let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
815            let mut cmd = Command::new(&resolved_shell);
816            cmd.arg(&shell_flag);
817            cmd.arg(script);
818            cmd
819        } else {
820            // Command mode: existing behavior
821            let resolved_command = self.config.environment.resolve_command(&task.command);
822
823            if let Some(shell) = &task.shell {
824                if shell.command.is_some() && shell.flag.is_some() {
825                    let shell_command = shell.command.as_ref().expect("checked is_some above");
826                    let shell_flag = shell.flag.as_ref().expect("checked is_some above");
827                    let resolved_shell = self.config.environment.resolve_command(shell_command);
828                    let mut cmd = Command::new(&resolved_shell);
829                    cmd.arg(shell_flag);
830                    if task.args.is_empty() {
831                        cmd.arg(&resolved_command);
832                    } else {
833                        let full_command = if task.command.is_empty() {
834                            task.args.join(" ")
835                        } else {
836                            format!("{} {}", resolved_command, task.args.join(" "))
837                        };
838                        cmd.arg(full_command);
839                    }
840                    cmd
841                } else {
842                    let mut cmd = Command::new(&resolved_command);
843                    for arg in &task.args {
844                        cmd.arg(arg);
845                    }
846                    cmd
847                }
848            } else {
849                let mut cmd = Command::new(&resolved_command);
850                for arg in &task.args {
851                    cmd.arg(arg);
852                }
853                cmd
854            }
855        };
856
857        // Set working directory and environment
858        cmd.current_dir(&workdir);
859        let env_vars = self.config.environment.merge_with_system();
860        for (k, v) in &env_vars {
861            cmd.env(k, v);
862        }
863
864        // Execute - always capture output for consistent behavior
865        // If not in capture mode, stream output to terminal in real-time
866        if self.config.capture_output {
867            use tokio::io::{AsyncBufReadExt, BufReader};
868
869            let start_time = std::time::Instant::now();
870
871            // Spawn with piped stdout/stderr for streaming
872            let mut child = cmd
873                .stdout(Stdio::piped())
874                .stderr(Stdio::piped())
875                .spawn()
876                .map_err(|e| Error::Io {
877                    source: e,
878                    path: None,
879                    operation: format!("spawn task {}", name),
880                })?;
881
882            // Take ownership of stdout/stderr handles
883            let stdout_handle = child.stdout.take();
884            let stderr_handle = child.stderr.take();
885
886            // Collect output while streaming events in real-time
887            let mut stdout_lines = Vec::new();
888            let mut stderr_lines = Vec::new();
889
890            // Stream stdout
891            let name_for_stdout = name.to_string();
892            let stdout_task = tokio::spawn(async move {
893                let mut lines = Vec::new();
894                if let Some(stdout) = stdout_handle {
895                    let mut reader = BufReader::new(stdout).lines();
896                    while let Ok(Some(line)) = reader.next_line().await {
897                        cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
898                        lines.push(line);
899                    }
900                }
901                lines
902            });
903
904            // Stream stderr
905            let name_for_stderr = name.to_string();
906            let stderr_task = tokio::spawn(async move {
907                let mut lines = Vec::new();
908                if let Some(stderr) = stderr_handle {
909                    let mut reader = BufReader::new(stderr).lines();
910                    while let Ok(Some(line)) = reader.next_line().await {
911                        cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
912                        lines.push(line);
913                    }
914                }
915                lines
916            });
917
918            // Wait for process to complete and collect output
919            let status = child.wait().await.map_err(|e| Error::Io {
920                source: e,
921                path: None,
922                operation: format!("wait for task {}", name),
923            })?;
924
925            // Collect streamed output
926            if let Ok(lines) = stdout_task.await {
927                stdout_lines = lines;
928            }
929            if let Ok(lines) = stderr_task.await {
930                stderr_lines = lines;
931            }
932
933            let duration_ms = start_time.elapsed().as_millis() as u64;
934            let stdout = stdout_lines.join("\n");
935            let stderr = stderr_lines.join("\n");
936            let exit_code = status.code().unwrap_or(-1);
937            let success = status.success();
938
939            // Emit task completion event
940            cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
941
942            if !success {
943                tracing::warn!(task = %name, exit = exit_code, "Task failed");
944                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
945                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
946            }
947
948            Ok(TaskResult {
949                name: name.to_string(),
950                exit_code: Some(exit_code),
951                stdout,
952                stderr,
953                success,
954            })
955        } else {
956            // Stream output directly to terminal (interactive mode)
957            let status = cmd
958                .stdout(Stdio::inherit())
959                .stderr(Stdio::inherit())
960                .status()
961                .await
962                .map_err(|e| Error::Io {
963                    source: e,
964                    path: None,
965                    operation: format!("spawn task {}", name),
966                })?;
967
968            let exit_code = status.code().unwrap_or(-1);
969            let success = status.success();
970
971            if !success {
972                tracing::warn!(task = %name, exit = exit_code, "Task failed");
973            }
974
975            Ok(TaskResult {
976                name: name.to_string(),
977                exit_code: Some(exit_code),
978                stdout: String::new(), // Output went to terminal
979                stderr: String::new(),
980                success,
981            })
982        }
983    }
984
985    /// Execute a task definition (single task or group)
986    #[async_recursion]
987    pub async fn execute_definition(
988        &self,
989        name: &str,
990        definition: &TaskDefinition,
991        all_tasks: &Tasks,
992    ) -> Result<Vec<TaskResult>> {
993        match definition {
994            TaskDefinition::Single(task) => {
995                let result = self.execute_task(name, task.as_ref()).await?;
996                Ok(vec![result])
997            }
998            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
999        }
1000    }
1001
1002    async fn execute_group(
1003        &self,
1004        prefix: &str,
1005        group: &TaskGroup,
1006        all_tasks: &Tasks,
1007    ) -> Result<Vec<TaskResult>> {
1008        match group {
1009            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
1010            TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
1011        }
1012    }
1013
1014    async fn execute_sequential(
1015        &self,
1016        prefix: &str,
1017        tasks: &[TaskDefinition],
1018        all_tasks: &Tasks,
1019    ) -> Result<Vec<TaskResult>> {
1020        if !self.config.capture_output {
1021            cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
1022        }
1023        let mut results = Vec::new();
1024        for (i, task_def) in tasks.iter().enumerate() {
1025            let task_name = format!("{}[{}]", prefix, i);
1026            let task_results = self
1027                .execute_definition(&task_name, task_def, all_tasks)
1028                .await?;
1029            for result in &task_results {
1030                if !result.success {
1031                    let message = format!(
1032                        "Sequential task group '{prefix}' halted.\n\n{}",
1033                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
1034                    );
1035                    return Err(Error::configuration(message));
1036                }
1037            }
1038            results.extend(task_results);
1039        }
1040        Ok(results)
1041    }
1042
1043    async fn execute_parallel(
1044        &self,
1045        prefix: &str,
1046        group: &ParallelGroup,
1047        all_tasks: &Tasks,
1048    ) -> Result<Vec<TaskResult>> {
1049        // Check for "default" task to override parallel execution
1050        if let Some(default_task) = group.tasks.get("default") {
1051            if !self.config.capture_output {
1052                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
1053            }
1054            // Execute only the default task, using the group prefix directly
1055            // since "default" is implicit when invoking the group name
1056            let task_name = format!("{}.default", prefix);
1057            return self
1058                .execute_definition(&task_name, default_task, all_tasks)
1059                .await;
1060        }
1061
1062        if !self.config.capture_output {
1063            cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
1064        }
1065        let mut join_set = JoinSet::new();
1066        let all_tasks = Arc::new(all_tasks.clone());
1067        let mut all_results = Vec::new();
1068        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
1069            if let Some(failed) = results.iter().find(|r| !r.success) {
1070                let message = format!(
1071                    "Parallel task group '{prefix}' halted.\n\n{}",
1072                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
1073                );
1074                return Err(Error::configuration(message));
1075            }
1076            all_results.extend(results);
1077            Ok(())
1078        };
1079        for (name, task_def) in &group.tasks {
1080            let task_name = format!("{}.{}", prefix, name);
1081            let task_def = task_def.clone();
1082            let all_tasks = Arc::clone(&all_tasks);
1083            let executor = self.clone_with_config();
1084            join_set.spawn(async move {
1085                executor
1086                    .execute_definition(&task_name, &task_def, &all_tasks)
1087                    .await
1088            });
1089            if self.config.max_parallel > 0
1090                && join_set.len() >= self.config.max_parallel
1091                && let Some(result) = join_set.join_next().await
1092            {
1093                match result {
1094                    Ok(Ok(results)) => merge_results(results)?,
1095                    Ok(Err(e)) => return Err(e),
1096                    Err(e) => {
1097                        return Err(Error::configuration(format!(
1098                            "Task execution panicked: {}",
1099                            e
1100                        )));
1101                    }
1102                }
1103            }
1104        }
1105        while let Some(result) = join_set.join_next().await {
1106            match result {
1107                Ok(Ok(results)) => merge_results(results)?,
1108                Ok(Err(e)) => return Err(e),
1109                Err(e) => {
1110                    return Err(Error::configuration(format!(
1111                        "Task execution panicked: {}",
1112                        e
1113                    )));
1114                }
1115            }
1116        }
1117        Ok(all_results)
1118    }
1119
1120    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
1121        let parallel_groups = graph.get_parallel_groups()?;
1122        let mut all_results = Vec::new();
1123
1124        // IMPORTANT:
1125        // Each parallel group represents a dependency "level". We must not start tasks from the
1126        // next group until *all* tasks from the current group have completed successfully.
1127        //
1128        // The previous implementation pipelined groups (starting the next group as soon as all
1129        // tasks from the current group were spawned), which allowed dependent tasks to run before
1130        // their dependencies finished (especially visible with long-running tasks like dev servers).
1131        for mut group in parallel_groups {
1132            let mut join_set = JoinSet::new();
1133
1134            while !group.is_empty() || !join_set.is_empty() {
1135                // Fill the concurrency window for this group
1136                while let Some(node) = group.pop() {
1137                    let task = node.task.clone();
1138                    let name = node.name.clone();
1139                    let executor = self.clone_with_config();
1140                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
1141
1142                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
1143                        break;
1144                    }
1145                }
1146
1147                if let Some(result) = join_set.join_next().await {
1148                    match result {
1149                        Ok(Ok(task_result)) => {
1150                            if !task_result.success {
1151                                join_set.abort_all();
1152                                let message = format!(
1153                                    "Task graph execution halted.\n\n{}",
1154                                    summarize_task_failure(
1155                                        &task_result,
1156                                        TASK_FAILURE_SNIPPET_LINES,
1157                                    )
1158                                );
1159                                return Err(Error::configuration(message));
1160                            }
1161                            all_results.push(task_result);
1162                        }
1163                        Ok(Err(e)) => {
1164                            join_set.abort_all();
1165                            return Err(e);
1166                        }
1167                        Err(e) => {
1168                            join_set.abort_all();
1169                            return Err(Error::configuration(format!(
1170                                "Task execution panicked: {}",
1171                                e
1172                            )));
1173                        }
1174                    }
1175                }
1176            }
1177        }
1178
1179        Ok(all_results)
1180    }
1181
1182    async fn resolve_workspace(
1183        &self,
1184        _task_name: &str,
1185        task: &Task,
1186        workspace_name: &str,
1187        config: &WorkspaceConfig,
1188    ) -> Result<(Workspace, Vec<LockfileEntry>, Vec<String>, Option<String>)> {
1189        let root = self.config.project_root.clone();
1190        let task_label = _task_name.to_string();
1191        let command = task.command.clone();
1192        let config_pm = config.package_manager.clone();
1193        let config_root = config.root.clone();
1194        // WorkspaceConfig doesn't support 'packages' filtering yet, so assume full workspace deps for now,
1195        // or imply it from context. If we want package filtering, we need to add it to WorkspaceConfig
1196        // or assume all deps.
1197        // However, the previous logic used `packages` from WorkspaceInputs.
1198        // For now, let's assume traversing all dependencies if not specified.
1199        // But wait, `WorkspaceConfig` in schema doesn't have `packages`.
1200        // So we probably default to "all relevant" or auto-infer from current directory if we are inside a package.
1201        let mut packages = Vec::new();
1202
1203        // If we are in a subdirectory that matches a workspace member, we might want to infer that package.
1204        // The previous logic did that if `packages` was empty.
1205
1206        let lockfile_override: Option<String> = None; // WorkspaceConfig doesn't have lockfile override yet.
1207        // If we need it, we should add it to WorkspaceConfig. For now assume standard discovery.
1208
1209        let mut traverse_workspace_deps = true;
1210
1211        // Use spawn_blocking for heavy lifting
1212        let workspace_name_owned = workspace_name.to_string();
1213        tokio::task::spawn_blocking(move || {
1214            let override_for_detection = lockfile_override.as_ref().map(|lock| {
1215                let candidate = PathBuf::from(lock);
1216                if candidate.is_absolute() {
1217                    candidate
1218                } else {
1219                    root.join(lock)
1220                }
1221            });
1222
1223            // 1. Detect Package Manager
1224            // Priority: 1. explicit config, 2. workspace name (if it matches a PM), 3. auto-detect
1225            let manager = if let Some(pm_str) = config_pm {
1226                match pm_str.as_str() {
1227                    "npm" => PackageManager::Npm,
1228                    "bun" => PackageManager::Bun,
1229                    "pnpm" => PackageManager::Pnpm,
1230                    "yarn" => PackageManager::YarnModern,
1231                    "yarn-classic" => PackageManager::YarnClassic,
1232                    "cargo" => PackageManager::Cargo,
1233                    _ => {
1234                        return Err(Error::configuration(format!(
1235                            "Unknown package manager: {}",
1236                            pm_str
1237                        )));
1238                    }
1239                }
1240            } else {
1241                // Try to match workspace name to package manager
1242                match workspace_name_owned.as_str() {
1243                    "npm" => PackageManager::Npm,
1244                    "bun" => PackageManager::Bun,
1245                    "pnpm" => PackageManager::Pnpm,
1246                    "yarn" => PackageManager::YarnModern,
1247                    "cargo" => PackageManager::Cargo,
1248                    _ => {
1249                         // Auto-detect if name doesn't match
1250                        // Priority: command hint -> lockfiles
1251                        let hint = detect_from_command(&command);
1252                        let detected = match detect_package_managers(&root) {
1253                            Ok(list) => list,
1254                            Err(e) => {
1255                                if override_for_detection.is_some() {
1256                                    Vec::new()
1257                                } else {
1258                                    return Err(Error::configuration(format!(
1259                                        "Failed to detect package managers: {}",
1260                                        e
1261                                    )));
1262                                }
1263                            }
1264                        };
1265
1266                        if let Some(h) = hint {
1267                            if detected.contains(&h) {
1268                                h
1269                            } else if !detected.is_empty() {
1270                                // Prefer detected files
1271                                detected[0]
1272                            } else if let Some(ref override_path) = override_for_detection {
1273                                infer_manager_from_lockfile(override_path).ok_or_else(|| {
1274                                    Error::configuration(
1275                                        "Unable to infer package manager from lockfile override",
1276                                    )
1277                                })?
1278                            } else {
1279                                return Err(Error::configuration(
1280                                    format!("No package manager specified for workspace '{}' and could not detect one", workspace_name_owned),
1281                                ));
1282                            }
1283                        } else if !detected.is_empty() {
1284                            detected[0]
1285                        } else if let Some(ref override_path) = override_for_detection {
1286                            infer_manager_from_lockfile(override_path).ok_or_else(|| {
1287                                Error::configuration(
1288                                    "Unable to infer package manager from lockfile override",
1289                                )
1290                            })?
1291                        } else {
1292                            return Err(Error::configuration(
1293                                "Could not detect package manager for workspace resolution",
1294                            ));
1295                        }
1296                    }
1297                }
1298            };
1299
1300            // Resolve the actual workspace root by walking up until we find a
1301            // directory that declares a workspace for this package manager.
1302            // If config.root is set, use that.
1303            let workspace_root = if let Some(root_override) = &config_root {
1304                root.join(root_override)
1305            } else {
1306                find_workspace_root(manager, &root)
1307            };
1308
1309            if task_trace_enabled() {
1310                tracing::info!(
1311                    task = %task_label,
1312                    manager = %manager,
1313                    project_root = %root.display(),
1314                    workspace_root = %workspace_root.display(),
1315                    "Resolved workspace root for package manager"
1316                );
1317            }
1318
1319            let lockfile_override_path = lockfile_override.as_ref().map(|lock| {
1320                let candidate = PathBuf::from(lock);
1321                if candidate.is_absolute() {
1322                    candidate
1323                } else {
1324                    workspace_root.join(lock)
1325                }
1326            });
1327
1328            // 2. Discover Workspace
1329            let discovery: Box<dyn WorkspaceDiscovery> = match manager {
1330                PackageManager::Npm
1331                | PackageManager::Bun
1332                | PackageManager::YarnClassic
1333                | PackageManager::YarnModern
1334                | PackageManager::Deno => Box::new(PackageJsonDiscovery),
1335                PackageManager::Pnpm => Box::new(PnpmWorkspaceDiscovery),
1336                PackageManager::Cargo => Box::new(CargoTomlDiscovery),
1337            };
1338
1339            let workspace = discovery.discover(&workspace_root).map_err(|e| {
1340                Error::configuration(format!("Failed to discover workspace: {}", e))
1341            })?;
1342
1343            // 3. Parse Lockfile
1344            let lockfile_path = if let Some(path) = lockfile_override_path {
1345                if !path.exists() {
1346                    return Err(Error::configuration(format!(
1347                        "Workspace lockfile override does not exist: {}",
1348                        path.display()
1349                    )));
1350                }
1351                path
1352            } else {
1353                workspace.lockfile.clone().ok_or_else(|| {
1354                    Error::configuration("Workspace resolution requires a lockfile")
1355                })?
1356            };
1357
1358            let parser: Box<dyn LockfileParser> = match manager {
1359                PackageManager::Npm => Box::new(NpmLockfileParser),
1360                PackageManager::Bun => Box::new(BunLockfileParser),
1361                PackageManager::Pnpm => Box::new(PnpmLockfileParser),
1362                PackageManager::YarnClassic => Box::new(YarnClassicLockfileParser),
1363                PackageManager::YarnModern => Box::new(YarnModernLockfileParser),
1364                PackageManager::Cargo => Box::new(CargoLockfileParser),
1365                PackageManager::Deno => Box::new(NpmLockfileParser), // Deno uses a similar JSON lockfile format
1366            };
1367
1368            let entries = parser
1369                .parse(&lockfile_path)
1370                .map_err(|e| Error::configuration(format!("Failed to parse lockfile: {}", e)))?;
1371            if task_trace_enabled() {
1372                tracing::info!(
1373                    task = %task_label,
1374                    lockfile = %lockfile_path.display(),
1375                    members = entries.len(),
1376                    "Parsed workspace lockfile"
1377                );
1378            }
1379
1380            // Compute lockfile hash
1381            let (hash, _) = crate::tasks::io::sha256_file(&lockfile_path)?;
1382
1383            // Infer packages when none explicitly provided by scoping to the
1384            // current workspace member. (We intentionally avoid pulling all
1385            // transitive deps here to keep hashing fast for large monorepos.)
1386            if packages.is_empty() {
1387                let current_member = workspace
1388                    .members
1389                    .iter()
1390                    .find(|m| workspace_root.join(&m.path) == root || root.starts_with(workspace_root.join(&m.path)));
1391                if let Some(member) = current_member {
1392                    let inferred = vec![member.name.clone()];
1393                    if task_trace_enabled() {
1394                        tracing::info!(
1395                            task = %task_label,
1396                            inferred_packages = ?inferred,
1397                            "Inferred workspace packages from current project"
1398                        );
1399                    }
1400                    packages = inferred;
1401                    traverse_workspace_deps = true;
1402                }
1403            }
1404
1405            // 4. Collect Inputs
1406            let mut member_paths = Vec::new();
1407
1408            // Always include workspace configuration files
1409            member_paths.push(manager.workspace_config_name().to_string());
1410            if let Ok(rel) = lockfile_path.strip_prefix(&workspace_root) {
1411                member_paths.push(rel.to_string_lossy().to_string());
1412            } else {
1413                member_paths.push(lockfile_path.to_string_lossy().to_string());
1414            }
1415
1416            if packages.is_empty() {
1417                for member in &workspace.members {
1418                    let manifest_rel = member
1419                        .path
1420                        .join(manager.workspace_config_name());
1421                    member_paths.push(manifest_rel.to_string_lossy().to_string());
1422                }
1423            } else {
1424                let mut to_visit: Vec<String> = packages.clone();
1425                let mut visited = HashSet::new();
1426
1427                while let Some(pkg_name) = to_visit.pop() {
1428                    if visited.contains(&pkg_name) {
1429                        continue;
1430                    }
1431                    visited.insert(pkg_name.clone());
1432
1433                    if let Some(member) = workspace.find_member(&pkg_name) {
1434                        let manifest_rel = member
1435                            .path
1436                            .join(manager.workspace_config_name());
1437                        member_paths.push(manifest_rel.to_string_lossy().to_string());
1438
1439                        // Add dependencies when explicitly requested
1440                        if traverse_workspace_deps {
1441                            let mut dependency_candidates: HashSet<String> = HashSet::new();
1442
1443                            if let Some(entry) = entries.iter().find(|e| e.name == pkg_name) {
1444                                for dep in &entry.dependencies {
1445                                    if entries
1446                                        .iter()
1447                                        .any(|e| e.name == dep.name && e.is_workspace_member)
1448                                    {
1449                                        dependency_candidates.insert(dep.name.clone());
1450                                    }
1451                                }
1452                            }
1453
1454                            for dep_name in &member.dependencies {
1455                                if workspace.find_member(dep_name).is_some() {
1456                                    dependency_candidates.insert(dep_name.clone());
1457                                }
1458                            }
1459
1460                            for dep_name in dependency_candidates {
1461                                to_visit.push(dep_name);
1462                            }
1463                        }
1464                    }
1465                }
1466            }
1467
1468            if task_trace_enabled() {
1469                tracing::info!(
1470                    task = %task_label,
1471                    members = ?member_paths,
1472                    "Workspace input member paths selected"
1473                );
1474            }
1475
1476            Ok((workspace, entries, member_paths, Some(hash)))
1477        })
1478        .await
1479        .map_err(|e| Error::configuration(format!("Task execution panicked: {}", e)))?
1480    }
1481
1482    async fn materialize_workspace(
1483        &self,
1484        workspace: &Workspace,
1485        entries: &[LockfileEntry],
1486        target_dir: &Path,
1487    ) -> Result<()> {
1488        // Dispatch to appropriate materializer
1489        let materializer: Box<dyn Materializer> = match workspace.manager {
1490            PackageManager::Npm
1491            | PackageManager::Bun
1492            | PackageManager::Pnpm
1493            | PackageManager::YarnClassic
1494            | PackageManager::YarnModern
1495            | PackageManager::Deno => Box::new(NodeModulesMaterializer),
1496            PackageManager::Cargo => Box::new(CargoMaterializer),
1497        };
1498
1499        materializer
1500            .materialize(workspace, entries, target_dir)
1501            .map_err(|e| Error::configuration(format!("Materialization failed: {}", e)))
1502    }
1503
1504    fn clone_with_config(&self) -> Self {
1505        // Share the backend across clones to preserve container cache for Dagger chaining
1506        Self::with_shared_backend(self.config.clone(), self.backend.clone())
1507    }
1508}
1509
1510fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
1511    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
1512
1513    loop {
1514        let is_root = match manager {
1515            PackageManager::Npm
1516            | PackageManager::Bun
1517            | PackageManager::YarnClassic
1518            | PackageManager::YarnModern => package_json_has_workspaces(&current),
1519            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
1520            PackageManager::Cargo => cargo_toml_has_workspace(&current),
1521            PackageManager::Deno => deno_json_has_workspace(&current),
1522        };
1523
1524        if is_root {
1525            return current;
1526        }
1527
1528        if let Some(parent) = current.parent() {
1529            current = parent.to_path_buf();
1530        } else {
1531            return start.to_path_buf();
1532        }
1533    }
1534}
1535
1536fn package_json_has_workspaces(dir: &Path) -> bool {
1537    let path = dir.join("package.json");
1538    let content = std::fs::read_to_string(&path);
1539    let Ok(json) = content.and_then(|s| {
1540        serde_json::from_str::<serde_json::Value>(&s)
1541            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1542    }) else {
1543        return false;
1544    };
1545
1546    match json.get("workspaces") {
1547        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1548        Some(serde_json::Value::Object(map)) => map
1549            .get("packages")
1550            .and_then(|packages| packages.as_array())
1551            .map(|arr| !arr.is_empty())
1552            .unwrap_or(false),
1553        _ => false,
1554    }
1555}
1556
1557fn cargo_toml_has_workspace(dir: &Path) -> bool {
1558    let path = dir.join("Cargo.toml");
1559    let Ok(content) = std::fs::read_to_string(&path) else {
1560        return false;
1561    };
1562
1563    content.contains("[workspace]")
1564}
1565
1566fn deno_json_has_workspace(dir: &Path) -> bool {
1567    let path = dir.join("deno.json");
1568    let content = std::fs::read_to_string(&path);
1569    let Ok(json) = content.and_then(|s| {
1570        serde_json::from_str::<serde_json::Value>(&s)
1571            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
1572    }) else {
1573        return false;
1574    };
1575
1576    // Deno uses "workspace" (not "workspaces") for workspace configuration
1577    match json.get("workspace") {
1578        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
1579        Some(serde_json::Value::Object(_)) => true,
1580        _ => false,
1581    }
1582}
1583
1584fn task_trace_enabled() -> bool {
1585    static ENABLED: OnceLock<bool> = OnceLock::new();
1586    *ENABLED.get_or_init(|| {
1587        matches!(
1588            std::env::var("CUENV_TRACE_TASKS")
1589                .unwrap_or_default()
1590                .trim()
1591                .to_ascii_lowercase()
1592                .as_str(),
1593            "1" | "true" | "yes" | "on"
1594        )
1595    })
1596}
1597
1598/// Build a compact, user-friendly summary for a failed task, including the
1599/// exit code and the tail of stdout/stderr to help with diagnostics.
1600pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
1601    let exit_code = result
1602        .exit_code
1603        .map(|c| c.to_string())
1604        .unwrap_or_else(|| "unknown".to_string());
1605
1606    let mut sections = Vec::new();
1607    sections.push(format!(
1608        "Task '{}' failed with exit code {}.",
1609        result.name, exit_code
1610    ));
1611
1612    let output = format_failure_streams(result, max_output_lines);
1613    if output.is_empty() {
1614        sections.push(
1615            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
1616                .to_string(),
1617        );
1618    } else {
1619        sections.push(output);
1620    }
1621
1622    sections.join("\n\n")
1623}
1624
1625fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
1626    let mut streams = Vec::new();
1627
1628    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
1629        streams.push(stdout);
1630    }
1631
1632    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
1633        streams.push(stderr);
1634    }
1635
1636    streams.join("\n\n")
1637}
1638
1639fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
1640    let normalized = content.trim_end();
1641    if normalized.is_empty() {
1642        return None;
1643    }
1644
1645    let lines: Vec<&str> = normalized.lines().collect();
1646    let total = lines.len();
1647    let start = total.saturating_sub(max_output_lines);
1648    let snippet = lines[start..].join("\n");
1649
1650    let header = if total > max_output_lines {
1651        format!("{label} (last {max_output_lines} of {total} lines):")
1652    } else {
1653        format!("{label}:")
1654    };
1655
1656    Some(format!("{header}\n{snippet}"))
1657}
1658
1659fn infer_manager_from_lockfile(path: &Path) -> Option<PackageManager> {
1660    match path.file_name().and_then(|n| n.to_str())? {
1661        "package-lock.json" => Some(PackageManager::Npm),
1662        "bun.lock" => Some(PackageManager::Bun),
1663        "pnpm-lock.yaml" => Some(PackageManager::Pnpm),
1664        "yarn.lock" => Some(PackageManager::YarnModern),
1665        "Cargo.lock" => Some(PackageManager::Cargo),
1666        _ => None,
1667    }
1668}
1669
1670fn create_hermetic_dir(task_name: &str, key: &str) -> Result<PathBuf> {
1671    // Use OS temp dir; name scoped by task and cache key prefix.
1672    // IMPORTANT: Ensure the workdir is clean on every run to preserve hermeticity.
1673    let sanitized_task = task_name
1674        .chars()
1675        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
1676        .collect::<String>();
1677
1678    let base = std::env::temp_dir().join(format!(
1679        "cuenv-work-{}-{}",
1680        sanitized_task,
1681        &key[..12.min(key.len())]
1682    ));
1683
1684    // If a directory from a previous run exists, remove it before reuse.
1685    // This avoids contamination from artifacts left by failed runs where no cache was saved.
1686    if base.exists()
1687        && let Err(e) = std::fs::remove_dir_all(&base)
1688    {
1689        // If we cannot remove the previous directory (e.g. in-use on Windows),
1690        // fall back to a unique, fresh directory to maintain hermetic execution.
1691        let ts = Utc::now().format("%Y%m%d%H%M%S%3f");
1692        let fallback = std::env::temp_dir().join(format!(
1693            "cuenv-work-{}-{}-{}",
1694            sanitized_task,
1695            &key[..12.min(key.len())],
1696            ts
1697        ));
1698        tracing::warn!(
1699            previous = %base.display(),
1700            fallback = %fallback.display(),
1701            error = %e,
1702            "Failed to clean previous hermetic workdir; using fresh fallback directory"
1703        );
1704        std::fs::create_dir_all(&fallback).map_err(|e| Error::Io {
1705            source: e,
1706            path: Some(fallback.clone().into()),
1707            operation: "create_dir_all".into(),
1708        })?;
1709        return Ok(fallback);
1710    }
1711
1712    std::fs::create_dir_all(&base).map_err(|e| Error::Io {
1713        source: e,
1714        path: Some(base.clone().into()),
1715        operation: "create_dir_all".into(),
1716    })?;
1717    Ok(base)
1718}
1719
1720/// Execute an arbitrary command with the cuenv environment
1721pub async fn execute_command(
1722    command: &str,
1723    args: &[String],
1724    environment: &Environment,
1725) -> Result<i32> {
1726    tracing::info!("Executing command: {} {:?}", command, args);
1727    let mut cmd = Command::new(command);
1728    cmd.args(args);
1729    let env_vars = environment.merge_with_system();
1730    for (key, value) in env_vars {
1731        cmd.env(key, value);
1732    }
1733    cmd.stdout(Stdio::inherit());
1734    cmd.stderr(Stdio::inherit());
1735    cmd.stdin(Stdio::inherit());
1736    let status = cmd.status().await.map_err(|e| {
1737        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
1738    })?;
1739    Ok(status.code().unwrap_or(1))
1740}
1741
1742#[cfg(test)]
1743mod tests {
1744    use super::*;
1745    use crate::tasks::Input;
1746    use std::fs;
1747    use tempfile::TempDir;
1748
1749    #[tokio::test]
1750    async fn test_executor_config_default() {
1751        let config = ExecutorConfig::default();
1752        assert!(!config.capture_output);
1753        assert_eq!(config.max_parallel, 0);
1754        assert!(config.environment.is_empty());
1755    }
1756
1757    #[tokio::test]
1758    async fn test_task_result() {
1759        let result = TaskResult {
1760            name: "test".to_string(),
1761            exit_code: Some(0),
1762            stdout: "output".to_string(),
1763            stderr: String::new(),
1764            success: true,
1765        };
1766        assert_eq!(result.name, "test");
1767        assert_eq!(result.exit_code, Some(0));
1768        assert!(result.success);
1769        assert_eq!(result.stdout, "output");
1770    }
1771
1772    #[tokio::test]
1773    async fn test_execute_simple_task() {
1774        let config = ExecutorConfig {
1775            capture_output: true,
1776            ..Default::default()
1777        };
1778        let executor = TaskExecutor::new(config);
1779        let task = Task {
1780            command: "echo".to_string(),
1781            args: vec!["hello".to_string()],
1782            description: Some("Hello task".to_string()),
1783            ..Default::default()
1784        };
1785        let result = executor.execute_task("test", &task).await.unwrap();
1786        assert!(result.success);
1787        assert_eq!(result.exit_code, Some(0));
1788        assert!(result.stdout.contains("hello"));
1789    }
1790
1791    #[tokio::test]
1792    async fn test_execute_with_environment() {
1793        let mut config = ExecutorConfig {
1794            capture_output: true,
1795            ..Default::default()
1796        };
1797        config
1798            .environment
1799            .set("TEST_VAR".to_string(), "test_value".to_string());
1800        let executor = TaskExecutor::new(config);
1801        let task = Task {
1802            command: "printenv".to_string(),
1803            args: vec!["TEST_VAR".to_string()],
1804            description: Some("Print env task".to_string()),
1805            ..Default::default()
1806        };
1807        let result = executor.execute_task("test", &task).await.unwrap();
1808        assert!(result.success);
1809        assert!(result.stdout.contains("test_value"));
1810    }
1811
1812    #[tokio::test]
1813    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
1814        let tmp = TempDir::new().unwrap();
1815        let root = tmp.path();
1816
1817        // Workspace root with workspaces + lockfile
1818        fs::write(
1819            root.join("package.json"),
1820            r#"{
1821  "name": "root-app",
1822  "version": "0.0.0",
1823  "workspaces": ["packages/*", "apps/*"],
1824  "dependencies": {
1825    "@rawkodeacademy/content-technologies": "workspace:*"
1826  }
1827}"#,
1828        )
1829        .unwrap();
1830        // Deliberately omit the workspace member name for apps/site to mimic lockfiles
1831        // that only record member paths, ensuring we can still discover dependencies.
1832        fs::write(
1833            root.join("bun.lock"),
1834            r#"{
1835  "lockfileVersion": 1,
1836  "workspaces": {
1837    "": {
1838      "name": "root-app",
1839      "dependencies": {
1840        "@rawkodeacademy/content-technologies": "workspace:*"
1841      }
1842    },
1843    "packages/content-technologies": {
1844      "name": "@rawkodeacademy/content-technologies",
1845      "version": "0.0.1"
1846    },
1847    "apps/site": {
1848      "version": "0.0.0",
1849      "dependencies": {
1850        "@rawkodeacademy/content-technologies": "workspace:*"
1851      }
1852    }
1853  },
1854  "packages": {}
1855}"#,
1856        )
1857        .unwrap();
1858
1859        // Workspace member packages
1860        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
1861        fs::write(
1862            root.join("packages/content-technologies/package.json"),
1863            r#"{
1864  "name": "@rawkodeacademy/content-technologies",
1865  "version": "0.0.1"
1866}"#,
1867        )
1868        .unwrap();
1869
1870        fs::create_dir_all(root.join("apps/site")).unwrap();
1871        fs::write(
1872            root.join("apps/site/package.json"),
1873            r#"{
1874  "name": "site",
1875  "version": "0.0.0",
1876  "dependencies": {
1877    "@rawkodeacademy/content-technologies": "workspace:*"
1878  }
1879}"#,
1880        )
1881        .unwrap();
1882
1883        let mut workspaces = HashMap::new();
1884        workspaces.insert(
1885            "bun".to_string(),
1886            WorkspaceConfig {
1887                enabled: true,
1888                package_manager: Some("bun".to_string()),
1889                root: None,
1890                hooks: None,
1891            },
1892        );
1893
1894        let config = ExecutorConfig {
1895            capture_output: true,
1896            project_root: root.join("apps/site"),
1897            workspaces: Some(workspaces),
1898            ..Default::default()
1899        };
1900        let executor = TaskExecutor::new(config);
1901
1902        let task = Task {
1903            command: "sh".to_string(),
1904            args: vec![
1905                "-c".to_string(),
1906                "find ../.. -maxdepth 4 -type d | sort".to_string(),
1907            ],
1908            inputs: vec![Input::Path("package.json".to_string())],
1909            workspaces: vec!["bun".to_string()],
1910            ..Default::default()
1911        };
1912
1913        let result = executor.execute_task("install", &task).await.unwrap();
1914        assert!(
1915            result.success,
1916            "command failed stdout='{}' stderr='{}'",
1917            result.stdout, result.stderr
1918        );
1919        assert!(
1920            result
1921                .stdout
1922                .split_whitespace()
1923                .any(|line| line.ends_with("packages/content-technologies")),
1924            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
1925            result.stdout,
1926            result.stderr
1927        );
1928    }
1929
1930    #[tokio::test]
1931    async fn test_execute_failing_task() {
1932        let config = ExecutorConfig {
1933            capture_output: true,
1934            ..Default::default()
1935        };
1936        let executor = TaskExecutor::new(config);
1937        let task = Task {
1938            command: "false".to_string(),
1939            description: Some("Failing task".to_string()),
1940            ..Default::default()
1941        };
1942        let result = executor.execute_task("test", &task).await.unwrap();
1943        assert!(!result.success);
1944        assert_eq!(result.exit_code, Some(1));
1945    }
1946
1947    #[tokio::test]
1948    async fn test_execute_sequential_group() {
1949        let config = ExecutorConfig {
1950            capture_output: true,
1951            ..Default::default()
1952        };
1953        let executor = TaskExecutor::new(config);
1954        let task1 = Task {
1955            command: "echo".to_string(),
1956            args: vec!["first".to_string()],
1957            description: Some("First task".to_string()),
1958            ..Default::default()
1959        };
1960        let task2 = Task {
1961            command: "echo".to_string(),
1962            args: vec!["second".to_string()],
1963            description: Some("Second task".to_string()),
1964            ..Default::default()
1965        };
1966        let group = TaskGroup::Sequential(vec![
1967            TaskDefinition::Single(Box::new(task1)),
1968            TaskDefinition::Single(Box::new(task2)),
1969        ]);
1970        let all_tasks = Tasks::new();
1971        let results = executor
1972            .execute_group("seq", &group, &all_tasks)
1973            .await
1974            .unwrap();
1975        assert_eq!(results.len(), 2);
1976        assert!(results[0].stdout.contains("first"));
1977        assert!(results[1].stdout.contains("second"));
1978    }
1979
1980    #[tokio::test]
1981    async fn test_command_injection_prevention() {
1982        let config = ExecutorConfig {
1983            capture_output: true,
1984            ..Default::default()
1985        };
1986        let executor = TaskExecutor::new(config);
1987        let malicious_task = Task {
1988            command: "echo".to_string(),
1989            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1990            description: Some("Malicious task test".to_string()),
1991            ..Default::default()
1992        };
1993        let result = executor
1994            .execute_task("malicious", &malicious_task)
1995            .await
1996            .unwrap();
1997        assert!(result.success);
1998        assert!(result.stdout.contains("hello ; rm -rf /"));
1999    }
2000
2001    #[tokio::test]
2002    async fn test_special_characters_in_args() {
2003        let config = ExecutorConfig {
2004            capture_output: true,
2005            ..Default::default()
2006        };
2007        let executor = TaskExecutor::new(config);
2008        let special_chars = vec![
2009            "$USER",
2010            "$(whoami)",
2011            "`whoami`",
2012            "&& echo hacked",
2013            "|| echo failed",
2014            "> /tmp/hack",
2015            "| cat",
2016        ];
2017        for special_arg in special_chars {
2018            let task = Task {
2019                command: "echo".to_string(),
2020                args: vec!["safe".to_string(), special_arg.to_string()],
2021                description: Some("Special character test".to_string()),
2022                ..Default::default()
2023            };
2024            let result = executor.execute_task("special", &task).await.unwrap();
2025            assert!(result.success);
2026            assert!(result.stdout.contains("safe"));
2027            assert!(result.stdout.contains(special_arg));
2028        }
2029    }
2030
2031    #[tokio::test]
2032    async fn test_environment_variable_safety() {
2033        let mut config = ExecutorConfig {
2034            capture_output: true,
2035            ..Default::default()
2036        };
2037        config
2038            .environment
2039            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
2040        let executor = TaskExecutor::new(config);
2041        let task = Task {
2042            command: "printenv".to_string(),
2043            args: vec!["DANGEROUS_VAR".to_string()],
2044            description: Some("Environment variable safety test".to_string()),
2045            ..Default::default()
2046        };
2047        let result = executor.execute_task("env_test", &task).await.unwrap();
2048        assert!(result.success);
2049        assert!(result.stdout.contains("; rm -rf /"));
2050    }
2051
2052    #[tokio::test]
2053    async fn test_execute_graph_parallel_groups() {
2054        // two independent tasks -> can run in same parallel group
2055        let config = ExecutorConfig {
2056            capture_output: true,
2057            max_parallel: 2,
2058            ..Default::default()
2059        };
2060        let executor = TaskExecutor::new(config);
2061        let mut graph = TaskGraph::new();
2062
2063        let t1 = Task {
2064            command: "echo".into(),
2065            args: vec!["A".into()],
2066            ..Default::default()
2067        };
2068        let t2 = Task {
2069            command: "echo".into(),
2070            args: vec!["B".into()],
2071            ..Default::default()
2072        };
2073
2074        graph.add_task("t1", t1).unwrap();
2075        graph.add_task("t2", t2).unwrap();
2076        let results = executor.execute_graph(&graph).await.unwrap();
2077        assert_eq!(results.len(), 2);
2078        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
2079        assert!(joined.contains("A") && joined.contains("B"));
2080    }
2081
2082    #[tokio::test]
2083    async fn test_execute_graph_respects_dependency_levels() {
2084        let tmp = TempDir::new().unwrap();
2085        let root = tmp.path();
2086
2087        let config = ExecutorConfig {
2088            capture_output: true,
2089            max_parallel: 2,
2090            project_root: root.to_path_buf(),
2091            ..Default::default()
2092        };
2093        let executor = TaskExecutor::new(config);
2094
2095        let mut tasks = Tasks::new();
2096        tasks.tasks.insert(
2097            "dep".into(),
2098            TaskDefinition::Single(Box::new(Task {
2099                command: "sh".into(),
2100                args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
2101                ..Default::default()
2102            })),
2103        );
2104        tasks.tasks.insert(
2105            "consumer".into(),
2106            TaskDefinition::Single(Box::new(Task {
2107                command: "sh".into(),
2108                args: vec!["-c".into(), "cat marker.txt".into()],
2109                depends_on: vec!["dep".into()],
2110                ..Default::default()
2111            })),
2112        );
2113
2114        let mut graph = TaskGraph::new();
2115        graph.build_for_task("consumer", &tasks).unwrap();
2116
2117        let results = executor.execute_graph(&graph).await.unwrap();
2118        assert_eq!(results.len(), 2);
2119
2120        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
2121        assert!(consumer.success);
2122        assert!(consumer.stdout.contains("ok"));
2123    }
2124}