cuenv_core/tasks/
executor.rs

1//! Task executor for running tasks with environment support.
2//!
3//! - Environment variable propagation
4//! - Parallel and sequential execution
5//! - Host execution; isolation/caching is delegated to other backends
6
7use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
8use super::{Task, TaskGraph, TaskGroup, TaskNode, Tasks};
9use crate::config::BackendConfig;
10use crate::environment::Environment;
11use crate::{Error, Result};
12use async_recursion::async_recursion;
13use cuenv_workspaces::PackageManager;
14use std::path::{Path, PathBuf};
15use std::process::Stdio;
16use std::sync::Arc;
17use tokio::process::Command;
18use tokio::task::JoinSet;
19use tracing::instrument;
20
21/// Task execution result
22#[derive(Debug, Clone)]
23pub struct TaskResult {
24    pub name: String,
25    pub exit_code: Option<i32>,
26    pub stdout: String,
27    pub stderr: String,
28    pub success: bool,
29}
30
31/// Number of lines from stdout/stderr to include when summarizing failures
32pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
33
34/// Task executor configuration
35#[derive(Debug, Clone)]
36pub struct ExecutorConfig {
37    /// Whether to capture output (vs streaming to stdout/stderr)
38    pub capture_output: bool,
39    /// Maximum parallel tasks (0 = unlimited)
40    pub max_parallel: usize,
41    /// Environment variables to propagate (resolved via policies)
42    pub environment: Environment,
43    /// Optional working directory override (reserved for future backends)
44    pub working_dir: Option<PathBuf>,
45    /// Project root for resolving inputs/outputs (env.cue root)
46    pub project_root: PathBuf,
47    /// Path to cue.mod root for resolving relative source paths
48    pub cue_module_root: Option<PathBuf>,
49    /// Optional: materialize cached outputs on cache hit
50    pub materialize_outputs: Option<PathBuf>,
51    /// Optional: cache directory override
52    pub cache_dir: Option<PathBuf>,
53    /// Optional: print cache path on hits/misses
54    pub show_cache_path: bool,
55    /// Backend configuration
56    pub backend_config: Option<BackendConfig>,
57    /// CLI backend selection override
58    pub cli_backend: Option<String>,
59}
60
61impl Default for ExecutorConfig {
62    fn default() -> Self {
63        Self {
64            capture_output: true,
65            max_parallel: 0,
66            environment: Environment::new(),
67            working_dir: None,
68            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
69            cue_module_root: None,
70            materialize_outputs: None,
71            cache_dir: None,
72            show_cache_path: false,
73            backend_config: None,
74            cli_backend: None,
75        }
76    }
77}
78
79/// Task executor
80pub struct TaskExecutor {
81    config: ExecutorConfig,
82    backend: Arc<dyn TaskBackend>,
83}
84impl TaskExecutor {
85    /// Create a new executor with host backend only
86    pub fn new(config: ExecutorConfig) -> Self {
87        Self::with_dagger_factory(config, None)
88    }
89
90    /// Create a new executor with optional dagger backend support.
91    ///
92    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable dagger backend.
93    pub fn with_dagger_factory(
94        config: ExecutorConfig,
95        dagger_factory: Option<BackendFactory>,
96    ) -> Self {
97        let backend = create_backend_with_factory(
98            config.backend_config.as_ref(),
99            config.project_root.clone(),
100            config.cli_backend.as_deref(),
101            dagger_factory,
102        );
103        Self { config, backend }
104    }
105
106    /// Create a new executor with the given config but sharing the backend
107    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
108        Self { config, backend }
109    }
110
111    /// Execute a single task
112    #[instrument(name = "execute_task", skip(self, task), fields(task_name = %name))]
113    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
114        // Delegate execution to the configured backend.
115        // The backend implementation handles the specific execution details.
116
117        // If using Dagger backend, execute in a containerized context.
118        if self.backend.name() == "dagger" {
119            return self
120                .backend
121                .execute(
122                    name,
123                    task,
124                    &self.config.environment,
125                    &self.config.project_root,
126                    self.config.capture_output,
127                )
128                .await;
129        }
130
131        // Host backend runs tasks directly in the workspace.
132        self.execute_task_non_hermetic(name, task).await
133    }
134
135    /// Execute a task non-hermetically (directly in workspace/project root)
136    ///
137    /// Used for tasks like `bun install` that need to write to the real filesystem.
138    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
139        // Check if this is an unresolved TaskRef (should have been resolved before execution)
140        if task.is_task_ref() && task.project_root.is_none() {
141            return Err(Error::configuration(format!(
142                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
143                 This usually means:\n\
144                 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
145                 - The referenced task '{}' doesn't exist in that project\n\
146                 - There was an error loading the referenced project's env.cue\n\
147                 Run with RUST_LOG=debug for more details.",
148                name,
149                task.task_ref.as_deref().unwrap_or("unknown"),
150                task.task_ref
151                    .as_deref()
152                    .and_then(|r| r.split(':').next_back())
153                    .unwrap_or("unknown")
154            )));
155        }
156
157        // Determine working directory (in priority order):
158        // 1. Explicit directory field on task (relative to cue.mod root)
159        // 2. TaskRef project_root (from resolution)
160        // 3. Source file directory (from _source metadata)
161        // 4. Install tasks (hermetic: false with workspaces) run from workspace root
162        // 5. Default to project root
163        let workdir = if let Some(ref dir) = task.directory {
164            // Explicit directory override: resolve relative to cue.mod root or project root
165            self.config
166                .cue_module_root
167                .as_ref()
168                .unwrap_or(&self.config.project_root)
169                .join(dir)
170        } else if let Some(ref project_root) = task.project_root {
171            // TaskRef tasks run in their original project directory
172            project_root.clone()
173        } else if let Some(ref source) = task.source {
174            // Default: run in the directory of the source file
175            if let Some(dir) = source.directory() {
176                self.config
177                    .cue_module_root
178                    .as_ref()
179                    .unwrap_or(&self.config.project_root)
180                    .join(dir)
181            } else {
182                // Source is at root (e.g., "env.cue"), use cue_module_root if available
183                // This ensures tasks defined in root env.cue run from module root,
184                // even when invoked from a subdirectory
185                self.config
186                    .cue_module_root
187                    .clone()
188                    .unwrap_or_else(|| self.config.project_root.clone())
189            }
190        } else if !task.hermetic {
191            // For non-hermetic tasks, detect package manager from command to find workspace root
192            if let Some(manager) = cuenv_workspaces::detect_from_command(&task.command) {
193                find_workspace_root(manager, &self.config.project_root)
194            } else {
195                self.config.project_root.clone()
196            }
197        } else {
198            self.config.project_root.clone()
199        };
200
201        tracing::info!(
202            task = %name,
203            workdir = %workdir.display(),
204            hermetic = false,
205            "Executing non-hermetic task"
206        );
207
208        // Emit command being run - always emit task_started for all modes
209        // (TUI needs events even when capture_output is true)
210        let cmd_str = if let Some(script) = &task.script {
211            format!("[script: {} bytes]", script.len())
212        } else if task.command.is_empty() {
213            task.args.join(" ")
214        } else {
215            format!("{} {}", task.command, task.args.join(" "))
216        };
217
218        cuenv_events::emit_task_started!(name, cmd_str, false);
219
220        // Build command - handle script mode vs command mode
221        let mut cmd = if let Some(script) = &task.script {
222            // Script mode: use shell to execute the script
223            let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
224                (
225                    shell.command.clone().unwrap_or_else(|| "bash".to_string()),
226                    shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
227                )
228            } else {
229                // Default to bash for scripts
230                ("bash".to_string(), "-c".to_string())
231            };
232
233            let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
234            let mut cmd = Command::new(&resolved_shell);
235            cmd.arg(&shell_flag);
236            cmd.arg(script);
237            cmd
238        } else {
239            // Command mode: existing behavior
240            let resolved_command = self.config.environment.resolve_command(&task.command);
241
242            if let Some(shell) = &task.shell {
243                if let (Some(shell_command), Some(shell_flag)) = (&shell.command, &shell.flag) {
244                    let resolved_shell = self.config.environment.resolve_command(shell_command);
245                    let mut cmd = Command::new(&resolved_shell);
246                    cmd.arg(shell_flag);
247                    if task.args.is_empty() {
248                        cmd.arg(&resolved_command);
249                    } else {
250                        let full_command = if task.command.is_empty() {
251                            task.args.join(" ")
252                        } else {
253                            format!("{} {}", resolved_command, task.args.join(" "))
254                        };
255                        cmd.arg(full_command);
256                    }
257                    cmd
258                } else {
259                    let mut cmd = Command::new(&resolved_command);
260                    for arg in &task.args {
261                        cmd.arg(arg);
262                    }
263                    cmd
264                }
265            } else {
266                let mut cmd = Command::new(&resolved_command);
267                for arg in &task.args {
268                    cmd.arg(arg);
269                }
270                cmd
271            }
272        };
273
274        // Set working directory and environment (hermetic - no host PATH pollution)
275        cmd.current_dir(&workdir);
276        let env_vars = self.config.environment.merge_with_system_hermetic();
277        for (k, v) in &env_vars {
278            cmd.env(k, v);
279        }
280
281        // Force color output even when stdout is piped (for capture mode)
282        // These are widely supported: FORCE_COLOR by Node.js/chalk, CLICOLOR_FORCE by BSD/macOS
283        if !env_vars.contains_key("FORCE_COLOR") {
284            cmd.env("FORCE_COLOR", "1");
285        }
286        if !env_vars.contains_key("CLICOLOR_FORCE") {
287            cmd.env("CLICOLOR_FORCE", "1");
288        }
289
290        // Execute - always capture output for consistent behavior
291        // If not in capture mode, stream output to terminal in real-time
292        if self.config.capture_output {
293            use tokio::io::{AsyncBufReadExt, BufReader};
294
295            let start_time = std::time::Instant::now();
296
297            // Spawn with piped stdout/stderr for streaming
298            let mut child = cmd
299                .stdout(Stdio::piped())
300                .stderr(Stdio::piped())
301                .spawn()
302                .map_err(|e| Error::Io {
303                    source: e,
304                    path: None,
305                    operation: format!("spawn task {}", name),
306                })?;
307
308            // Take ownership of stdout/stderr handles
309            let stdout_handle = child.stdout.take();
310            let stderr_handle = child.stderr.take();
311
312            // Collect output while streaming events in real-time
313            let mut stdout_lines = Vec::new();
314            let mut stderr_lines = Vec::new();
315
316            // Stream stdout
317            let name_for_stdout = name.to_string();
318            let stdout_task = tokio::spawn(async move {
319                let mut lines = Vec::new();
320                if let Some(stdout) = stdout_handle {
321                    let mut reader = BufReader::new(stdout).lines();
322                    while let Ok(Some(line)) = reader.next_line().await {
323                        cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
324                        lines.push(line);
325                    }
326                }
327                lines
328            });
329
330            // Stream stderr
331            let name_for_stderr = name.to_string();
332            let stderr_task = tokio::spawn(async move {
333                let mut lines = Vec::new();
334                if let Some(stderr) = stderr_handle {
335                    let mut reader = BufReader::new(stderr).lines();
336                    while let Ok(Some(line)) = reader.next_line().await {
337                        cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
338                        lines.push(line);
339                    }
340                }
341                lines
342            });
343
344            // Wait for process to complete and collect output
345            let status = child.wait().await.map_err(|e| Error::Io {
346                source: e,
347                path: None,
348                operation: format!("wait for task {}", name),
349            })?;
350
351            // Collect streamed output
352            if let Ok(lines) = stdout_task.await {
353                stdout_lines = lines;
354            }
355            if let Ok(lines) = stderr_task.await {
356                stderr_lines = lines;
357            }
358
359            let duration_ms = start_time.elapsed().as_millis() as u64;
360            let stdout = stdout_lines.join("\n");
361            let stderr = stderr_lines.join("\n");
362            let exit_code = status.code().unwrap_or(-1);
363            let success = status.success();
364
365            // Emit task completion event
366            cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
367
368            if !success {
369                tracing::warn!(task = %name, exit = exit_code, "Task failed");
370                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
371                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
372            }
373
374            Ok(TaskResult {
375                name: name.to_string(),
376                exit_code: Some(exit_code),
377                stdout,
378                stderr,
379                success,
380            })
381        } else {
382            // Stream output directly to terminal (interactive mode)
383            let status = cmd
384                .stdout(Stdio::inherit())
385                .stderr(Stdio::inherit())
386                .status()
387                .await
388                .map_err(|e| Error::Io {
389                    source: e,
390                    path: None,
391                    operation: format!("spawn task {}", name),
392                })?;
393
394            let exit_code = status.code().unwrap_or(-1);
395            let success = status.success();
396
397            if !success {
398                tracing::warn!(task = %name, exit = exit_code, "Task failed");
399            }
400
401            Ok(TaskResult {
402                name: name.to_string(),
403                exit_code: Some(exit_code),
404                stdout: String::new(), // Output went to terminal
405                stderr: String::new(),
406                success,
407            })
408        }
409    }
410
411    /// Execute a task node (single task, group, or list)
412    #[async_recursion]
413    pub async fn execute_node(
414        &self,
415        name: &str,
416        node: &TaskNode,
417        all_tasks: &Tasks,
418    ) -> Result<Vec<TaskResult>> {
419        match node {
420            TaskNode::Task(task) => {
421                let result = self.execute_task(name, task.as_ref()).await?;
422                Ok(vec![result])
423            }
424            TaskNode::Group(group) => self.execute_parallel(name, group, all_tasks).await,
425            TaskNode::Sequence(seq) => self.execute_sequential(name, seq, all_tasks).await,
426        }
427    }
428
429    /// Execute a task definition (legacy alias for execute_node)
430    #[async_recursion]
431    pub async fn execute_definition(
432        &self,
433        name: &str,
434        node: &TaskNode,
435        all_tasks: &Tasks,
436    ) -> Result<Vec<TaskResult>> {
437        self.execute_node(name, node, all_tasks).await
438    }
439
440    async fn execute_sequential(
441        &self,
442        prefix: &str,
443        sequence: &[TaskNode],
444        all_tasks: &Tasks,
445    ) -> Result<Vec<TaskResult>> {
446        if !self.config.capture_output {
447            cuenv_events::emit_task_group_started!(prefix, true, sequence.len());
448        }
449        let mut results = Vec::new();
450        for (i, step) in sequence.iter().enumerate() {
451            let task_name = format!("{}[{}]", prefix, i);
452            let task_results = self.execute_node(&task_name, step, all_tasks).await?;
453            for result in &task_results {
454                // Sequences always stop on first error (no configuration option)
455                if !result.success {
456                    return Err(Error::task_failed(
457                        &result.name,
458                        result.exit_code.unwrap_or(-1),
459                        &result.stdout,
460                        &result.stderr,
461                    ));
462                }
463            }
464            results.extend(task_results);
465        }
466        Ok(results)
467    }
468
469    async fn execute_parallel(
470        &self,
471        prefix: &str,
472        group: &TaskGroup,
473        all_tasks: &Tasks,
474    ) -> Result<Vec<TaskResult>> {
475        // Check for "default" task to override parallel execution
476        if let Some(default_task) = group.children.get("default") {
477            if !self.config.capture_output {
478                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
479            }
480            // Execute only the default task, using the group prefix directly
481            // since "default" is implicit when invoking the group name
482            let task_name = format!("{}.default", prefix);
483            return self.execute_node(&task_name, default_task, all_tasks).await;
484        }
485
486        if !self.config.capture_output {
487            cuenv_events::emit_task_group_started!(prefix, false, group.children.len());
488        }
489        let mut join_set = JoinSet::new();
490        let all_tasks = Arc::new(all_tasks.clone());
491        let mut all_results = Vec::new();
492        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
493            if let Some(failed) = results.iter().find(|r| !r.success) {
494                return Err(Error::task_failed(
495                    &failed.name,
496                    failed.exit_code.unwrap_or(-1),
497                    &failed.stdout,
498                    &failed.stderr,
499                ));
500            }
501            all_results.extend(results);
502            Ok(())
503        };
504        for (name, child_node) in &group.children {
505            let task_name = format!("{}.{}", prefix, name);
506            let child_node = child_node.clone();
507            let all_tasks = Arc::clone(&all_tasks);
508            let executor = self.clone_with_config();
509            join_set.spawn(async move {
510                executor
511                    .execute_node(&task_name, &child_node, &all_tasks)
512                    .await
513            });
514            if self.config.max_parallel > 0
515                && join_set.len() >= self.config.max_parallel
516                && let Some(result) = join_set.join_next().await
517            {
518                match result {
519                    Ok(Ok(results)) => merge_results(results)?,
520                    Ok(Err(e)) => return Err(e),
521                    Err(e) => {
522                        return Err(Error::execution(format!("Task execution panicked: {}", e)));
523                    }
524                }
525            }
526        }
527        while let Some(result) = join_set.join_next().await {
528            match result {
529                Ok(Ok(results)) => merge_results(results)?,
530                Ok(Err(e)) => return Err(e),
531                Err(e) => {
532                    return Err(Error::execution(format!("Task execution panicked: {}", e)));
533                }
534            }
535        }
536        Ok(all_results)
537    }
538
539    #[instrument(name = "execute_graph", skip(self, graph), fields(task_count = graph.task_count()))]
540    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
541        let parallel_groups = graph.get_parallel_groups()?;
542        let mut all_results = Vec::new();
543
544        // IMPORTANT:
545        // Each parallel group represents a dependency "level". We must not start tasks from the
546        // next group until *all* tasks from the current group have completed successfully.
547        //
548        // The previous implementation pipelined groups (starting the next group as soon as all
549        // tasks from the current group were spawned), which allowed dependent tasks to run before
550        // their dependencies finished (especially visible with long-running tasks like dev servers).
551        for mut group in parallel_groups {
552            let mut join_set = JoinSet::new();
553
554            while !group.is_empty() || !join_set.is_empty() {
555                // Fill the concurrency window for this group
556                while let Some(node) = group.pop() {
557                    let task = node.task.clone();
558                    let name = node.name.clone();
559                    let executor = self.clone_with_config();
560                    join_set.spawn(async move { executor.execute_task(&name, &task).await });
561
562                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
563                        break;
564                    }
565                }
566
567                if let Some(result) = join_set.join_next().await {
568                    match result {
569                        Ok(Ok(task_result)) => {
570                            if !task_result.success {
571                                join_set.abort_all();
572                                return Err(Error::task_failed(
573                                    &task_result.name,
574                                    task_result.exit_code.unwrap_or(-1),
575                                    &task_result.stdout,
576                                    &task_result.stderr,
577                                ));
578                            }
579                            all_results.push(task_result);
580                        }
581                        Ok(Err(e)) => {
582                            join_set.abort_all();
583                            return Err(e);
584                        }
585                        Err(e) => {
586                            join_set.abort_all();
587                            return Err(Error::execution(format!(
588                                "Task execution panicked: {}",
589                                e
590                            )));
591                        }
592                    }
593                }
594            }
595        }
596
597        Ok(all_results)
598    }
599
600    fn clone_with_config(&self) -> Self {
601        // Share the backend across clones to preserve container cache for Dagger chaining
602        Self::with_shared_backend(self.config.clone(), self.backend.clone())
603    }
604}
605
606fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
607    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());
608
609    loop {
610        let is_root = match manager {
611            PackageManager::Npm
612            | PackageManager::Bun
613            | PackageManager::YarnClassic
614            | PackageManager::YarnModern => package_json_has_workspaces(&current),
615            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
616            PackageManager::Cargo => cargo_toml_has_workspace(&current),
617            PackageManager::Deno => deno_json_has_workspace(&current),
618        };
619
620        if is_root {
621            return current;
622        }
623
624        if let Some(parent) = current.parent() {
625            current = parent.to_path_buf();
626        } else {
627            return start.to_path_buf();
628        }
629    }
630}
631
632fn package_json_has_workspaces(dir: &Path) -> bool {
633    let path = dir.join("package.json");
634    let content = std::fs::read_to_string(&path);
635    let Ok(json) = content.and_then(|s| {
636        serde_json::from_str::<serde_json::Value>(&s)
637            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
638    }) else {
639        return false;
640    };
641
642    match json.get("workspaces") {
643        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
644        Some(serde_json::Value::Object(map)) => map
645            .get("packages")
646            .and_then(|packages| packages.as_array())
647            .map(|arr| !arr.is_empty())
648            .unwrap_or(false),
649        _ => false,
650    }
651}
652
653fn cargo_toml_has_workspace(dir: &Path) -> bool {
654    let path = dir.join("Cargo.toml");
655    let Ok(content) = std::fs::read_to_string(&path) else {
656        return false;
657    };
658
659    content.contains("[workspace]")
660}
661
662fn deno_json_has_workspace(dir: &Path) -> bool {
663    let path = dir.join("deno.json");
664    let content = std::fs::read_to_string(&path);
665    let Ok(json) = content.and_then(|s| {
666        serde_json::from_str::<serde_json::Value>(&s)
667            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
668    }) else {
669        return false;
670    };
671
672    // Deno uses "workspace" (not "workspaces") for workspace configuration
673    match json.get("workspace") {
674        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
675        Some(serde_json::Value::Object(_)) => true,
676        _ => false,
677    }
678}
679
680/// Build a compact, user-friendly summary for a failed task, including the
681/// exit code and the tail of stdout/stderr to help with diagnostics.
682pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
683    let exit_code = result
684        .exit_code
685        .map(|c| c.to_string())
686        .unwrap_or_else(|| "unknown".to_string());
687
688    let mut sections = Vec::new();
689    sections.push(format!(
690        "Task '{}' failed with exit code {}.",
691        result.name, exit_code
692    ));
693
694    let output = format_failure_streams(result, max_output_lines);
695    if output.is_empty() {
696        sections.push(
697            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
698                .to_string(),
699        );
700    } else {
701        sections.push(output);
702    }
703
704    sections.join("\n\n")
705}
706
707fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
708    let mut streams = Vec::new();
709
710    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
711        streams.push(stdout);
712    }
713
714    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
715        streams.push(stderr);
716    }
717
718    streams.join("\n\n")
719}
720
721fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
722    let normalized = content.trim_end();
723    if normalized.is_empty() {
724        return None;
725    }
726
727    let lines: Vec<&str> = normalized.lines().collect();
728    let total = lines.len();
729    let start = total.saturating_sub(max_output_lines);
730    let snippet = lines[start..].join("\n");
731
732    let header = if total > max_output_lines {
733        format!("{label} (last {max_output_lines} of {total} lines):")
734    } else {
735        format!("{label}:")
736    };
737
738    Some(format!("{header}\n{snippet}"))
739}
740
741/// Execute an arbitrary command with the cuenv environment
742///
743/// If `secrets` is provided, output will be captured and redacted before printing.
744pub async fn execute_command(
745    command: &str,
746    args: &[String],
747    environment: &Environment,
748) -> Result<i32> {
749    execute_command_with_redaction(command, args, environment, &[]).await
750}
751
752/// Execute a command with secret redaction
753///
754/// Secret values in stdout/stderr are replaced with [REDACTED].
755pub async fn execute_command_with_redaction(
756    command: &str,
757    args: &[String],
758    environment: &Environment,
759    secrets: &[String],
760) -> Result<i32> {
761    use tokio::io::{AsyncBufReadExt, BufReader};
762
763    tracing::info!("Executing command: {} {:?}", command, args);
764    let mut cmd = Command::new(command);
765    cmd.args(args);
766    // Use hermetic environment - no host PATH pollution
767    let env_vars = environment.merge_with_system_hermetic();
768    for (key, value) in env_vars {
769        cmd.env(key, value);
770    }
771
772    if secrets.is_empty() {
773        // No secrets to redact - inherit stdio directly
774        cmd.stdout(Stdio::inherit());
775        cmd.stderr(Stdio::inherit());
776        cmd.stdin(Stdio::inherit());
777        let status = cmd.status().await.map_err(|e| {
778            Error::configuration(format!("Failed to execute command '{}': {}", command, e))
779        })?;
780        return Ok(status.code().unwrap_or(1));
781    }
782
783    // Capture output for redaction
784    cmd.stdout(Stdio::piped());
785    cmd.stderr(Stdio::piped());
786    cmd.stdin(Stdio::inherit());
787
788    let mut child = cmd.spawn().map_err(|e| {
789        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
790    })?;
791
792    let stdout = child
793        .stdout
794        .take()
795        .ok_or_else(|| Error::execution("stdout pipe not available"))?;
796    let stderr = child
797        .stderr
798        .take()
799        .ok_or_else(|| Error::execution("stderr pipe not available"))?;
800
801    // Build sorted secrets for greedy matching (longer first)
802    let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
803    sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
804    let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
805
806    // Stream stdout with redaction
807    let secrets_clone = sorted_secrets.clone();
808    let stdout_task = tokio::spawn(async move {
809        let reader = BufReader::new(stdout);
810        let mut lines = reader.lines();
811        while let Ok(Some(line)) = lines.next_line().await {
812            let mut redacted = line;
813            for secret in &secrets_clone {
814                if secret.len() >= 4 {
815                    redacted = redacted.replace(secret, "[REDACTED]");
816                }
817            }
818            cuenv_events::emit_stdout!(&redacted);
819        }
820    });
821
822    // Stream stderr with redaction
823    let stderr_task = tokio::spawn(async move {
824        let reader = BufReader::new(stderr);
825        let mut lines = reader.lines();
826        while let Ok(Some(line)) = lines.next_line().await {
827            let mut redacted = line;
828            for secret in &sorted_secrets {
829                if secret.len() >= 4 {
830                    redacted = redacted.replace(secret, "[REDACTED]");
831                }
832            }
833            cuenv_events::emit_stderr!(&redacted);
834        }
835    });
836
837    // Wait for command and streams
838    let status = child.wait().await.map_err(|e| {
839        Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
840    })?;
841
842    let _ = stdout_task.await;
843    let _ = stderr_task.await;
844
845    Ok(status.code().unwrap_or(1))
846}
847
848#[cfg(test)]
849mod tests {
850    use super::*;
851    use crate::tasks::TaskDependency;
852    use tempfile::TempDir;
853
854    #[tokio::test]
855    async fn test_executor_config_default() {
856        let config = ExecutorConfig::default();
857        assert!(config.capture_output);
858        assert_eq!(config.max_parallel, 0);
859        assert!(config.environment.is_empty());
860    }
861
862    #[tokio::test]
863    async fn test_task_result() {
864        let result = TaskResult {
865            name: "test".to_string(),
866            exit_code: Some(0),
867            stdout: "output".to_string(),
868            stderr: String::new(),
869            success: true,
870        };
871        assert_eq!(result.name, "test");
872        assert_eq!(result.exit_code, Some(0));
873        assert!(result.success);
874        assert_eq!(result.stdout, "output");
875    }
876
877    #[tokio::test]
878    async fn test_execute_simple_task() {
879        let config = ExecutorConfig {
880            capture_output: true,
881            ..Default::default()
882        };
883        let executor = TaskExecutor::new(config);
884        let task = Task {
885            command: "echo".to_string(),
886            args: vec!["hello".to_string()],
887            description: Some("Hello task".to_string()),
888            ..Default::default()
889        };
890        let result = executor.execute_task("test", &task).await.unwrap();
891        assert!(result.success);
892        assert_eq!(result.exit_code, Some(0));
893        assert!(result.stdout.contains("hello"));
894    }
895
896    #[tokio::test]
897    async fn test_execute_with_environment() {
898        let mut config = ExecutorConfig {
899            capture_output: true,
900            ..Default::default()
901        };
902        config
903            .environment
904            .set("TEST_VAR".to_string(), "test_value".to_string());
905        let executor = TaskExecutor::new(config);
906        let task = Task {
907            command: "printenv".to_string(),
908            args: vec!["TEST_VAR".to_string()],
909            description: Some("Print env task".to_string()),
910            ..Default::default()
911        };
912        let result = executor.execute_task("test", &task).await.unwrap();
913        assert!(result.success);
914        assert!(result.stdout.contains("test_value"));
915    }
916
917    #[tokio::test]
918    async fn test_execute_failing_task() {
919        let config = ExecutorConfig {
920            capture_output: true,
921            ..Default::default()
922        };
923        let executor = TaskExecutor::new(config);
924        let task = Task {
925            command: "false".to_string(),
926            description: Some("Failing task".to_string()),
927            ..Default::default()
928        };
929        let result = executor.execute_task("test", &task).await.unwrap();
930        assert!(!result.success);
931        assert_eq!(result.exit_code, Some(1));
932    }
933
934    #[tokio::test]
935    async fn test_execute_sequential_group() {
936        let config = ExecutorConfig {
937            capture_output: true,
938            ..Default::default()
939        };
940        let executor = TaskExecutor::new(config);
941        let task1 = Task {
942            command: "echo".to_string(),
943            args: vec!["first".to_string()],
944            description: Some("First task".to_string()),
945            ..Default::default()
946        };
947        let task2 = Task {
948            command: "echo".to_string(),
949            args: vec!["second".to_string()],
950            description: Some("Second task".to_string()),
951            ..Default::default()
952        };
953        let sequence = vec![
954            TaskNode::Task(Box::new(task1)),
955            TaskNode::Task(Box::new(task2)),
956        ];
957        let all_tasks = Tasks::new();
958        let node = TaskNode::Sequence(sequence);
959        let results = executor
960            .execute_node("seq", &node, &all_tasks)
961            .await
962            .unwrap();
963        assert_eq!(results.len(), 2);
964        assert!(results[0].stdout.contains("first"));
965        assert!(results[1].stdout.contains("second"));
966    }
967
968    #[tokio::test]
969    async fn test_command_injection_prevention() {
970        let config = ExecutorConfig {
971            capture_output: true,
972            ..Default::default()
973        };
974        let executor = TaskExecutor::new(config);
975        let malicious_task = Task {
976            command: "echo".to_string(),
977            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
978            description: Some("Malicious task test".to_string()),
979            ..Default::default()
980        };
981        let result = executor
982            .execute_task("malicious", &malicious_task)
983            .await
984            .unwrap();
985        assert!(result.success);
986        assert!(result.stdout.contains("hello ; rm -rf /"));
987    }
988
989    #[tokio::test]
990    async fn test_special_characters_in_args() {
991        let config = ExecutorConfig {
992            capture_output: true,
993            ..Default::default()
994        };
995        let executor = TaskExecutor::new(config);
996        let special_chars = vec![
997            "$USER",
998            "$(whoami)",
999            "`whoami`",
1000            "&& echo hacked",
1001            "|| echo failed",
1002            "> /tmp/hack",
1003            "| cat",
1004        ];
1005        for special_arg in special_chars {
1006            let task = Task {
1007                command: "echo".to_string(),
1008                args: vec!["safe".to_string(), special_arg.to_string()],
1009                description: Some("Special character test".to_string()),
1010                ..Default::default()
1011            };
1012            let result = executor.execute_task("special", &task).await.unwrap();
1013            assert!(result.success);
1014            assert!(result.stdout.contains("safe"));
1015            assert!(result.stdout.contains(special_arg));
1016        }
1017    }
1018
1019    #[tokio::test]
1020    async fn test_environment_variable_safety() {
1021        let mut config = ExecutorConfig {
1022            capture_output: true,
1023            ..Default::default()
1024        };
1025        config
1026            .environment
1027            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
1028        let executor = TaskExecutor::new(config);
1029        let task = Task {
1030            command: "printenv".to_string(),
1031            args: vec!["DANGEROUS_VAR".to_string()],
1032            description: Some("Environment variable safety test".to_string()),
1033            ..Default::default()
1034        };
1035        let result = executor.execute_task("env_test", &task).await.unwrap();
1036        assert!(result.success);
1037        assert!(result.stdout.contains("; rm -rf /"));
1038    }
1039
1040    #[tokio::test]
1041    async fn test_execute_graph_parallel_groups() {
1042        // two independent tasks -> can run in same parallel group
1043        let config = ExecutorConfig {
1044            capture_output: true,
1045            max_parallel: 2,
1046            ..Default::default()
1047        };
1048        let executor = TaskExecutor::new(config);
1049        let mut graph = TaskGraph::new();
1050
1051        let t1 = Task {
1052            command: "echo".into(),
1053            args: vec!["A".into()],
1054            ..Default::default()
1055        };
1056        let t2 = Task {
1057            command: "echo".into(),
1058            args: vec!["B".into()],
1059            ..Default::default()
1060        };
1061
1062        graph.add_task("t1", t1).unwrap();
1063        graph.add_task("t2", t2).unwrap();
1064        let results = executor.execute_graph(&graph).await.unwrap();
1065        assert_eq!(results.len(), 2);
1066        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
1067        assert!(joined.contains("A") && joined.contains("B"));
1068    }
1069
1070    #[tokio::test]
1071    async fn test_execute_graph_respects_dependency_levels() {
1072        let tmp = TempDir::new().unwrap();
1073        let root = tmp.path();
1074
1075        let config = ExecutorConfig {
1076            capture_output: true,
1077            max_parallel: 2,
1078            project_root: root.to_path_buf(),
1079            ..Default::default()
1080        };
1081        let executor = TaskExecutor::new(config);
1082
1083        let mut tasks = Tasks::new();
1084        tasks.tasks.insert(
1085            "dep".into(),
1086            TaskNode::Task(Box::new(Task {
1087                command: "sh".into(),
1088                args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
1089                ..Default::default()
1090            })),
1091        );
1092        tasks.tasks.insert(
1093            "consumer".into(),
1094            TaskNode::Task(Box::new(Task {
1095                command: "sh".into(),
1096                args: vec!["-c".into(), "cat marker.txt".into()],
1097                depends_on: vec![TaskDependency::from_name("dep")],
1098                ..Default::default()
1099            })),
1100        );
1101
1102        let mut graph = TaskGraph::new();
1103        graph.build_for_task("consumer", &tasks).unwrap();
1104
1105        let results = executor.execute_graph(&graph).await.unwrap();
1106        assert_eq!(results.len(), 2);
1107
1108        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
1109        assert!(consumer.success);
1110        assert!(consumer.stdout.contains("ok"));
1111    }
1112
1113    #[test]
1114    fn test_summarize_task_failure_with_exit_code() {
1115        let result = TaskResult {
1116            name: "build".to_string(),
1117            exit_code: Some(127),
1118            stdout: String::new(),
1119            stderr: "command not found".to_string(),
1120            success: false,
1121        };
1122        let summary = summarize_task_failure(&result, 10);
1123        assert!(summary.contains("build"));
1124        assert!(summary.contains("127"));
1125        assert!(summary.contains("command not found"));
1126    }
1127
1128    #[test]
1129    fn test_summarize_task_failure_no_exit_code() {
1130        let result = TaskResult {
1131            name: "killed".to_string(),
1132            exit_code: None,
1133            stdout: String::new(),
1134            stderr: String::new(),
1135            success: false,
1136        };
1137        let summary = summarize_task_failure(&result, 10);
1138        assert!(summary.contains("killed"));
1139        assert!(summary.contains("unknown"));
1140    }
1141
1142    #[test]
1143    fn test_summarize_task_failure_no_output() {
1144        let result = TaskResult {
1145            name: "silent".to_string(),
1146            exit_code: Some(1),
1147            stdout: String::new(),
1148            stderr: String::new(),
1149            success: false,
1150        };
1151        let summary = summarize_task_failure(&result, 10);
1152        assert!(summary.contains("No stdout/stderr"));
1153        assert!(summary.contains("RUST_LOG=debug"));
1154    }
1155
1156    #[test]
1157    fn test_summarize_task_failure_truncates_long_output() {
1158        let result = TaskResult {
1159            name: "verbose".to_string(),
1160            exit_code: Some(1),
1161            stdout: (1..=50)
1162                .map(|i| format!("line {}", i))
1163                .collect::<Vec<_>>()
1164                .join("\n"),
1165            stderr: String::new(),
1166            success: false,
1167        };
1168        let summary = summarize_task_failure(&result, 10);
1169        assert!(summary.contains("last 10 of 50 lines"));
1170        assert!(summary.contains("line 50"));
1171        assert!(!summary.contains("line 1\n")); // First line should be truncated
1172    }
1173
1174    #[test]
1175    fn test_summarize_stream_empty() {
1176        assert!(summarize_stream("test", "", 10).is_none());
1177        assert!(summarize_stream("test", "   ", 10).is_none());
1178        assert!(summarize_stream("test", "\n\n", 10).is_none());
1179    }
1180
1181    #[test]
1182    fn test_summarize_stream_short() {
1183        let result = summarize_stream("stdout", "line 1\nline 2", 10).unwrap();
1184        assert!(result.contains("stdout:"));
1185        assert!(result.contains("line 1"));
1186        assert!(result.contains("line 2"));
1187        assert!(!result.contains("last"));
1188    }
1189
1190    #[test]
1191    fn test_format_failure_streams_both() {
1192        let result = TaskResult {
1193            name: "test".to_string(),
1194            exit_code: Some(1),
1195            stdout: "stdout content".to_string(),
1196            stderr: "stderr content".to_string(),
1197            success: false,
1198        };
1199        let formatted = format_failure_streams(&result, 10);
1200        assert!(formatted.contains("stdout:"));
1201        assert!(formatted.contains("stderr:"));
1202        assert!(formatted.contains("stdout content"));
1203        assert!(formatted.contains("stderr content"));
1204    }
1205
1206    #[test]
1207    fn test_find_workspace_root_with_npm() {
1208        let tmp = TempDir::new().unwrap();
1209        // Create a workspace structure
1210        std::fs::write(
1211            tmp.path().join("package.json"),
1212            r#"{"workspaces": ["packages/*"]}"#,
1213        )
1214        .unwrap();
1215        let subdir = tmp.path().join("packages").join("subpkg");
1216        std::fs::create_dir_all(&subdir).unwrap();
1217
1218        let root = find_workspace_root(PackageManager::Npm, &subdir);
1219        assert_eq!(root, tmp.path().canonicalize().unwrap());
1220    }
1221
1222    #[test]
1223    fn test_find_workspace_root_with_pnpm() {
1224        let tmp = TempDir::new().unwrap();
1225        std::fs::write(
1226            tmp.path().join("pnpm-workspace.yaml"),
1227            "packages:\n  - 'packages/*'",
1228        )
1229        .unwrap();
1230        let subdir = tmp.path().join("packages").join("app");
1231        std::fs::create_dir_all(&subdir).unwrap();
1232
1233        let root = find_workspace_root(PackageManager::Pnpm, &subdir);
1234        assert_eq!(root, tmp.path().canonicalize().unwrap());
1235    }
1236
1237    #[test]
1238    fn test_find_workspace_root_with_cargo() {
1239        let tmp = TempDir::new().unwrap();
1240        std::fs::write(
1241            tmp.path().join("Cargo.toml"),
1242            "[workspace]\nmembers = [\"crates/*\"]",
1243        )
1244        .unwrap();
1245        let subdir = tmp.path().join("crates").join("core");
1246        std::fs::create_dir_all(&subdir).unwrap();
1247
1248        let root = find_workspace_root(PackageManager::Cargo, &subdir);
1249        assert_eq!(root, tmp.path().canonicalize().unwrap());
1250    }
1251
1252    #[test]
1253    fn test_find_workspace_root_no_workspace() {
1254        let tmp = TempDir::new().unwrap();
1255        let root = find_workspace_root(PackageManager::Npm, tmp.path());
1256        // Should return the start path when no workspace is found
1257        assert_eq!(root, tmp.path().to_path_buf());
1258    }
1259
1260    #[test]
1261    fn test_package_json_has_workspaces_array() {
1262        let tmp = TempDir::new().unwrap();
1263        std::fs::write(
1264            tmp.path().join("package.json"),
1265            r#"{"workspaces": ["packages/*"]}"#,
1266        )
1267        .unwrap();
1268        assert!(package_json_has_workspaces(tmp.path()));
1269    }
1270
1271    #[test]
1272    fn test_package_json_has_workspaces_object() {
1273        let tmp = TempDir::new().unwrap();
1274        std::fs::write(
1275            tmp.path().join("package.json"),
1276            r#"{"workspaces": {"packages": ["packages/*"]}}"#,
1277        )
1278        .unwrap();
1279        assert!(package_json_has_workspaces(tmp.path()));
1280    }
1281
1282    #[test]
1283    fn test_package_json_no_workspaces() {
1284        let tmp = TempDir::new().unwrap();
1285        std::fs::write(tmp.path().join("package.json"), r#"{"name": "test"}"#).unwrap();
1286        assert!(!package_json_has_workspaces(tmp.path()));
1287    }
1288
1289    #[test]
1290    fn test_package_json_empty_workspaces() {
1291        let tmp = TempDir::new().unwrap();
1292        std::fs::write(tmp.path().join("package.json"), r#"{"workspaces": []}"#).unwrap();
1293        assert!(!package_json_has_workspaces(tmp.path()));
1294    }
1295
1296    #[test]
1297    fn test_package_json_missing() {
1298        let tmp = TempDir::new().unwrap();
1299        assert!(!package_json_has_workspaces(tmp.path()));
1300    }
1301
1302    #[test]
1303    fn test_cargo_toml_has_workspace() {
1304        let tmp = TempDir::new().unwrap();
1305        std::fs::write(
1306            tmp.path().join("Cargo.toml"),
1307            "[workspace]\nmembers = [\"crates/*\"]",
1308        )
1309        .unwrap();
1310        assert!(cargo_toml_has_workspace(tmp.path()));
1311    }
1312
1313    #[test]
1314    fn test_cargo_toml_no_workspace() {
1315        let tmp = TempDir::new().unwrap();
1316        std::fs::write(tmp.path().join("Cargo.toml"), "[package]\nname = \"test\"").unwrap();
1317        assert!(!cargo_toml_has_workspace(tmp.path()));
1318    }
1319
1320    #[test]
1321    fn test_cargo_toml_missing() {
1322        let tmp = TempDir::new().unwrap();
1323        assert!(!cargo_toml_has_workspace(tmp.path()));
1324    }
1325
1326    #[test]
1327    fn test_deno_json_has_workspace_array() {
1328        let tmp = TempDir::new().unwrap();
1329        std::fs::write(
1330            tmp.path().join("deno.json"),
1331            r#"{"workspace": ["./packages/*"]}"#,
1332        )
1333        .unwrap();
1334        assert!(deno_json_has_workspace(tmp.path()));
1335    }
1336
1337    #[test]
1338    fn test_deno_json_has_workspace_object() {
1339        let tmp = TempDir::new().unwrap();
1340        std::fs::write(
1341            tmp.path().join("deno.json"),
1342            r#"{"workspace": {"members": ["./packages/*"]}}"#,
1343        )
1344        .unwrap();
1345        assert!(deno_json_has_workspace(tmp.path()));
1346    }
1347
1348    #[test]
1349    fn test_deno_json_no_workspace() {
1350        let tmp = TempDir::new().unwrap();
1351        std::fs::write(tmp.path().join("deno.json"), r#"{"name": "test"}"#).unwrap();
1352        assert!(!deno_json_has_workspace(tmp.path()));
1353    }
1354
1355    #[test]
1356    fn test_deno_json_missing() {
1357        let tmp = TempDir::new().unwrap();
1358        assert!(!deno_json_has_workspace(tmp.path()));
1359    }
1360
1361    #[test]
1362    fn test_executor_config_with_fields() {
1363        let config = ExecutorConfig {
1364            capture_output: true,
1365            max_parallel: 4,
1366            working_dir: Some(PathBuf::from("/tmp")),
1367            project_root: PathBuf::from("/project"),
1368            cue_module_root: Some(PathBuf::from("/project/cue.mod")),
1369            materialize_outputs: Some(PathBuf::from("/outputs")),
1370            cache_dir: Some(PathBuf::from("/cache")),
1371            show_cache_path: true,
1372            cli_backend: Some("host".to_string()),
1373            ..Default::default()
1374        };
1375        assert!(config.capture_output);
1376        assert_eq!(config.max_parallel, 4);
1377        assert_eq!(config.working_dir, Some(PathBuf::from("/tmp")));
1378        assert!(config.show_cache_path);
1379    }
1380
1381    #[test]
1382    fn test_task_result_clone() {
1383        let result = TaskResult {
1384            name: "test".to_string(),
1385            exit_code: Some(0),
1386            stdout: "output".to_string(),
1387            stderr: "error".to_string(),
1388            success: true,
1389        };
1390        let cloned = result.clone();
1391        assert_eq!(cloned.name, result.name);
1392        assert_eq!(cloned.exit_code, result.exit_code);
1393        assert_eq!(cloned.stdout, result.stdout);
1394        assert_eq!(cloned.stderr, result.stderr);
1395        assert_eq!(cloned.success, result.success);
1396    }
1397
1398    #[test]
1399    fn test_task_failure_snippet_lines_constant() {
1400        assert_eq!(TASK_FAILURE_SNIPPET_LINES, 20);
1401    }
1402}