cuenv_core/tasks/

//! Task executor for running tasks with environment support.
//!
//! - Environment variable propagation
//! - Parallel and sequential execution
//! - Host execution; isolation/caching is delegated to other backends
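//!
//! A minimal usage sketch (imports and async context omitted; the types are the
//! ones defined in this module):
//!
//! ```ignore
//! let config = ExecutorConfig {
//!     capture_output: true,
//!     ..Default::default()
//! };
//! let executor = TaskExecutor::new(config);
//! let task = Task {
//!     command: "echo".to_string(),
//!     args: vec!["hello".to_string()],
//!     ..Default::default()
//! };
//! let result = executor.execute_task("hello", &task).await?;
//! assert!(result.success);
//! ```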

use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
use super::{ParallelGroup, Task, TaskDefinition, TaskGraph, TaskGroup, Tasks};
use crate::config::BackendConfig;
use crate::environment::Environment;
use crate::manifest::WorkspaceConfig;
use crate::{Error, Result};
use async_recursion::async_recursion;
use cuenv_workspaces::PackageManager;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use tokio::process::Command;
use tokio::task::JoinSet;

/// Task execution result
#[derive(Debug, Clone)]
pub struct TaskResult {
    pub name: String,
    pub exit_code: Option<i32>,
    pub stdout: String,
    pub stderr: String,
    pub success: bool,
}

/// Number of lines from stdout/stderr to include when summarizing failures
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;

/// Task executor configuration
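///
/// A construction sketch (field values are illustrative; remaining fields fall
/// back to [`ExecutorConfig::default`]):
///
/// ```ignore
/// let config = ExecutorConfig {
///     capture_output: true,
///     max_parallel: 4,
///     project_root: std::path::PathBuf::from("/path/to/project"),
///     ..Default::default()
/// };
/// ```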
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Whether to capture output (vs streaming to stdout/stderr)
    pub capture_output: bool,
    /// Maximum parallel tasks (0 = unlimited)
    pub max_parallel: usize,
    /// Environment variables to propagate (resolved via policies)
    pub environment: Environment,
    /// Optional working directory override (reserved for future backends)
    pub working_dir: Option<PathBuf>,
    /// Project root for resolving inputs/outputs (env.cue root)
    pub project_root: PathBuf,
    /// Path to cue.mod root for resolving relative source paths
    pub cue_module_root: Option<PathBuf>,
    /// Optional: materialize cached outputs on cache hit
    pub materialize_outputs: Option<PathBuf>,
    /// Optional: cache directory override
    pub cache_dir: Option<PathBuf>,
    /// Optional: print cache path on hits/misses
    pub show_cache_path: bool,
    /// Global workspace configuration
    pub workspaces: Option<HashMap<String, WorkspaceConfig>>,
    /// Backend configuration
    pub backend_config: Option<BackendConfig>,
    /// CLI backend selection override
    pub cli_backend: Option<String>,
}

impl Default for ExecutorConfig {
    fn default() -> Self {
        Self {
            capture_output: false,
            max_parallel: 0,
            environment: Environment::new(),
            working_dir: None,
            project_root: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
            cue_module_root: None,
            materialize_outputs: None,
            cache_dir: None,
            show_cache_path: false,
            workspaces: None,
            backend_config: None,
            cli_backend: None,
        }
    }
}

/// Task executor
pub struct TaskExecutor {
    config: ExecutorConfig,
    backend: Arc<dyn TaskBackend>,
}

impl TaskExecutor {
    /// Create a new executor with host backend only
    pub fn new(config: ExecutorConfig) -> Self {
        Self::with_dagger_factory(config, None)
    }

    /// Create a new executor with optional dagger backend support.
    ///
    /// Pass `Some(cuenv_dagger::create_dagger_backend)` to enable the Dagger backend.
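    ///
    /// A wiring sketch (the `cuenv_dagger::create_dagger_backend` factory is the one
    /// referenced above; whether it is available depends on the build):
    ///
    /// ```ignore
    /// let executor = TaskExecutor::with_dagger_factory(
    ///     ExecutorConfig::default(),
    ///     Some(cuenv_dagger::create_dagger_backend),
    /// );
    /// ```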
    pub fn with_dagger_factory(
        config: ExecutorConfig,
        dagger_factory: Option<BackendFactory>,
    ) -> Self {
        let backend = create_backend_with_factory(
            config.backend_config.as_ref(),
            config.project_root.clone(),
            config.cli_backend.as_deref(),
            dagger_factory,
        );
        Self { config, backend }
    }

    /// Create a new executor with the given config but sharing the backend
    fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
        Self { config, backend }
    }

    /// Execute a single task
    pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
        // Delegate execution to the configured backend.
        // The backend implementation handles the specific execution details.

        // If using the Dagger backend, execute in a containerized context.
        if self.backend.name() == "dagger" {
            return self
                .backend
                .execute(
                    name,
                    task,
                    &self.config.environment,
                    &self.config.project_root,
                    self.config.capture_output,
                )
                .await;
        }

        // Host backend runs tasks directly in the workspace.
        self.execute_task_non_hermetic(name, task).await
    }

    /// Execute a task non-hermetically (directly in workspace/project root)
    ///
    /// Used for tasks like `bun install` that need to write to the real filesystem.
    async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
        // Check if this is an unresolved TaskRef (should have been resolved before execution)
        if task.is_task_ref() && task.project_root.is_none() {
            return Err(Error::configuration(format!(
                "Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
                 This usually means:\n\
                 - The referenced project doesn't exist or has no 'name' field in env.cue\n\
                 - The referenced task '{}' doesn't exist in that project\n\
                 - There was an error loading the referenced project's env.cue\n\
                 Run with RUST_LOG=debug for more details.",
                name,
                task.task_ref.as_deref().unwrap_or("unknown"),
                task.task_ref
                    .as_deref()
                    .and_then(|r| r.split(':').next_back())
                    .unwrap_or("unknown")
            )));
        }

        // Determine working directory (in priority order):
        // 1. Explicit directory field on task (relative to cue.mod root)
        // 2. TaskRef project_root (from resolution)
        // 3. Source file directory (from _source metadata)
        // 4. Install tasks (hermetic: false with workspaces) run from workspace root
        // 5. Default to project root
        let workdir = if let Some(ref dir) = task.directory {
            // Explicit directory override: resolve relative to cue.mod root or project root
            self.config
                .cue_module_root
                .as_ref()
                .unwrap_or(&self.config.project_root)
                .join(dir)
        } else if let Some(ref project_root) = task.project_root {
            // TaskRef tasks run in their original project directory
            project_root.clone()
        } else if let Some(ref source) = task.source {
            // Default: run in the directory of the source file
            if let Some(dir) = source.directory() {
                self.config
                    .cue_module_root
                    .as_ref()
                    .unwrap_or(&self.config.project_root)
                    .join(dir)
            } else {
                // Source is at root (e.g., "env.cue"), use cue_module_root if available
                // This ensures tasks defined in root env.cue run from module root,
                // even when invoked from a subdirectory
                self.config
                    .cue_module_root
                    .clone()
                    .unwrap_or_else(|| self.config.project_root.clone())
            }
        } else if !task.hermetic && !task.workspaces.is_empty() {
            // Find workspace root for install tasks
            let workspace_name = &task.workspaces[0];
            let manager = match workspace_name.as_str() {
                "bun" => PackageManager::Bun,
                "npm" => PackageManager::Npm,
                "pnpm" => PackageManager::Pnpm,
                "yarn" => PackageManager::YarnModern,
                "cargo" => PackageManager::Cargo,
                _ => PackageManager::Npm, // fallback
            };
            find_workspace_root(manager, &self.config.project_root)
        } else {
            self.config.project_root.clone()
        };

        tracing::info!(
            task = %name,
            workdir = %workdir.display(),
            hermetic = false,
            "Executing non-hermetic task"
        );

        // Emit the command being run - always emit task_started for all modes
        // (TUI needs events even when capture_output is true)
        let cmd_str = if let Some(script) = &task.script {
            format!("[script: {} bytes]", script.len())
        } else if task.command.is_empty() {
            task.args.join(" ")
        } else {
            format!("{} {}", task.command, task.args.join(" "))
        };

        cuenv_events::emit_task_started!(name, cmd_str, false);

        // Build command - handle script mode vs command mode
        let mut cmd = if let Some(script) = &task.script {
            // Script mode: use shell to execute the script
            let (shell_cmd, shell_flag) = if let Some(shell) = &task.shell {
                (
                    shell.command.clone().unwrap_or_else(|| "bash".to_string()),
                    shell.flag.clone().unwrap_or_else(|| "-c".to_string()),
                )
            } else {
                // Default to bash for scripts
                ("bash".to_string(), "-c".to_string())
            };

            let resolved_shell = self.config.environment.resolve_command(&shell_cmd);
            let mut cmd = Command::new(&resolved_shell);
            cmd.arg(&shell_flag);
            cmd.arg(script);
            cmd
        } else {
            // Command mode: existing behavior
            let resolved_command = self.config.environment.resolve_command(&task.command);

            if let Some(shell) = &task.shell {
                if let (Some(shell_command), Some(shell_flag)) = (&shell.command, &shell.flag) {
                    let resolved_shell = self.config.environment.resolve_command(shell_command);
                    let mut cmd = Command::new(&resolved_shell);
                    cmd.arg(shell_flag);
                    if task.args.is_empty() {
                        cmd.arg(&resolved_command);
                    } else {
                        let full_command = if task.command.is_empty() {
                            task.args.join(" ")
                        } else {
                            format!("{} {}", resolved_command, task.args.join(" "))
                        };
                        cmd.arg(full_command);
                    }
                    cmd
                } else {
                    let mut cmd = Command::new(&resolved_command);
                    for arg in &task.args {
                        cmd.arg(arg);
                    }
                    cmd
                }
            } else {
                let mut cmd = Command::new(&resolved_command);
                for arg in &task.args {
                    cmd.arg(arg);
                }
                cmd
            }
        };

        // Set working directory and environment
        cmd.current_dir(&workdir);
        let env_vars = self.config.environment.merge_with_system();
        for (k, v) in &env_vars {
            cmd.env(k, v);
        }

        // Execute. In capture mode, pipe stdout/stderr so output can be collected
        // while events are emitted in real time; otherwise stream directly to the terminal.
        if self.config.capture_output {
            use tokio::io::{AsyncBufReadExt, BufReader};

            let start_time = std::time::Instant::now();

            // Spawn with piped stdout/stderr for streaming
            let mut child = cmd
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .spawn()
                .map_err(|e| Error::Io {
                    source: e,
                    path: None,
                    operation: format!("spawn task {}", name),
                })?;

            // Take ownership of stdout/stderr handles
            let stdout_handle = child.stdout.take();
            let stderr_handle = child.stderr.take();

            // Collect output while streaming events in real-time
            let mut stdout_lines = Vec::new();
            let mut stderr_lines = Vec::new();

            // Stream stdout
            let name_for_stdout = name.to_string();
            let stdout_task = tokio::spawn(async move {
                let mut lines = Vec::new();
                if let Some(stdout) = stdout_handle {
                    let mut reader = BufReader::new(stdout).lines();
                    while let Ok(Some(line)) = reader.next_line().await {
                        cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
                        lines.push(line);
                    }
                }
                lines
            });

            // Stream stderr
            let name_for_stderr = name.to_string();
            let stderr_task = tokio::spawn(async move {
                let mut lines = Vec::new();
                if let Some(stderr) = stderr_handle {
                    let mut reader = BufReader::new(stderr).lines();
                    while let Ok(Some(line)) = reader.next_line().await {
                        cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
                        lines.push(line);
                    }
                }
                lines
            });

            // Wait for process to complete and collect output
            let status = child.wait().await.map_err(|e| Error::Io {
                source: e,
                path: None,
                operation: format!("wait for task {}", name),
            })?;

            // Collect streamed output
            if let Ok(lines) = stdout_task.await {
                stdout_lines = lines;
            }
            if let Ok(lines) = stderr_task.await {
                stderr_lines = lines;
            }

            let duration_ms = start_time.elapsed().as_millis() as u64;
            let stdout = stdout_lines.join("\n");
            let stderr = stderr_lines.join("\n");
            let exit_code = status.code().unwrap_or(-1);
            let success = status.success();

            // Emit task completion event
            cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);

            if !success {
                tracing::warn!(task = %name, exit = exit_code, "Task failed");
                tracing::error!(task = %name, "Task stdout:\n{}", stdout);
                tracing::error!(task = %name, "Task stderr:\n{}", stderr);
            }

            Ok(TaskResult {
                name: name.to_string(),
                exit_code: Some(exit_code),
                stdout,
                stderr,
                success,
            })
        } else {
            // Stream output directly to terminal (interactive mode)
            let status = cmd
                .stdout(Stdio::inherit())
                .stderr(Stdio::inherit())
                .status()
                .await
                .map_err(|e| Error::Io {
                    source: e,
                    path: None,
                    operation: format!("spawn task {}", name),
                })?;

            let exit_code = status.code().unwrap_or(-1);
            let success = status.success();

            if !success {
                tracing::warn!(task = %name, exit = exit_code, "Task failed");
            }

            Ok(TaskResult {
                name: name.to_string(),
                exit_code: Some(exit_code),
                stdout: String::new(), // Output went to terminal
                stderr: String::new(),
                success,
            })
        }
    }

    /// Execute a task definition (single task or group)
    #[async_recursion]
    pub async fn execute_definition(
        &self,
        name: &str,
        definition: &TaskDefinition,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        match definition {
            TaskDefinition::Single(task) => {
                let result = self.execute_task(name, task.as_ref()).await?;
                Ok(vec![result])
            }
            TaskDefinition::Group(group) => self.execute_group(name, group, all_tasks).await,
        }
    }

    async fn execute_group(
        &self,
        prefix: &str,
        group: &TaskGroup,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        match group {
            TaskGroup::Sequential(tasks) => self.execute_sequential(prefix, tasks, all_tasks).await,
            TaskGroup::Parallel(group) => self.execute_parallel(prefix, group, all_tasks).await,
        }
    }

    async fn execute_sequential(
        &self,
        prefix: &str,
        tasks: &[TaskDefinition],
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        if !self.config.capture_output {
            cuenv_events::emit_task_group_started!(prefix, true, tasks.len());
        }
        let mut results = Vec::new();
        for (i, task_def) in tasks.iter().enumerate() {
            let task_name = format!("{}[{}]", prefix, i);
            let task_results = self
                .execute_definition(&task_name, task_def, all_tasks)
                .await?;
            for result in &task_results {
                if !result.success {
                    let message = format!(
                        "Sequential task group '{prefix}' halted.\n\n{}",
                        summarize_task_failure(result, TASK_FAILURE_SNIPPET_LINES)
                    );
                    return Err(Error::configuration(message));
                }
            }
            results.extend(task_results);
        }
        Ok(results)
    }

    async fn execute_parallel(
        &self,
        prefix: &str,
        group: &ParallelGroup,
        all_tasks: &Tasks,
    ) -> Result<Vec<TaskResult>> {
        // Check for "default" task to override parallel execution
        if let Some(default_task) = group.tasks.get("default") {
            if !self.config.capture_output {
                cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
            }
            // Execute only the default task, using the group prefix directly
            // since "default" is implicit when invoking the group name
            let task_name = format!("{}.default", prefix);
            return self
                .execute_definition(&task_name, default_task, all_tasks)
                .await;
        }

        if !self.config.capture_output {
            cuenv_events::emit_task_group_started!(prefix, false, group.tasks.len());
        }
        let mut join_set = JoinSet::new();
        let all_tasks = Arc::new(all_tasks.clone());
        let mut all_results = Vec::new();
        let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
            if let Some(failed) = results.iter().find(|r| !r.success) {
                let message = format!(
                    "Parallel task group '{prefix}' halted.\n\n{}",
                    summarize_task_failure(failed, TASK_FAILURE_SNIPPET_LINES)
                );
                return Err(Error::configuration(message));
            }
            all_results.extend(results);
            Ok(())
        };
        for (name, task_def) in &group.tasks {
            let task_name = format!("{}.{}", prefix, name);
            let task_def = task_def.clone();
            let all_tasks = Arc::clone(&all_tasks);
            let executor = self.clone_with_config();
            join_set.spawn(async move {
                executor
                    .execute_definition(&task_name, &task_def, &all_tasks)
                    .await
            });
            if self.config.max_parallel > 0
                && join_set.len() >= self.config.max_parallel
                && let Some(result) = join_set.join_next().await
            {
                match result {
                    Ok(Ok(results)) => merge_results(results)?,
                    Ok(Err(e)) => return Err(e),
                    Err(e) => {
                        return Err(Error::configuration(format!(
                            "Task execution panicked: {}",
                            e
                        )));
                    }
                }
            }
        }
        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(Ok(results)) => merge_results(results)?,
                Ok(Err(e)) => return Err(e),
                Err(e) => {
                    return Err(Error::configuration(format!(
                        "Task execution panicked: {}",
                        e
                    )));
                }
            }
        }
        Ok(all_results)
    }

    pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
        let parallel_groups = graph.get_parallel_groups()?;
        let mut all_results = Vec::new();

        // IMPORTANT:
        // Each parallel group represents a dependency "level". We must not start tasks from the
        // next group until *all* tasks from the current group have completed successfully.
        //
        // The previous implementation pipelined groups (starting the next group as soon as all
        // tasks from the current group were spawned), which allowed dependent tasks to run before
        // their dependencies finished (especially visible with long-running tasks like dev servers).
        for mut group in parallel_groups {
            let mut join_set = JoinSet::new();

            while !group.is_empty() || !join_set.is_empty() {
                // Fill the concurrency window for this group
                while let Some(node) = group.pop() {
                    let task = node.task.clone();
                    let name = node.name.clone();
                    let executor = self.clone_with_config();
                    join_set.spawn(async move { executor.execute_task(&name, &task).await });

                    if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
                        break;
                    }
                }

                if let Some(result) = join_set.join_next().await {
                    match result {
                        Ok(Ok(task_result)) => {
                            if !task_result.success {
                                join_set.abort_all();
                                let message = format!(
                                    "Task graph execution halted.\n\n{}",
                                    summarize_task_failure(
                                        &task_result,
                                        TASK_FAILURE_SNIPPET_LINES,
                                    )
                                );
                                return Err(Error::configuration(message));
                            }
                            all_results.push(task_result);
                        }
                        Ok(Err(e)) => {
                            join_set.abort_all();
                            return Err(e);
                        }
                        Err(e) => {
                            join_set.abort_all();
                            return Err(Error::configuration(format!(
                                "Task execution panicked: {}",
                                e
                            )));
                        }
                    }
                }
            }
        }

        Ok(all_results)
    }

    fn clone_with_config(&self) -> Self {
        // Share the backend across clones to preserve container cache for Dagger chaining
        Self::with_shared_backend(self.config.clone(), self.backend.clone())
    }
}

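/// Walk upward from `start` looking for the workspace root of the given package
/// manager (a `package.json` with a `workspaces` field, a `pnpm-workspace.yaml`,
/// a `Cargo.toml` containing `[workspace]`, or a `deno.json` with `workspace`),
/// falling back to `start` when no workspace root is found.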
fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
    let mut current = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());

    loop {
        let is_root = match manager {
            PackageManager::Npm
            | PackageManager::Bun
            | PackageManager::YarnClassic
            | PackageManager::YarnModern => package_json_has_workspaces(&current),
            PackageManager::Pnpm => current.join("pnpm-workspace.yaml").exists(),
            PackageManager::Cargo => cargo_toml_has_workspace(&current),
            PackageManager::Deno => deno_json_has_workspace(&current),
        };

        if is_root {
            return current;
        }

        if let Some(parent) = current.parent() {
            current = parent.to_path_buf();
        } else {
            return start.to_path_buf();
        }
    }
}

fn package_json_has_workspaces(dir: &Path) -> bool {
    let path = dir.join("package.json");
    let content = std::fs::read_to_string(&path);
    let Ok(json) = content.and_then(|s| {
        serde_json::from_str::<serde_json::Value>(&s)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }) else {
        return false;
    };

    match json.get("workspaces") {
        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
        Some(serde_json::Value::Object(map)) => map
            .get("packages")
            .and_then(|packages| packages.as_array())
            .map(|arr| !arr.is_empty())
            .unwrap_or(false),
        _ => false,
    }
}

fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let path = dir.join("Cargo.toml");
    let Ok(content) = std::fs::read_to_string(&path) else {
        return false;
    };

    content.contains("[workspace]")
}

fn deno_json_has_workspace(dir: &Path) -> bool {
    let path = dir.join("deno.json");
    let content = std::fs::read_to_string(&path);
    let Ok(json) = content.and_then(|s| {
        serde_json::from_str::<serde_json::Value>(&s)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }) else {
        return false;
    };

    // Deno uses "workspace" (not "workspaces") for workspace configuration
    match json.get("workspace") {
        Some(serde_json::Value::Array(arr)) => !arr.is_empty(),
        Some(serde_json::Value::Object(_)) => true,
        _ => false,
    }
}

/// Build a compact, user-friendly summary for a failed task, including the
/// exit code and the tail of stdout/stderr to help with diagnostics.
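///
/// An illustrative sketch of the output shape (exact text depends on the result):
///
/// ```ignore
/// let result = TaskResult {
///     name: "build".to_string(),
///     exit_code: Some(2),
///     stdout: String::new(),
///     stderr: "error: missing input\n".to_string(),
///     success: false,
/// };
/// let summary = summarize_task_failure(&result, TASK_FAILURE_SNIPPET_LINES);
/// assert!(summary.contains("failed with exit code 2"));
/// ```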
pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
    let exit_code = result
        .exit_code
        .map(|c| c.to_string())
        .unwrap_or_else(|| "unknown".to_string());

    let mut sections = Vec::new();
    sections.push(format!(
        "Task '{}' failed with exit code {}.",
        result.name, exit_code
    ));

    let output = format_failure_streams(result, max_output_lines);
    if output.is_empty() {
        sections.push(
            "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
                .to_string(),
        );
    } else {
        sections.push(output);
    }

    sections.join("\n\n")
}

fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
    let mut streams = Vec::new();

    if let Some(stdout) = summarize_stream("stdout", &result.stdout, max_output_lines) {
        streams.push(stdout);
    }

    if let Some(stderr) = summarize_stream("stderr", &result.stderr, max_output_lines) {
        streams.push(stderr);
    }

    streams.join("\n\n")
}

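/// Trim a captured stream to at most `max_output_lines` trailing lines, prefixed
/// with a labelled header; returns `None` when the stream is empty.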
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let normalized = content.trim_end();
    if normalized.is_empty() {
        return None;
    }

    let lines: Vec<&str> = normalized.lines().collect();
    let total = lines.len();
    let start = total.saturating_sub(max_output_lines);
    let snippet = lines[start..].join("\n");

    let header = if total > max_output_lines {
        format!("{label} (last {max_output_lines} of {total} lines):")
    } else {
        format!("{label}:")
    };

    Some(format!("{header}\n{snippet}"))
}

/// Execute an arbitrary command with the cuenv environment
///
/// Convenience wrapper around [`execute_command_with_redaction`] with no secrets,
/// so output is streamed to the terminal without capture or redaction.
pub async fn execute_command(
    command: &str,
    args: &[String],
    environment: &Environment,
) -> Result<i32> {
    execute_command_with_redaction(command, args, environment, &[]).await
}

/// Execute a command with secret redaction
///
/// Secret values in stdout/stderr are replaced with [REDACTED].
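///
/// A usage sketch (the command name and secret value are illustrative):
///
/// ```ignore
/// let environment = Environment::new();
/// let exit_code = execute_command_with_redaction(
///     "deploy",
///     &["--token".to_string(), "abcd1234".to_string()],
///     &environment,
///     &["abcd1234".to_string()],
/// )
/// .await?;
/// ```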
pub async fn execute_command_with_redaction(
    command: &str,
    args: &[String],
    environment: &Environment,
    secrets: &[String],
) -> Result<i32> {
    use tokio::io::{AsyncBufReadExt, BufReader};

    tracing::info!("Executing command: {} {:?}", command, args);
    let mut cmd = Command::new(command);
    cmd.args(args);
    let env_vars = environment.merge_with_system();
    for (key, value) in env_vars {
        cmd.env(key, value);
    }

    if secrets.is_empty() {
        // No secrets to redact - inherit stdio directly
        cmd.stdout(Stdio::inherit());
        cmd.stderr(Stdio::inherit());
        cmd.stdin(Stdio::inherit());
        let status = cmd.status().await.map_err(|e| {
            Error::configuration(format!("Failed to execute command '{}': {}", command, e))
        })?;
        return Ok(status.code().unwrap_or(1));
    }

    // Capture output for redaction
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    cmd.stdin(Stdio::inherit());

    let mut child = cmd.spawn().map_err(|e| {
        Error::configuration(format!("Failed to execute command '{}': {}", command, e))
    })?;

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| Error::execution("stdout pipe not available"))?;
    let stderr = child
        .stderr
        .take()
        .ok_or_else(|| Error::execution("stderr pipe not available"))?;

    // Build sorted secrets for greedy matching (longer first)
    let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
    sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
    let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();

    // Stream stdout with redaction
    let secrets_clone = sorted_secrets.clone();
    let stdout_task = tokio::spawn(async move {
        let reader = BufReader::new(stdout);
        let mut lines = reader.lines();
        while let Ok(Some(line)) = lines.next_line().await {
            let mut redacted = line;
            for secret in &secrets_clone {
                if secret.len() >= 4 {
                    redacted = redacted.replace(secret, "[REDACTED]");
                }
            }
            cuenv_events::emit_stdout!(&redacted);
        }
    });

    // Stream stderr with redaction
    let stderr_task = tokio::spawn(async move {
        let reader = BufReader::new(stderr);
        let mut lines = reader.lines();
        while let Ok(Some(line)) = lines.next_line().await {
            let mut redacted = line;
            for secret in &sorted_secrets {
                if secret.len() >= 4 {
                    redacted = redacted.replace(secret, "[REDACTED]");
                }
            }
            cuenv_events::emit_stderr!(&redacted);
        }
    });

    // Wait for command and streams
    let status = child.wait().await.map_err(|e| {
        Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
    })?;

    let _ = stdout_task.await;
    let _ = stderr_task.await;

    Ok(status.code().unwrap_or(1))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::tasks::Input;
    use std::fs;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_executor_config_default() {
        let config = ExecutorConfig::default();
        assert!(!config.capture_output);
        assert_eq!(config.max_parallel, 0);
        assert!(config.environment.is_empty());
    }

    #[tokio::test]
    async fn test_task_result() {
        let result = TaskResult {
            name: "test".to_string(),
            exit_code: Some(0),
            stdout: "output".to_string(),
            stderr: String::new(),
            success: true,
        };
        assert_eq!(result.name, "test");
        assert_eq!(result.exit_code, Some(0));
        assert!(result.success);
        assert_eq!(result.stdout, "output");
    }

    #[tokio::test]
    async fn test_execute_simple_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            description: Some("Hello task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
        assert!(result.stdout.contains("hello"));
    }

    #[tokio::test]
    async fn test_execute_with_environment() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("TEST_VAR".to_string(), "test_value".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["TEST_VAR".to_string()],
            description: Some("Print env task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("test_value"));
    }

    #[tokio::test]
    async fn test_workspace_inputs_include_workspace_root_when_project_is_nested() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        // Workspace root with workspaces + lockfile
        fs::write(
            root.join("package.json"),
            r#"{
  "name": "root-app",
  "version": "0.0.0",
  "workspaces": ["packages/*", "apps/*"],
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#,
        )
        .unwrap();
        // Deliberately omit the workspace member name for apps/site to mimic lockfiles
        // that only record member paths, ensuring we can still discover dependencies.
        fs::write(
            root.join("bun.lock"),
            r#"{
  "lockfileVersion": 1,
  "workspaces": {
    "": {
      "name": "root-app",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    },
    "packages/content-technologies": {
      "name": "@rawkodeacademy/content-technologies",
      "version": "0.0.1"
    },
    "apps/site": {
      "version": "0.0.0",
      "dependencies": {
        "@rawkodeacademy/content-technologies": "workspace:*"
      }
    }
  },
  "packages": {}
}"#,
        )
        .unwrap();

        // Workspace member packages
        fs::create_dir_all(root.join("packages/content-technologies")).unwrap();
        fs::write(
            root.join("packages/content-technologies/package.json"),
            r#"{
  "name": "@rawkodeacademy/content-technologies",
  "version": "0.0.1"
}"#,
        )
        .unwrap();

        fs::create_dir_all(root.join("apps/site")).unwrap();
        fs::write(
            root.join("apps/site/package.json"),
            r#"{
  "name": "site",
  "version": "0.0.0",
  "dependencies": {
    "@rawkodeacademy/content-technologies": "workspace:*"
  }
}"#,
        )
        .unwrap();

        let mut workspaces = HashMap::new();
        workspaces.insert(
            "bun".to_string(),
            WorkspaceConfig {
                enabled: true,
                package_manager: Some("bun".to_string()),
                root: None,
                hooks: None,
            },
        );

        let config = ExecutorConfig {
            capture_output: true,
            project_root: root.join("apps/site"),
            workspaces: Some(workspaces),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        let task = Task {
            command: "sh".to_string(),
            args: vec![
                "-c".to_string(),
                "find ../.. -maxdepth 4 -type d | sort".to_string(),
            ],
            inputs: vec![Input::Path("package.json".to_string())],
            workspaces: vec!["bun".to_string()],
            ..Default::default()
        };

        let result = executor.execute_task("install", &task).await.unwrap();
        assert!(
            result.success,
            "command failed stdout='{}' stderr='{}'",
            result.stdout, result.stderr
        );
        assert!(
            result
                .stdout
                .split_whitespace()
                .any(|line| line.ends_with("packages/content-technologies")),
            "should include workspace member from workspace root; stdout='{}' stderr='{}'",
            result.stdout,
            result.stderr
        );
    }

    #[tokio::test]
    async fn test_execute_failing_task() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "false".to_string(),
            description: Some("Failing task".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("test", &task).await.unwrap();
        assert!(!result.success);
        assert_eq!(result.exit_code, Some(1));
    }

    #[tokio::test]
    async fn test_execute_sequential_group() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let task1 = Task {
            command: "echo".to_string(),
            args: vec!["first".to_string()],
            description: Some("First task".to_string()),
            ..Default::default()
        };
        let task2 = Task {
            command: "echo".to_string(),
            args: vec!["second".to_string()],
            description: Some("Second task".to_string()),
            ..Default::default()
        };
        let group = TaskGroup::Sequential(vec![
            TaskDefinition::Single(Box::new(task1)),
            TaskDefinition::Single(Box::new(task2)),
        ]);
        let all_tasks = Tasks::new();
        let results = executor
            .execute_group("seq", &group, &all_tasks)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results[0].stdout.contains("first"));
        assert!(results[1].stdout.contains("second"));
    }

    #[tokio::test]
    async fn test_command_injection_prevention() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let malicious_task = Task {
            command: "echo".to_string(),
            args: vec!["hello".to_string(), "; rm -rf /".to_string()],
            description: Some("Malicious task test".to_string()),
            ..Default::default()
        };
        let result = executor
            .execute_task("malicious", &malicious_task)
            .await
            .unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("hello ; rm -rf /"));
    }

    #[tokio::test]
    async fn test_special_characters_in_args() {
        let config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let special_chars = vec![
            "$USER",
            "$(whoami)",
            "`whoami`",
            "&& echo hacked",
            "|| echo failed",
            "> /tmp/hack",
            "| cat",
        ];
        for special_arg in special_chars {
            let task = Task {
                command: "echo".to_string(),
                args: vec!["safe".to_string(), special_arg.to_string()],
                description: Some("Special character test".to_string()),
                ..Default::default()
            };
            let result = executor.execute_task("special", &task).await.unwrap();
            assert!(result.success);
            assert!(result.stdout.contains("safe"));
            assert!(result.stdout.contains(special_arg));
        }
    }

    #[tokio::test]
    async fn test_environment_variable_safety() {
        let mut config = ExecutorConfig {
            capture_output: true,
            ..Default::default()
        };
        config
            .environment
            .set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
        let executor = TaskExecutor::new(config);
        let task = Task {
            command: "printenv".to_string(),
            args: vec!["DANGEROUS_VAR".to_string()],
            description: Some("Environment variable safety test".to_string()),
            ..Default::default()
        };
        let result = executor.execute_task("env_test", &task).await.unwrap();
        assert!(result.success);
        assert!(result.stdout.contains("; rm -rf /"));
    }

    #[tokio::test]
    async fn test_execute_graph_parallel_groups() {
        // two independent tasks -> can run in same parallel group
        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);
        let mut graph = TaskGraph::new();

        let t1 = Task {
            command: "echo".into(),
            args: vec!["A".into()],
            ..Default::default()
        };
        let t2 = Task {
            command: "echo".into(),
            args: vec!["B".into()],
            ..Default::default()
        };

        graph.add_task("t1", t1).unwrap();
        graph.add_task("t2", t2).unwrap();
        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);
        let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
        assert!(joined.contains("A") && joined.contains("B"));
    }

    #[tokio::test]
    async fn test_execute_graph_respects_dependency_levels() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        let config = ExecutorConfig {
            capture_output: true,
            max_parallel: 2,
            project_root: root.to_path_buf(),
            ..Default::default()
        };
        let executor = TaskExecutor::new(config);

        let mut tasks = Tasks::new();
        tasks.tasks.insert(
            "dep".into(),
            TaskDefinition::Single(Box::new(Task {
                command: "sh".into(),
                args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
                ..Default::default()
            })),
        );
        tasks.tasks.insert(
            "consumer".into(),
            TaskDefinition::Single(Box::new(Task {
                command: "sh".into(),
                args: vec!["-c".into(), "cat marker.txt".into()],
                depends_on: vec!["dep".into()],
                ..Default::default()
            })),
        );

        let mut graph = TaskGraph::new();
        graph.build_for_task("consumer", &tasks).unwrap();

        let results = executor.execute_graph(&graph).await.unwrap();
        assert_eq!(results.len(), 2);

        let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
        assert!(consumer.success);
        assert!(consumer.stdout.contains("ok"));
    }
1236}