Skip to main content

cuenv_core/tasks/
executor.rs

1//! Task executor for running tasks with environment support.
2//!
3//! - Environment variable propagation
4//! - Parallel and sequential execution
5//! - Host execution; isolation/caching is delegated to other backends
6
7use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
8use super::cache::{BuildActionInput, RecordInput, TaskCacheConfig};
9use super::process_registry::global_registry;
10use super::{Task, TaskGraph, TaskGroup, TaskNode, Tasks};
11use crate::OutputCapture;
12use crate::config::BackendConfig;
13use crate::environment::Environment;
14use crate::{Error, Result};
15use async_recursion::async_recursion;
16use cuenv_workspaces::PackageManager;
17use std::path::{Path, PathBuf};
18use std::process::Stdio;
19use std::sync::Arc;
20use tokio::process::Command;
21use tokio::task::JoinSet;
22use tracing::instrument;
23
24// Unix process group setup requires CommandExt trait for pre_exec method.
25// The unused_imports warning is a false positive - the trait is used via cmd.pre_exec().
26#[cfg(unix)]
27#[allow(unused_imports)]
28use std::os::unix::process::CommandExt;
29
30/// Set up process group on Unix so we can kill the entire process tree on Ctrl-C.
31///
32/// This creates a new process group with the spawned process as the leader,
33/// allowing us to send signals to all descendants when terminating.
34#[cfg(unix)]
35fn setup_process_group(cmd: &mut Command) {
36    // SAFETY: setpgid(0, 0) creates a new process group with this process as leader.
37    // This is safe to call in the pre-spawn hook as it only affects the child process.
38    // It allows us to send signals to the entire process group when terminating.
39    #[expect(unsafe_code, reason = "Required for POSIX process group management")]
40    unsafe {
41        cmd.pre_exec(|| {
42            libc::setpgid(0, 0);
43            Ok(())
44        });
45    }
46}
47
48/// Task execution result
49#[derive(Debug, Clone)]
50pub struct TaskResult {
51    pub name: String,
52    pub exit_code: Option<i32>,
53    pub stdout: String,
54    pub stderr: String,
55    pub success: bool,
56}
57
58/// Number of lines from stdout/stderr to include when summarizing failures
59pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
60
61/// Task executor configuration
62#[derive(Debug, Clone)]
63pub struct ExecutorConfig {
64    /// Whether to capture output (vs streaming to stdout/stderr)
65    pub capture_output: OutputCapture,
66    /// Maximum parallel tasks (0 = unlimited)
67    pub max_parallel: usize,
68    /// Environment variables to propagate (resolved via policies)
69    pub environment: Environment,
70    /// Optional working directory override (reserved for future backends)
71    pub working_dir: Option<PathBuf>,
72    /// Project root for resolving inputs/outputs (env.cue root)
73    pub project_root: PathBuf,
74    /// Path to cue.mod root for resolving relative source paths
75    pub cue_module_root: Option<PathBuf>,
76    /// Optional: materialize cached outputs on cache hit
77    pub materialize_outputs: Option<PathBuf>,
78    /// Optional: cache directory override
79    pub cache_dir: Option<PathBuf>,
80    /// Optional: print cache path on hits/misses
81    pub show_cache_path: bool,
82    /// Backend configuration
83    pub backend_config: Option<BackendConfig>,
84    /// CLI backend selection override
85    pub cli_backend: Option<String>,
86    /// Optional task-result caching infrastructure (CAS + action cache + VCS hasher).
87    /// When `None`, the executor behaves exactly as it did before content-addressed
88    /// caching was wired in: tasks always run, nothing is persisted, nothing is read.
89    pub cache: Option<TaskCacheConfig>,
90}
91
92impl Default for ExecutorConfig {
93    fn default() -> Self {
94        Self {
95            capture_output: OutputCapture::Capture,
96            max_parallel: 0,
97            environment: Environment::new(),
98            working_dir: None,
99            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
100            cue_module_root: None,
101            materialize_outputs: None,
102            cache_dir: None,
103            show_cache_path: false,
104            backend_config: None,
105            cli_backend: None,
106            cache: None,
107        }
108    }
109}
110
111/// Task executor
112pub struct TaskExecutor {
113    config: ExecutorConfig,
114    backend: Arc<dyn TaskBackend>,
115}
116impl TaskExecutor {
117    /// Create a new executor with host backend only
118    pub fn new(config: ExecutorConfig) -> Self {
119        Self::with_dagger_factory(config, None)
120    }
121
122    /// Create a new executor with optional dagger backend support.
123    ///
124    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable dagger backend.
125    pub fn with_dagger_factory(
126        config: ExecutorConfig,
127        dagger_factory: Option<BackendFactory>,
128    ) -> Self {
129        let backend = create_backend_with_factory(
130            config.backend_config.as_ref(),
131            config.project_root.clone(),
132            config.cli_backend.as_deref(),
133            dagger_factory,
134        );
135        Self { config, backend }
136    }
137
138    /// Create a new executor with the given config but sharing the backend
139    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
140        Self { config, backend }
141    }
142
143    /// Execute a single task, consulting the action cache when configured.
144    ///
145    /// Flow:
146    /// 1. If a [`TaskCacheConfig`] is set and the task declares `inputs`,
147    ///    build the [`cuenv_cas::Action`] envelope and look it up in the
148    ///    action cache. On a hit, materialize cached outputs into the
149    ///    workdir and return without spawning anything.
150    /// 2. Otherwise dispatch to the configured backend (host or dagger).
151    /// 3. On a successful miss, persist outputs + result to the cache so
152    ///    the next invocation hits.
153    #[instrument(name = "execute_task", skip(self, task), fields(task_name = %name))]
154    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
155        // Cache plumbing — None means no caching, behave as before.
156        // We compute the action digest up front so we can reuse it on the
157        // record path without re-walking the inputs.
158        let cache_handle: Option<(TaskCacheConfig, cuenv_cas::Digest, PathBuf)> =
159            if let Some(cache) = self.config.cache.clone() {
160                let workdir = self.workdir_for_task(task);
161                match super::cache::build_action(BuildActionInput {
162                    task,
163                    task_name: name,
164                    environment: &self.config.environment,
165                    cache: &cache,
166                    workdir: &workdir,
167                    project_root: self.project_root_for_task(task),
168                    module_root: self
169                        .config
170                        .cue_module_root
171                        .as_deref()
172                        .unwrap_or(&self.config.project_root),
173                })
174                .await?
175                {
176                    Some((_, action_digest)) => {
177                        // Cache lookup. On a hit, short-circuit execution.
178                        if let Some(cached) = super::cache::lookup(&cache, &action_digest, task)? {
179                            tracing::debug!(task = %name, "action cache hit");
180                            return self.return_cache_hit(CacheHitInput {
181                                name,
182                                task,
183                                cache: &cache,
184                                workdir: &workdir,
185                                cached: &cached,
186                            });
187                        }
188                        tracing::debug!(task = %name, "action cache miss");
189                        Some((cache, action_digest, workdir))
190                    }
191                    None => None,
192                }
193            } else {
194                None
195            };
196
197        // Real execution. Both backends produce the same `TaskResult`.
198        let start = std::time::Instant::now();
199        let result = if self.backend.name() == "dagger" {
200            let ctx = super::backend::TaskExecutionContext {
201                name,
202                task,
203                environment: &self.config.environment,
204                project_root: &self.config.project_root,
205                capture_output: self.config.capture_output,
206            };
207            self.backend.execute(&ctx).await?
208        } else {
209            self.execute_task_non_hermetic(name, task).await?
210        };
211        let duration_ms = start.elapsed().as_millis();
212
213        // Persist on successful miss. Cache writes are best-effort: a write
214        // failure logs but does not fail the user's task.
215        if let Some((cache, action_digest, workdir)) = cache_handle
216            && super::cache::effective_policy(task).mode.allows_write()
217            && result.exit_code == Some(0)
218            && let Err(e) = super::cache::record(RecordInput {
219                cache: &cache,
220                action_digest: &action_digest,
221                workdir: &workdir,
222                task,
223                stdout: &result.stdout,
224                stderr: &result.stderr,
225                exit_code: 0,
226                duration_ms,
227            })
228        {
229            tracing::warn!(task = %name, error = %e, "cache write failed");
230        }
231
232        Ok(result)
233    }
234
235    /// Reproduce a [`TaskResult`] from a cache hit and emit the same
236    /// lifecycle events the executor would emit on a normal run, so
237    /// downstream renderers (CLI / TUI / JSON) see no behavioral
238    /// difference between a cached and an uncached task.
239    fn return_cache_hit(&self, input: CacheHitInput<'_>) -> Result<TaskResult> {
240        let CacheHitInput {
241            name,
242            task,
243            cache,
244            workdir,
245            cached,
246        } = input;
247
248        let (stdout, stderr, exit_code) = super::cache::materialize_hit(cache, workdir, cached)?;
249        let success = exit_code == 0;
250
251        let cmd_str = if let Some(script) = &task.script {
252            format!("[script: {} bytes] (cached)", script.len())
253        } else if task.command.is_empty() {
254            format!("{} (cached)", task.args.join(" "))
255        } else {
256            format!("{} {} (cached)", task.command, task.args.join(" "))
257        };
258        cuenv_events::emit_task_started!(name, cmd_str, false);
259        emit_cached_output_events(name, "stdout", &stdout);
260        emit_cached_output_events(name, "stderr", &stderr);
261        cuenv_events::emit_task_completed!(
262            name,
263            success,
264            exit_code,
265            cached.execution_metadata.duration_ms
266        );
267
268        Ok(TaskResult {
269            name: name.to_string(),
270            exit_code: Some(exit_code),
271            stdout,
272            stderr,
273            success,
274        })
275    }
276
277    /// Compute the working directory the executor would use for a task.
278    ///
279    /// Extracted from `execute_task_non_hermetic` so that the cache wrapper
280    /// in `execute_task` can resolve outputs against the same path the
281    /// task itself ran in.
282    fn workdir_for_task(&self, task: &Task) -> PathBuf {
283        if let Some(ref dir) = task.directory {
284            self.config
285                .cue_module_root
286                .as_ref()
287                .unwrap_or(&self.config.project_root)
288                .join(dir)
289        } else if let Some(ref project_root) = task.project_root {
290            project_root.clone()
291        } else if let Some(ref source) = task.source {
292            if let Some(dir) = source.directory() {
293                self.config
294                    .cue_module_root
295                    .as_ref()
296                    .unwrap_or(&self.config.project_root)
297                    .join(dir)
298            } else if let Some(ref project_root) = task.project_root {
299                project_root.clone()
300            } else {
301                self.config
302                    .cue_module_root
303                    .clone()
304                    .unwrap_or_else(|| self.config.project_root.clone())
305            }
306        } else if !task.hermetic {
307            if let Some(manager) = cuenv_workspaces::detect_from_command(&task.command) {
308                find_workspace_root(manager, &self.config.project_root)
309            } else {
310                self.config.project_root.clone()
311            }
312        } else {
313            self.config.project_root.clone()
314        }
315    }
316
317    fn project_root_for_task<'a>(&'a self, task: &'a Task) -> &'a Path {
318        task.project_root
319            .as_deref()
320            .unwrap_or(&self.config.project_root)
321    }
322
323    /// Execute a task non-hermetically (directly in workspace/project root)
324    ///
325    /// Used for tasks like `bun install` that need to write to the real filesystem.
326    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
327        // Check if this is an unresolved TaskRef (should have been resolved before execution)
328        if task.is_task_ref() && task.project_root.is_none() {
329            return Err(Error::configuration(format!(
330                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
331                 This usually means:\n\
332                 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
333                 - The referenced task '{}' doesn't exist in that project\n\
334                 - There was an error loading the referenced project's env.cue\n\
335                 Run with RUST_LOG=debug for more details.",
336                name,
337                task.task_ref.as_deref().unwrap_or("unknown"),
338                task.task_ref
339                    .as_deref()
340                    .and_then(|r| r.split(':').next_back())
341                    .unwrap_or("unknown")
342            )));
343        }
344
345        // Determine working directory (in priority order: see `workdir_for_task`).
346        let workdir = self.workdir_for_task(task);
347
348        tracing::info!(
349            task = %name,
350            workdir = %workdir.display(),
351            hermetic = false,
352            "Executing non-hermetic task"
353        );
354
355        // Emit command being run - always emit task_started for all modes
356        // (TUI needs events even when capture_output is true)
357        let cmd_str = if let Some(script) = &task.script {
358            format!("[script: {} bytes]", script.len())
359        } else if task.command.is_empty() {
360            task.args.join(" ")
361        } else {
362            format!("{} {}", task.command, task.args.join(" "))
363        };
364
365        cuenv_events::emit_task_started!(name, cmd_str, false);
366
367        // Build command - handle script mode vs command mode
368        let command_spec =
369            task.command_spec(|command| self.config.environment.resolve_command(command))?;
370        let mut cmd = Command::new(&command_spec.program);
371        cmd.args(&command_spec.args);
372
373        // Set working directory and environment (hermetic - no host PATH pollution)
374        cmd.current_dir(&workdir);
375        let env_vars = self.config.environment.merge_with_system_hermetic();
376        for (k, v) in &env_vars {
377            cmd.env(k, v);
378        }
379
380        // Apply task-level env vars (plain values and passthrough from host)
381        for (key, value) in &task.env {
382            if let Some(s) = value.as_str() {
383                if let Some(host_var) = super::output_refs::parse_passthrough(s) {
384                    if let Ok(host_val) = std::env::var(host_var) {
385                        cmd.env(key, host_val);
386                    }
387                } else if !s.starts_with("cuenv:ref:") {
388                    // Plain string value — apply directly
389                    cmd.env(key, s);
390                }
391                // Output refs are resolved separately by OutputRefResolver
392            } else if let Some(n) = value.as_i64() {
393                cmd.env(key, n.to_string());
394            } else if let Some(b) = value.as_bool() {
395                cmd.env(key, b.to_string());
396            }
397            // Skip objects (secret refs etc.) — handled by project-level env resolution
398        }
399
400        // Force color output even when stdout is piped (for capture mode)
401        // These are widely supported: FORCE_COLOR by Node.js/chalk, CLICOLOR_FORCE by BSD/macOS
402        if !env_vars.contains_key("FORCE_COLOR") {
403            cmd.env("FORCE_COLOR", "1");
404        }
405        if !env_vars.contains_key("CLICOLOR_FORCE") {
406            cmd.env("CLICOLOR_FORCE", "1");
407        }
408
409        // Execute - always capture output for consistent behavior
410        // If not in capture mode, stream output to terminal in real-time
411        if self.config.capture_output.should_capture() {
412            use tokio::io::{AsyncBufReadExt, BufReader};
413
414            let start_time = std::time::Instant::now();
415
416            // Set up process group on Unix so we can kill the entire tree on Ctrl-C
417            #[cfg(unix)]
418            setup_process_group(&mut cmd);
419
420            // Spawn with piped stdout/stderr for streaming
421            let mut child = cmd
422                .stdout(Stdio::piped())
423                .stderr(Stdio::piped())
424                .spawn()
425                .map_err(|e| Error::Io {
426                    source: e,
427                    path: None,
428                    operation: format!("spawn task {}", name),
429                })?;
430
431            // Register process with global registry for cleanup on Ctrl-C
432            let child_pid = child.id();
433            if let Some(pid) = child_pid {
434                global_registry().register(pid, name.to_string()).await;
435            }
436
437            // Take ownership of stdout/stderr handles
438            let stdout_handle = child.stdout.take();
439            let stderr_handle = child.stderr.take();
440
441            // Collect output while streaming events in real-time
442            let mut stdout_lines = Vec::new();
443            let mut stderr_lines = Vec::new();
444
445            // Stream stdout
446            let name_for_stdout = name.to_string();
447            let stdout_task = tokio::spawn(async move {
448                let mut lines = Vec::new();
449                if let Some(stdout) = stdout_handle {
450                    let mut reader = BufReader::new(stdout).lines();
451                    while let Ok(Some(line)) = reader.next_line().await {
452                        cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
453                        lines.push(line);
454                    }
455                }
456                lines
457            });
458
459            // Stream stderr
460            let name_for_stderr = name.to_string();
461            let stderr_task = tokio::spawn(async move {
462                let mut lines = Vec::new();
463                if let Some(stderr) = stderr_handle {
464                    let mut reader = BufReader::new(stderr).lines();
465                    while let Ok(Some(line)) = reader.next_line().await {
466                        cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
467                        lines.push(line);
468                    }
469                }
470                lines
471            });
472
473            // Wait for process to complete and collect output
474            let status = child.wait().await.map_err(|e| Error::Io {
475                source: e,
476                path: None,
477                operation: format!("wait for task {}", name),
478            })?;
479
480            // Unregister process from global registry now that it has completed
481            if let Some(pid) = child_pid {
482                global_registry().unregister(pid).await;
483            }
484
485            // Collect streamed output
486            if let Ok(lines) = stdout_task.await {
487                stdout_lines = lines;
488            }
489            if let Ok(lines) = stderr_task.await {
490                stderr_lines = lines;
491            }
492
493            let duration_ms = start_time.elapsed().as_millis() as u64;
494            let stdout = stdout_lines.join("\n");
495            let stderr = stderr_lines.join("\n");
496            let exit_code = status.code().unwrap_or(-1);
497            let success = status.success();
498
499            // Emit task completion event
500            cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
501
502            if !success {
503                tracing::warn!(task = %name, exit = exit_code, "Task failed");
504                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
505                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
506            }
507
508            Ok(TaskResult {
509                name: name.to_string(),
510                exit_code: Some(exit_code),
511                stdout,
512                stderr,
513                success,
514            })
515        } else {
516            // Stream output directly to terminal (interactive mode)
517
518            // Set up process group on Unix so we can kill the entire tree on Ctrl-C
519            #[cfg(unix)]
520            setup_process_group(&mut cmd);
521
522            // Use spawn + wait instead of status() to get access to the PID
523            let mut child = cmd
524                .stdout(Stdio::inherit())
525                .stderr(Stdio::inherit())
526                .stdin(Stdio::inherit())
527                .spawn()
528                .map_err(|e| Error::Io {
529                    source: e,
530                    path: None,
531                    operation: format!("spawn task {}", name),
532                })?;
533
534            // Register process with global registry for cleanup on Ctrl-C
535            let child_pid = child.id();
536            if let Some(pid) = child_pid {
537                global_registry().register(pid, name.to_string()).await;
538            }
539
540            let status = child.wait().await.map_err(|e| Error::Io {
541                source: e,
542                path: None,
543                operation: format!("wait for task {}", name),
544            })?;
545
546            // Unregister process from global registry
547            if let Some(pid) = child_pid {
548                global_registry().unregister(pid).await;
549            }
550
551            let exit_code = status.code().unwrap_or(-1);
552            let success = status.success();
553
554            if !success {
555                tracing::warn!(task = %name, exit = exit_code, "Task failed");
556            }
557
558            Ok(TaskResult {
559                name: name.to_string(),
560                exit_code: Some(exit_code),
561                stdout: String::new(), // Output went to terminal
562                stderr: String::new(),
563                success,
564            })
565        }
566    }
567
568    /// Execute a task node (single task, group, or list)
569    #[async_recursion]
570    pub async fn execute_node(
571        &self,
572        name: &str,
573        node: &TaskNode,
574        all_tasks: &Tasks,
575    ) -> Result<Vec<TaskResult>> {
576        match node {
577            TaskNode::Task(task) => {
578                let result = self.execute_task(name, task.as_ref()).await?;
579                Ok(vec![result])
580            }
581            TaskNode::Group(group) => self.execute_parallel(name, group, all_tasks).await,
582            TaskNode::Sequence(seq) => self.execute_sequential(name, seq, all_tasks).await,
583        }
584    }
585
586    /// Execute a task definition (legacy alias for execute_node)
587    #[async_recursion]
588    pub async fn execute_definition(
589        &self,
590        name: &str,
591        node: &TaskNode,
592        all_tasks: &Tasks,
593    ) -> Result<Vec<TaskResult>> {
594        self.execute_node(name, node, all_tasks).await
595    }
596
597    async fn execute_sequential(
598        &self,
599        prefix: &str,
600        sequence: &[TaskNode],
601        all_tasks: &Tasks,
602    ) -> Result<Vec<TaskResult>> {
603        if !self.config.capture_output.should_capture() {
604            cuenv_events::emit_task_group_started!(prefix, true, sequence.len());
605        }
606        let mut results = Vec::new();
607        // Track completed step results for output ref resolution within sequences.
608        let mut seq_results: std::collections::HashMap<String, TaskResult> =
609            std::collections::HashMap::new();
610
611        for (i, step) in sequence.iter().enumerate() {
612            let task_name = format!("{}[{}]", prefix, i);
613
614            // For Task steps, resolve any output ref placeholders from prior steps.
615            // Only clone when placeholders are actually present to avoid
616            // unnecessary allocations in the common (no-refs) case.
617            let step = if let TaskNode::Task(task) = step
618                && super::output_refs::has_output_refs(&task.args, &task.env)
619            {
620                let mut resolved_task = (**task).clone();
621                let resolver = super::output_refs::OutputRefResolver {
622                    task_name: &task_name,
623                    results: &seq_results,
624                };
625                resolver.resolve(&mut resolved_task.args, &mut resolved_task.env)?;
626                TaskNode::Task(Box::new(resolved_task))
627            } else {
628                step.clone()
629            };
630
631            let task_results = self.execute_node(&task_name, &step, all_tasks).await?;
632            for result in &task_results {
633                // Sequences always stop on first error (no configuration option)
634                if !result.success {
635                    return Err(Error::task_failed(
636                        &result.name,
637                        result.exit_code.unwrap_or(-1),
638                        &result.stdout,
639                        &result.stderr,
640                    ));
641                }
642                seq_results.insert(result.name.clone(), result.clone());
643            }
644            results.extend(task_results);
645        }
646        Ok(results)
647    }
648
649    async fn execute_parallel(
650        &self,
651        prefix: &str,
652        group: &TaskGroup,
653        all_tasks: &Tasks,
654    ) -> Result<Vec<TaskResult>> {
655        // Check for "default" task to override parallel execution
656        if let Some(default_task) = group.children.get("default") {
657            if !self.config.capture_output.should_capture() {
658                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
659            }
660            // Execute only the default task, using the group prefix directly
661            // since "default" is implicit when invoking the group name
662            let task_name = format!("{}.default", prefix);
663            return self.execute_node(&task_name, default_task, all_tasks).await;
664        }
665
666        if !self.config.capture_output.should_capture() {
667            cuenv_events::emit_task_group_started!(prefix, false, group.children.len());
668        }
669        let mut join_set = JoinSet::new();
670        let all_tasks = Arc::new(all_tasks.clone());
671        let mut all_results = Vec::new();
672        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
673            if let Some(failed) = results.iter().find(|r| !r.success) {
674                return Err(Error::task_failed(
675                    &failed.name,
676                    failed.exit_code.unwrap_or(-1),
677                    &failed.stdout,
678                    &failed.stderr,
679                ));
680            }
681            all_results.extend(results);
682            Ok(())
683        };
684        for (name, child_node) in &group.children {
685            let task_name = format!("{}.{}", prefix, name);
686            let child_node = child_node.clone();
687            let all_tasks = Arc::clone(&all_tasks);
688            let executor = self.clone_with_config();
689            join_set.spawn(async move {
690                executor
691                    .execute_node(&task_name, &child_node, &all_tasks)
692                    .await
693            });
694            if self.config.max_parallel > 0
695                && join_set.len() >= self.config.max_parallel
696                && let Some(result) = join_set.join_next().await
697            {
698                match result {
699                    Ok(Ok(results)) => merge_results(results)?,
700                    Ok(Err(e)) => return Err(e),
701                    Err(e) => {
702                        return Err(Error::execution(format!("Task execution panicked: {}", e)));
703                    }
704                }
705            }
706        }
707        while let Some(result) = join_set.join_next().await {
708            match result {
709                Ok(Ok(results)) => merge_results(results)?,
710                Ok(Err(e)) => return Err(e),
711                Err(e) => {
712                    return Err(Error::execution(format!("Task execution panicked: {}", e)));
713                }
714            }
715        }
716        Ok(all_results)
717    }
718
719    #[instrument(name = "execute_graph", skip(self, graph), fields(task_count = graph.task_count()))]
720    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
721        let parallel_groups = graph.get_parallel_groups()?;
722        let mut all_results = Vec::new();
723        // Map of completed task results for resolving output references.
724        // Populated between sequential parallel groups — no concurrent access.
725        let mut results_map: std::collections::HashMap<String, TaskResult> =
726            std::collections::HashMap::new();
727
728        // IMPORTANT:
729        // Each parallel group represents a dependency "level". We must not start tasks from the
730        // next group until *all* tasks from the current group have completed successfully.
731        //
732        // The previous implementation pipelined groups (starting the next group as soon as all
733        // tasks from the current group were spawned), which allowed dependent tasks to run before
734        // their dependencies finished (especially visible with long-running tasks like dev servers).
735        for mut group in parallel_groups {
736            let mut join_set = JoinSet::new();
737
738            while !group.is_empty() || !join_set.is_empty() {
739                // Fill the concurrency window for this group
740                while let Some(node) = group.pop() {
741                    let mut task = node.task.clone();
742                    let name = node.name.clone();
743
744                    // Resolve any task output reference placeholders in args/env
745                    // before spawning. The results_map contains all completed
746                    // upstream tasks (from prior parallel groups).
747                    let resolver = super::output_refs::OutputRefResolver {
748                        task_name: &name,
749                        results: &results_map,
750                    };
751                    resolver.resolve(&mut task.args, &mut task.env)?;
752
753                    let executor = self.clone_with_config();
754                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
755
756                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
757                        break;
758                    }
759                }
760
761                if let Some(result) = join_set.join_next().await {
762                    match result {
763                        Ok(Ok(task_result)) => {
764                            if !task_result.success {
765                                join_set.abort_all();
766                                return Err(Error::task_failed(
767                                    &task_result.name,
768                                    task_result.exit_code.unwrap_or(-1),
769                                    &task_result.stdout,
770                                    &task_result.stderr,
771                                ));
772                            }
773                            results_map.insert(task_result.name.clone(), task_result.clone());
774                            all_results.push(task_result);
775                        }
776                        Ok(Err(e)) => {
777                            join_set.abort_all();
778                            return Err(e);
779                        }
780                        Err(e) => {
781                            join_set.abort_all();
782                            return Err(Error::execution(format!(
783                                "Task execution panicked: {}",
784                                e
785                            )));
786                        }
787                    }
788                }
789            }
790        }
791
792        Ok(all_results)
793    }
794
795    fn clone_with_config(&self) -> Self {
796        // Share the backend across clones to preserve container cache for Dagger chaining
797        Self::with_shared_backend(self.config.clone(), self.backend.clone())
798    }
799}
800
801struct CacheHitInput<'a> {
802    name: &'a str,
803    task: &'a Task,
804    cache: &'a TaskCacheConfig,
805    workdir: &'a Path,
806    cached: &'a cuenv_cas::ActionResult,
807}
808
809fn emit_cached_output_events(name: &str, stream: &'static str, content: &str) {
810    for line in content.lines() {
811        cuenv_events::emit_task_output!(name, stream, line);
812    }
813}
814
815fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
816    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
817
818    loop {
819        let is_root = match manager {
820            PackageManager::Npm
821            | PackageManager::Bun
822            | PackageManager::YarnClassic
823            | PackageManager::YarnModern => package_json_has_workspaces(&current),
824            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
825            PackageManager::Cargo => cargo_toml_has_workspace(&current),
826            PackageManager::Deno => deno_json_has_workspace(&current),
827        };
828
829        if is_root {
830            return current;
831        }
832
833        if let Some(parent) = current.parent() {
834            current = parent.to_path_buf();
835        } else {
836            return start.to_path_buf();
837        }
838    }
839}
840
841fn package_json_has_workspaces(dir: &Path) -> bool {
842    let path = dir.join("package.json");
843    let content = std::fs::read_to_string(&path);
844    let Ok(json) = content.and_then(|s| {
845        serde_json::from_str::<serde_json::Value>(&s)
846            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
847    }) else {
848        return false;
849    };
850
851    match json.get("workspaces") {
852        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
853        Some(serde_json::Value::Object(map)) => map
854            .get("packages")
855            .and_then(|packages| packages.as_array())
856            .map(|arr| !arr.is_empty())
857            .unwrap_or(false),
858        _ => false,
859    }
860}
861
862fn cargo_toml_has_workspace(dir: &Path) -> bool {
863    let path = dir.join("Cargo.toml");
864    let Ok(content) = std::fs::read_to_string(&path) else {
865        return false;
866    };
867
868    content.contains("[workspace]")
869}
870
871fn deno_json_has_workspace(dir: &Path) -> bool {
872    let path = dir.join("deno.json");
873    let content = std::fs::read_to_string(&path);
874    let Ok(json) = content.and_then(|s| {
875        serde_json::from_str::<serde_json::Value>(&s)
876            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
877    }) else {
878        return false;
879    };
880
881    // Deno uses "workspace" (not "workspaces") for workspace configuration
882    match json.get("workspace") {
883        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
884        Some(serde_json::Value::Object(_)) => true,
885        _ => false,
886    }
887}
888
889/// Build a compact, user-friendly summary for a failed task, including the
890/// exit code and the tail of stdout/stderr to help with diagnostics.
891pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
892    let exit_code = result
893        .exit_code
894        .map(|c| c.to_string())
895        .unwrap_or_else(|| "unknown".to_string());
896
897    let mut sections = Vec::new();
898    sections.push(format!(
899        "Task '{}' failed with exit code {}.",
900        result.name, exit_code
901    ));
902
903    let output = format_failure_streams(result, max_output_lines);
904    if output.is_empty() {
905        sections.push(
906            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
907                .to_string(),
908        );
909    } else {
910        sections.push(output);
911    }
912
913    sections.join("\n\n")
914}
915
916fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
917    let mut streams = Vec::new();
918
919    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
920        streams.push(stdout);
921    }
922
923    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
924        streams.push(stderr);
925    }
926
927    streams.join("\n\n")
928}
929
930fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
931    let normalized = content.trim_end();
932    if normalized.is_empty() {
933        return None;
934    }
935
936    let lines: Vec<&str> = normalized.lines().collect();
937    let total = lines.len();
938    let start = total.saturating_sub(max_output_lines);
939    let snippet = lines[start..].join("\n");
940
941    let header = if total > max_output_lines {
942        format!("{label} (last {max_output_lines} of {total} lines):")
943    } else {
944        format!("{label}:")
945    };
946
947    Some(format!("{header}\n{snippet}"))
948}
949
950/// Execute an arbitrary command with the cuenv environment
951///
952/// If `secrets` is provided, output will be captured and redacted before printing.
953pub async fn execute_command(
954    command: &str,
955    args: &[String],
956    environment: &Environment,
957) -> Result<i32> {
958    execute_command_with_redaction(command, args, environment, &[]).await
959}
960
961/// Execute a command with secret redaction
962///
963/// Secret values in stdout/stderr are replaced with [REDACTED].
964pub async fn execute_command_with_redaction(
965    command: &str,
966    args: &[String],
967    environment: &Environment,
968    secrets: &[String],
969) -> Result<i32> {
970    use tokio::io::{AsyncBufReadExt, BufReader};
971
972    tracing::info!("Executing command: {} {:?}", command, args);
973    let mut cmd = Command::new(command);
974    cmd.args(args);
975    // Use hermetic environment - no host PATH pollution
976    let env_vars = environment.merge_with_system_hermetic();
977    for (key, value) in env_vars {
978        cmd.env(key, value);
979    }
980
981    if secrets.is_empty() {
982        // No secrets to redact - inherit stdio directly
983        cmd.stdout(Stdio::inherit());
984        cmd.stderr(Stdio::inherit());
985        cmd.stdin(Stdio::inherit());
986        let status = cmd.status().await.map_err(|e| {
987            Error::configuration(format!("Failed to execute command '{}': {}", command, e))
988        })?;
989        return Ok(status.code().unwrap_or(1));
990    }
991
992    // Capture output for redaction
993    cmd.stdout(Stdio::piped());
994    cmd.stderr(Stdio::piped());
995    cmd.stdin(Stdio::inherit());
996
997    let mut child = cmd.spawn().map_err(|e| {
998        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
999    })?;
1000
1001    let stdout = child
1002        .stdout
1003        .take()
1004        .ok_or_else(|| Error::execution("stdout pipe not available"))?;
1005    let stderr = child
1006        .stderr
1007        .take()
1008        .ok_or_else(|| Error::execution("stderr pipe not available"))?;
1009
1010    // Build sorted secrets for greedy matching (longer first)
1011    let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
1012    sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
1013    let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
1014
1015    // Stream stdout with redaction
1016    let secrets_clone = sorted_secrets.clone();
1017    let stdout_task = tokio::spawn(async move {
1018        let reader = BufReader::new(stdout);
1019        let mut lines = reader.lines();
1020        while let Ok(Some(line)) = lines.next_line().await {
1021            let mut redacted = line;
1022            for secret in &secrets_clone {
1023                if secret.len() >= 4 {
1024                    redacted = redacted.replace(secret, "[REDACTED]");
1025                }
1026            }
1027            cuenv_events::emit_stdout!(&redacted);
1028        }
1029    });
1030
1031    // Stream stderr with redaction
1032    let stderr_task = tokio::spawn(async move {
1033        let reader = BufReader::new(stderr);
1034        let mut lines = reader.lines();
1035        while let Ok(Some(line)) = lines.next_line().await {
1036            let mut redacted = line;
1037            for secret in &sorted_secrets {
1038                if secret.len() >= 4 {
1039                    redacted = redacted.replace(secret, "[REDACTED]");
1040                }
1041            }
1042            cuenv_events::emit_stderr!(&redacted);
1043        }
1044    });
1045
1046    // Wait for command and streams
1047    let status = child.wait().await.map_err(|e| {
1048        Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
1049    })?;
1050
1051    let _ = stdout_task.await;
1052    let _ = stderr_task.await;
1053
1054    Ok(status.code().unwrap_or(1))
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059    use super::*;
1060    use crate::tasks::TaskDependency;
1061    use crate::tasks::cache::TaskCacheConfig;
1062    use cuenv_cas::{LocalActionCache, LocalCas};
1063    use cuenv_events::{CuenvEventLayer, EventCategory, TaskEvent};
1064    use cuenv_vcs::WalkHasher;
1065    use tempfile::TempDir;
1066    use tokio::sync::mpsc;
1067    use tracing_subscriber::layer::SubscriberExt;
1068
1069    #[tokio::test]
1070    async fn test_executor_config_default() {
1071        let config = ExecutorConfig::default();
1072        assert!(config.capture_output.should_capture());
1073        assert_eq!(config.max_parallel, 0);
1074        assert!(config.environment.is_empty());
1075    }
1076
1077    #[tokio::test]
1078    async fn test_task_result() {
1079        let result = TaskResult {
1080            name: "test".to_string(),
1081            exit_code: Some(0),
1082            stdout: "output".to_string(),
1083            stderr: String::new(),
1084            success: true,
1085        };
1086        assert_eq!(result.name, "test");
1087        assert_eq!(result.exit_code, Some(0));
1088        assert!(result.success);
1089        assert_eq!(result.stdout, "output");
1090    }
1091
1092    #[tokio::test]
1093    async fn test_execute_simple_task() {
1094        let config = ExecutorConfig {
1095            capture_output: OutputCapture::Capture,
1096            ..Default::default()
1097        };
1098        let executor = TaskExecutor::new(config);
1099        let task = Task {
1100            command: "echo".to_string(),
1101            args: vec!["hello".to_string()],
1102            description: Some("Hello task".to_string()),
1103            ..Default::default()
1104        };
1105        let result = executor.execute_task("test", &task).await.unwrap();
1106        assert!(result.success);
1107        assert_eq!(result.exit_code, Some(0));
1108        assert!(result.stdout.contains("hello"));
1109    }
1110
1111    #[tokio::test]
1112    async fn test_execute_with_environment() {
1113        let mut config = ExecutorConfig {
1114            capture_output: OutputCapture::Capture,
1115            ..Default::default()
1116        };
1117        config
1118            .environment
1119            .set("TEST_VAR".to_string(), "test_value".to_string());
1120        let executor = TaskExecutor::new(config);
1121        let task = Task {
1122            command: "printenv".to_string(),
1123            args: vec!["TEST_VAR".to_string()],
1124            description: Some("Print env task".to_string()),
1125            ..Default::default()
1126        };
1127        let result = executor.execute_task("test", &task).await.unwrap();
1128        assert!(result.success);
1129        assert!(result.stdout.contains("test_value"));
1130    }
1131
1132    #[tokio::test]
1133    async fn test_execute_failing_task() {
1134        let config = ExecutorConfig {
1135            capture_output: OutputCapture::Capture,
1136            ..Default::default()
1137        };
1138        let executor = TaskExecutor::new(config);
1139        let task = Task {
1140            command: "false".to_string(),
1141            description: Some("Failing task".to_string()),
1142            ..Default::default()
1143        };
1144        let result = executor.execute_task("test", &task).await.unwrap();
1145        assert!(!result.success);
1146        assert_eq!(result.exit_code, Some(1));
1147    }
1148
1149    #[tokio::test]
1150    async fn test_execute_script_task() {
1151        let config = ExecutorConfig {
1152            capture_output: OutputCapture::Capture,
1153            ..Default::default()
1154        };
1155        let executor = TaskExecutor::new(config);
1156        let task = Task {
1157            script: Some("echo hello from script".to_string()),
1158            ..Default::default()
1159        };
1160
1161        let result = executor.execute_task("script", &task).await.unwrap();
1162
1163        assert!(result.success);
1164        assert_eq!(result.exit_code, Some(0));
1165        assert!(result.stdout.contains("hello from script"));
1166    }
1167
1168    #[tokio::test]
1169    async fn test_execute_script_task_with_shell_options() {
1170        let config = ExecutorConfig {
1171            capture_output: OutputCapture::Capture,
1172            ..Default::default()
1173        };
1174        let executor = TaskExecutor::new(config);
1175        let task = Task {
1176            script: Some("false\necho should-not-run".to_string()),
1177            script_shell: Some(super::super::ScriptShell::Bash),
1178            shell_options: Some(super::super::ShellOptions::default()),
1179            ..Default::default()
1180        };
1181
1182        let result = executor
1183            .execute_task("script.failfast", &task)
1184            .await
1185            .unwrap();
1186
1187        assert!(!result.success);
1188        assert_eq!(result.exit_code, Some(1));
1189        assert!(!result.stdout.contains("should-not-run"));
1190    }
1191
1192    #[tokio::test]
1193    async fn test_execute_script_task_rejects_pipefail_for_sh() {
1194        let config = ExecutorConfig {
1195            capture_output: OutputCapture::Capture,
1196            ..Default::default()
1197        };
1198        let executor = TaskExecutor::new(config);
1199        let task = Task {
1200            script: Some("echo hello".to_string()),
1201            script_shell: Some(super::super::ScriptShell::Sh),
1202            shell_options: Some(super::super::ShellOptions::default()),
1203            ..Default::default()
1204        };
1205
1206        let err = executor.execute_task("script.sh", &task).await.unwrap_err();
1207
1208        assert!(
1209            err.to_string()
1210                .contains("shellOptions.pipefail with unsupported script shell 'sh'"),
1211            "unexpected error: {err}"
1212        );
1213    }
1214
1215    #[tokio::test]
1216    async fn test_execute_script_task_rejects_unsupported_shell_options() {
1217        let config = ExecutorConfig {
1218            capture_output: OutputCapture::Capture,
1219            ..Default::default()
1220        };
1221        let executor = TaskExecutor::new(config);
1222        let task = Task {
1223            script: Some("console.log('hello')".to_string()),
1224            script_shell: Some(super::super::ScriptShell::Node),
1225            shell_options: Some(super::super::ShellOptions::default()),
1226            ..Default::default()
1227        };
1228
1229        let err = executor
1230            .execute_task("script.node", &task)
1231            .await
1232            .unwrap_err();
1233
1234        assert!(
1235            err.to_string().contains("unsupported script shell 'node'"),
1236            "unexpected error: {err}"
1237        );
1238    }
1239
1240    #[tokio::test]
1241    async fn test_execute_sequential_group() {
1242        let config = ExecutorConfig {
1243            capture_output: OutputCapture::Capture,
1244            ..Default::default()
1245        };
1246        let executor = TaskExecutor::new(config);
1247        let task1 = Task {
1248            command: "echo".to_string(),
1249            args: vec!["first".to_string()],
1250            description: Some("First task".to_string()),
1251            ..Default::default()
1252        };
1253        let task2 = Task {
1254            command: "echo".to_string(),
1255            args: vec!["second".to_string()],
1256            description: Some("Second task".to_string()),
1257            ..Default::default()
1258        };
1259        let sequence = vec![
1260            TaskNode::Task(Box::new(task1)),
1261            TaskNode::Task(Box::new(task2)),
1262        ];
1263        let all_tasks = Tasks::new();
1264        let node = TaskNode::Sequence(sequence);
1265        let results = executor
1266            .execute_node("seq", &node, &all_tasks)
1267            .await
1268            .unwrap();
1269        assert_eq!(results.len(), 2);
1270        assert!(results[0].stdout.contains("first"));
1271        assert!(results[1].stdout.contains("second"));
1272    }
1273
1274    #[tokio::test]
1275    async fn test_command_injection_prevention() {
1276        let config = ExecutorConfig {
1277            capture_output: OutputCapture::Capture,
1278            ..Default::default()
1279        };
1280        let executor = TaskExecutor::new(config);
1281        let malicious_task = Task {
1282            command: "echo".to_string(),
1283            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
1284            description: Some("Malicious task test".to_string()),
1285            ..Default::default()
1286        };
1287        let result = executor
1288            .execute_task("malicious", &malicious_task)
1289            .await
1290            .unwrap();
1291        assert!(result.success);
1292        assert!(result.stdout.contains("hello ; rm -rf /"));
1293    }
1294
1295    #[tokio::test]
1296    async fn test_special_characters_in_args() {
1297        let config = ExecutorConfig {
1298            capture_output: OutputCapture::Capture,
1299            ..Default::default()
1300        };
1301        let executor = TaskExecutor::new(config);
1302        let special_chars = vec![
1303            "$USER",
1304            "$(whoami)",
1305            "`whoami`",
1306            "&& echo hacked",
1307            "|| echo failed",
1308            "> /tmp/hack",
1309            "| cat",
1310        ];
1311        for special_arg in special_chars {
1312            let task = Task {
1313                command: "echo".to_string(),
1314                args: vec!["safe".to_string(), special_arg.to_string()],
1315                description: Some("Special character test".to_string()),
1316                ..Default::default()
1317            };
1318            let result = executor.execute_task("special", &task).await.unwrap();
1319            assert!(result.success);
1320            assert!(result.stdout.contains("safe"));
1321            assert!(result.stdout.contains(special_arg));
1322        }
1323    }
1324
1325    #[tokio::test]
1326    async fn test_environment_variable_safety() {
1327        let mut config = ExecutorConfig {
1328            capture_output: OutputCapture::Capture,
1329            ..Default::default()
1330        };
1331        config
1332            .environment
1333            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1334        let executor = TaskExecutor::new(config);
1335        let task = Task {
1336            command: "printenv".to_string(),
1337            args: vec!["DANGEROUS_VAR".to_string()],
1338            description: Some("Environment variable safety test".to_string()),
1339            ..Default::default()
1340        };
1341        let result = executor.execute_task("env_test", &task).await.unwrap();
1342        assert!(result.success);
1343        assert!(result.stdout.contains("; rm -rf /"));
1344    }
1345
1346    #[tokio::test]
1347    async fn test_execute_graph_parallel_groups() {
1348        // two independent tasks -> can run in same parallel group
1349        let config = ExecutorConfig {
1350            capture_output: OutputCapture::Capture,
1351            max_parallel: 2,
1352            ..Default::default()
1353        };
1354        let executor = TaskExecutor::new(config);
1355        let mut graph = TaskGraph::new();
1356
1357        let t1 = Task {
1358            command: "echo".into(),
1359            args: vec!["A".into()],
1360            ..Default::default()
1361        };
1362        let t2 = Task {
1363            command: "echo".into(),
1364            args: vec!["B".into()],
1365            ..Default::default()
1366        };
1367
1368        graph.add_task("t1", t1).unwrap();
1369        graph.add_task("t2", t2).unwrap();
1370        let results = executor.execute_graph(&graph).await.unwrap();
1371        assert_eq!(results.len(), 2);
1372        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1373        assert!(joined.contains("A") && joined.contains("B"));
1374    }
1375
1376    #[tokio::test]
1377    async fn test_execute_graph_respects_dependency_levels() {
1378        let tmp = TempDir::new().unwrap();
1379        let root = tmp.path();
1380
1381        let config = ExecutorConfig {
1382            capture_output: OutputCapture::Capture,
1383            max_parallel: 2,
1384            project_root: root.to_path_buf(),
1385            ..Default::default()
1386        };
1387        let executor = TaskExecutor::new(config);
1388
1389        let mut tasks = Tasks::new();
1390        tasks.tasks.insert(
1391            "dep".into(),
1392            TaskNode::Task(Box::new(Task {
1393                command: "sh".into(),
1394                args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
1395                ..Default::default()
1396            })),
1397        );
1398        tasks.tasks.insert(
1399            "consumer".into(),
1400            TaskNode::Task(Box::new(Task {
1401                command: "sh".into(),
1402                args: vec!["-c".into(), "cat marker.txt".into()],
1403                depends_on: vec![TaskDependency::from_name("dep")],
1404                ..Default::default()
1405            })),
1406        );
1407
1408        let mut graph = TaskGraph::new();
1409        graph.build_for_task("consumer", &tasks).unwrap();
1410
1411        let results = executor.execute_graph(&graph).await.unwrap();
1412        assert_eq!(results.len(), 2);
1413
1414        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
1415        assert!(consumer.success);
1416        assert!(consumer.stdout.contains("ok"));
1417    }
1418
1419    #[tokio::test]
1420    async fn test_cache_hit_replays_task_output_events() {
1421        let workspace = TempDir::new().unwrap();
1422        let cache_root = TempDir::new().unwrap();
1423        std::fs::write(workspace.path().join("input.txt"), "v1").unwrap();
1424
1425        let cache = TaskCacheConfig {
1426            cas: Arc::new(LocalCas::open(cache_root.path()).unwrap()),
1427            action_cache: Arc::new(LocalActionCache::open(cache_root.path()).unwrap()),
1428            vcs_hasher: Arc::new(WalkHasher::new(workspace.path())),
1429            vcs_hasher_root: workspace.path().to_path_buf(),
1430            cuenv_version: "test".to_string(),
1431            runtime_identity_properties: std::collections::BTreeMap::new(),
1432            cache_disabled_reason: None,
1433        };
1434        let executor = TaskExecutor::new(ExecutorConfig {
1435            capture_output: OutputCapture::Capture,
1436            project_root: workspace.path().to_path_buf(),
1437            cache: Some(cache),
1438            ..Default::default()
1439        });
1440        let task = Task {
1441            command: "sh".to_string(),
1442            args: vec![
1443                "-c".to_string(),
1444                "printf 'hello\\n' && cat input.txt > out.txt".to_string(),
1445            ],
1446            inputs: vec![super::super::Input::Path("input.txt".to_string())],
1447            outputs: vec!["out.txt".to_string()],
1448            cache: Some(super::super::TaskCachePolicy {
1449                mode: super::super::TaskCacheMode::ReadWrite,
1450                max_age: None,
1451            }),
1452            ..Task::default()
1453        };
1454
1455        executor.execute_task("cached", &task).await.unwrap();
1456        std::fs::remove_file(workspace.path().join("out.txt")).unwrap();
1457
1458        let (tx, mut rx) = mpsc::unbounded_channel();
1459        let layer = CuenvEventLayer::new(tx);
1460        let subscriber = tracing_subscriber::registry().with(layer);
1461        let _guard = tracing::subscriber::set_default(subscriber);
1462
1463        let result = executor.execute_task("cached", &task).await.unwrap();
1464        assert!(result.success);
1465
1466        let mut saw_output = false;
1467        while let Ok(event) = rx.try_recv() {
1468            if let EventCategory::Task(TaskEvent::Output { name, content, .. }) = event.category
1469                && name == "cached"
1470                && content == "hello"
1471            {
1472                saw_output = true;
1473                break;
1474            }
1475        }
1476
1477        assert!(
1478            saw_output,
1479            "expected cached task output event to be replayed"
1480        );
1481    }
1482
1483    #[test]
1484    fn test_summarize_task_failure_with_exit_code() {
1485        let result = TaskResult {
1486            name: "build".to_string(),
1487            exit_code: Some(127),
1488            stdout: String::new(),
1489            stderr: "command not found".to_string(),
1490            success: false,
1491        };
1492        let summary = summarize_task_failure(&result, 10);
1493        assert!(summary.contains("build"));
1494        assert!(summary.contains("127"));
1495        assert!(summary.contains("command not found"));
1496    }
1497
1498    #[test]
1499    fn test_summarize_task_failure_no_exit_code() {
1500        let result = TaskResult {
1501            name: "killed".to_string(),
1502            exit_code: None,
1503            stdout: String::new(),
1504            stderr: String::new(),
1505            success: false,
1506        };
1507        let summary = summarize_task_failure(&result, 10);
1508        assert!(summary.contains("killed"));
1509        assert!(summary.contains("unknown"));
1510    }
1511
1512    #[test]
1513    fn test_summarize_task_failure_no_output() {
1514        let result = TaskResult {
1515            name: "silent".to_string(),
1516            exit_code: Some(1),
1517            stdout: String::new(),
1518            stderr: String::new(),
1519            success: false,
1520        };
1521        let summary = summarize_task_failure(&result, 10);
1522        assert!(summary.contains("No stdout/stderr"));
1523        assert!(summary.contains("RUST_LOG=debug"));
1524    }
1525
1526    #[test]
1527    fn test_summarize_task_failure_truncates_long_output() {
1528        let result = TaskResult {
1529            name: "verbose".to_string(),
1530            exit_code: Some(1),
1531            stdout: (1..=50)
1532                .map(|i| format!("line {}", i))
1533                .collect::<Vec<_>>()
1534                .join("\n"),
1535            stderr: String::new(),
1536            success: false,
1537        };
1538        let summary = summarize_task_failure(&result, 10);
1539        assert!(summary.contains("last 10 of 50 lines"));
1540        assert!(summary.contains("line 50"));
1541        assert!(!summary.contains("line 1\n")); // First line should be truncated
1542    }
1543
1544    #[test]
1545    fn test_summarize_stream_empty() {
1546        assert!(summarize_stream("test", "", 10).is_none());
1547        assert!(summarize_stream("test", "   ", 10).is_none());
1548        assert!(summarize_stream("test", "\n\n", 10).is_none());
1549    }
1550
1551    #[test]
1552    fn test_summarize_stream_short() {
1553        let result = summarize_stream("stdout", "line 1\nline 2", 10).unwrap();
1554        assert!(result.contains("stdout:"));
1555        assert!(result.contains("line 1"));
1556        assert!(result.contains("line 2"));
1557        assert!(!result.contains("last"));
1558    }
1559
1560    #[test]
1561    fn test_format_failure_streams_both() {
1562        let result = TaskResult {
1563            name: "test".to_string(),
1564            exit_code: Some(1),
1565            stdout: "stdout content".to_string(),
1566            stderr: "stderr content".to_string(),
1567            success: false,
1568        };
1569        let formatted = format_failure_streams(&result, 10);
1570        assert!(formatted.contains("stdout:"));
1571        assert!(formatted.contains("stderr:"));
1572        assert!(formatted.contains("stdout content"));
1573        assert!(formatted.contains("stderr content"));
1574    }
1575
1576    #[test]
1577    fn test_find_workspace_root_with_npm() {
1578        let tmp = TempDir::new().unwrap();
1579        // Create a workspace structure
1580        std::fs::write(
1581            tmp.path().join("package.json"),
1582            r#"{"workspaces": ["packages/*"]}"#,
1583        )
1584        .unwrap();
1585        let subdir = tmp.path().join("packages").join("subpkg");
1586        std::fs::create_dir_all(&subdir).unwrap();
1587
1588        let root = find_workspace_root(PackageManager::Npm, &subdir);
1589        assert_eq!(root, tmp.path().canonicalize().unwrap());
1590    }
1591
1592    #[test]
1593    fn test_find_workspace_root_with_pnpm() {
1594        let tmp = TempDir::new().unwrap();
1595        std::fs::write(
1596            tmp.path().join("pnpm-workspace.yaml"),
1597            "packages:\n  - 'packages/*'",
1598        )
1599        .unwrap();
1600        let subdir = tmp.path().join("packages").join("app");
1601        std::fs::create_dir_all(&subdir).unwrap();
1602
1603        let root = find_workspace_root(PackageManager::Pnpm, &subdir);
1604        assert_eq!(root, tmp.path().canonicalize().unwrap());
1605    }
1606
1607    #[test]
1608    fn test_find_workspace_root_with_cargo() {
1609        let tmp = TempDir::new().unwrap();
1610        std::fs::write(
1611            tmp.path().join("Cargo.toml"),
1612            "[workspace]\nmembers = [\"crates/*\"]",
1613        )
1614        .unwrap();
1615        let subdir = tmp.path().join("crates").join("core");
1616        std::fs::create_dir_all(&subdir).unwrap();
1617
1618        let root = find_workspace_root(PackageManager::Cargo, &subdir);
1619        assert_eq!(root, tmp.path().canonicalize().unwrap());
1620    }
1621
1622    #[test]
1623    fn test_find_workspace_root_no_workspace() {
1624        let tmp = TempDir::new().unwrap();
1625        let root = find_workspace_root(PackageManager::Npm, tmp.path());
1626        // Should return the start path when no workspace is found
1627        assert_eq!(root, tmp.path().to_path_buf());
1628    }
1629
1630    #[test]
1631    fn test_package_json_has_workspaces_array() {
1632        let tmp = TempDir::new().unwrap();
1633        std::fs::write(
1634            tmp.path().join("package.json"),
1635            r#"{"workspaces": ["packages/*"]}"#,
1636        )
1637        .unwrap();
1638        assert!(package_json_has_workspaces(tmp.path()));
1639    }
1640
1641    #[test]
1642    fn test_package_json_has_workspaces_object() {
1643        let tmp = TempDir::new().unwrap();
1644        std::fs::write(
1645            tmp.path().join("package.json"),
1646            r#"{"workspaces": {"packages": ["packages/*"]}}"#,
1647        )
1648        .unwrap();
1649        assert!(package_json_has_workspaces(tmp.path()));
1650    }
1651
1652    #[test]
1653    fn test_package_json_no_workspaces() {
1654        let tmp = TempDir::new().unwrap();
1655        std::fs::write(tmp.path().join("package.json"), r#"{"name": "test"}"#).unwrap();
1656        assert!(!package_json_has_workspaces(tmp.path()));
1657    }
1658
1659    #[test]
1660    fn test_package_json_empty_workspaces() {
1661        let tmp = TempDir::new().unwrap();
1662        std::fs::write(tmp.path().join("package.json"), r#"{"workspaces": []}"#).unwrap();
1663        assert!(!package_json_has_workspaces(tmp.path()));
1664    }
1665
1666    #[test]
1667    fn test_package_json_missing() {
1668        let tmp = TempDir::new().unwrap();
1669        assert!(!package_json_has_workspaces(tmp.path()));
1670    }
1671
1672    #[test]
1673    fn test_cargo_toml_has_workspace() {
1674        let tmp = TempDir::new().unwrap();
1675        std::fs::write(
1676            tmp.path().join("Cargo.toml"),
1677            "[workspace]\nmembers = [\"crates/*\"]",
1678        )
1679        .unwrap();
1680        assert!(cargo_toml_has_workspace(tmp.path()));
1681    }
1682
1683    #[test]
1684    fn test_cargo_toml_no_workspace() {
1685        let tmp = TempDir::new().unwrap();
1686        std::fs::write(tmp.path().join("Cargo.toml"), "[package]\nname = \"test\"").unwrap();
1687        assert!(!cargo_toml_has_workspace(tmp.path()));
1688    }
1689
1690    #[test]
1691    fn test_cargo_toml_missing() {
1692        let tmp = TempDir::new().unwrap();
1693        assert!(!cargo_toml_has_workspace(tmp.path()));
1694    }
1695
1696    #[test]
1697    fn test_deno_json_has_workspace_array() {
1698        let tmp = TempDir::new().unwrap();
1699        std::fs::write(
1700            tmp.path().join("deno.json"),
1701            r#"{"workspace": ["./packages/*"]}"#,
1702        )
1703        .unwrap();
1704        assert!(deno_json_has_workspace(tmp.path()));
1705    }
1706
1707    #[test]
1708    fn test_deno_json_has_workspace_object() {
1709        let tmp = TempDir::new().unwrap();
1710        std::fs::write(
1711            tmp.path().join("deno.json"),
1712            r#"{"workspace": {"members": ["./packages/*"]}}"#,
1713        )
1714        .unwrap();
1715        assert!(deno_json_has_workspace(tmp.path()));
1716    }
1717
1718    #[test]
1719    fn test_deno_json_no_workspace() {
1720        let tmp = TempDir::new().unwrap();
1721        std::fs::write(tmp.path().join("deno.json"), r#"{"name": "test"}"#).unwrap();
1722        assert!(!deno_json_has_workspace(tmp.path()));
1723    }
1724
1725    #[test]
1726    fn test_deno_json_missing() {
1727        let tmp = TempDir::new().unwrap();
1728        assert!(!deno_json_has_workspace(tmp.path()));
1729    }
1730
1731    #[test]
1732    fn test_executor_config_with_fields() {
1733        let config = ExecutorConfig {
1734            capture_output: OutputCapture::Capture,
1735            max_parallel: 4,
1736            working_dir: Some(PathBuf::from("/tmp")),
1737            project_root: PathBuf::from("/project"),
1738            cue_module_root: Some(PathBuf::from("/project/cue.mod")),
1739            materialize_outputs: Some(PathBuf::from("/outputs")),
1740            cache_dir: Some(PathBuf::from("/cache")),
1741            show_cache_path: true,
1742            cli_backend: Some("host".to_string()),
1743            ..Default::default()
1744        };
1745        assert!(config.capture_output.should_capture());
1746        assert_eq!(config.max_parallel, 4);
1747        assert_eq!(config.working_dir, Some(PathBuf::from("/tmp")));
1748        assert!(config.show_cache_path);
1749    }
1750
1751    #[test]
1752    fn test_task_result_clone() {
1753        let result = TaskResult {
1754            name: "test".to_string(),
1755            exit_code: Some(0),
1756            stdout: "output".to_string(),
1757            stderr: "error".to_string(),
1758            success: true,
1759        };
1760        let cloned = result.clone();
1761        assert_eq!(cloned.name, result.name);
1762        assert_eq!(cloned.exit_code, result.exit_code);
1763        assert_eq!(cloned.stdout, result.stdout);
1764        assert_eq!(cloned.stderr, result.stderr);
1765        assert_eq!(cloned.success, result.success);
1766    }
1767
1768    #[test]
1769    fn test_task_failure_snippet_lines_constant() {
1770        assert_eq!(TASK_FAILURE_SNIPPET_LINES, 20);
1771    }
1772}