use super::backend::{BackendFactory, TaskBackend, create_backend_with_factory};
use super::cache::{BuildActionInput, RecordInput, TaskCacheConfig};
use super::process_registry::global_registry;
use super::{Task, TaskGraph, TaskGroup, TaskNode, Tasks};
use crate::OutputCapture;
use crate::config::BackendConfig;
use crate::environment::Environment;
use crate::{Error, Result};
use async_recursion::async_recursion;
use cuenv_workspaces::PackageManager;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use tokio::process::Command;
use tokio::task::JoinSet;
use tracing::instrument;
#[cfg(unix)]
#[allow(unused_imports)]
use std::os::unix::process::CommandExt;
/// Arranges for the spawned child to be placed in its own process group.
///
/// Registers a `pre_exec` hook on `cmd` that calls `setpgid(0, 0)` in the
/// child between `fork` and `exec`, which makes the child the leader of a new
/// process group whose id equals the child's pid. The return value of
/// `setpgid` is ignored, so a failure there does not prevent the spawn.
#[cfg(unix)]
fn setup_process_group(cmd: &mut Command) {
// SAFETY: the hook runs in the forked child before `exec`; it performs a
// single `setpgid` syscall (listed as async-signal-safe by POSIX), does not
// allocate, and touches no state shared with the parent.
#[expect(unsafe_code, reason = "Required for POSIX process group management")]
unsafe {
cmd.pre_exec(|| {
// Result deliberately ignored: grouping is best-effort.
libc::setpgid(0, 0);
Ok(())
});
}
}
/// Outcome of running (or replaying from cache) a single task.
#[derive(Debug, Clone)]
pub struct TaskResult {
/// Name the task was executed under.
pub name: String,
/// Process exit code. The local executor in this file stores `Some(-1)`
/// when the exit status carries no code.
pub exit_code: Option<i32>,
/// Captured standard output, lines joined with `\n`; empty when the task's
/// stdio was inherited instead of captured.
pub stdout: String,
/// Captured standard error, lines joined with `\n`; empty when the task's
/// stdio was inherited instead of captured.
pub stderr: String,
/// Whether the task finished successfully.
pub success: bool,
}
/// Line count for task failure output snippets.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// meant as the `max_output_lines` argument of `summarize_task_failure` —
/// confirm against callers.
pub const TASK_FAILURE_SNIPPET_LINES: usize = 20;
/// Settings that control how a [`TaskExecutor`] runs tasks.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
/// Output handling mode. When `should_capture()` is true the local executor
/// pipes and collects task stdout/stderr; otherwise stdio is inherited.
pub capture_output: OutputCapture,
/// Upper bound on concurrently running tasks in a parallel group or graph
/// level. `0` disables the limit.
pub max_parallel: usize,
/// Environment merged with the system environment for every task, and used
/// to resolve task commands.
pub environment: Environment,
/// NOTE(review): not read in the visible part of this file — confirm usage.
pub working_dir: Option<PathBuf>,
/// Project root: fallback working directory for tasks and the root handed
/// to the backend factory.
pub project_root: PathBuf,
/// Base directory that a task's `directory` (or its source directory) is
/// joined onto; `project_root` is used when this is `None`.
pub cue_module_root: Option<PathBuf>,
/// NOTE(review): not read in the visible part of this file — confirm usage.
pub materialize_outputs: Option<PathBuf>,
/// NOTE(review): not read in the visible part of this file — confirm usage.
pub cache_dir: Option<PathBuf>,
/// NOTE(review): not read in the visible part of this file — confirm usage.
pub show_cache_path: bool,
/// Backend configuration forwarded to `create_backend_with_factory`.
pub backend_config: Option<BackendConfig>,
/// Backend name forwarded to `create_backend_with_factory`; presumably a
/// command-line override — confirm against the factory.
pub cli_backend: Option<String>,
/// Action-cache configuration. When `Some`, `execute_task` consults the
/// cache before running a task and may record the result afterwards.
pub cache: Option<TaskCacheConfig>,
}
impl Default for ExecutorConfig {
    /// Builds the baseline configuration: captured output, no parallelism
    /// limit, an empty environment, the current directory as project root,
    /// and every optional setting left unset.
    fn default() -> Self {
        // Fall back to a relative "." when the current directory is unavailable.
        let project_root = match std::env::current_dir() {
            Ok(cwd) => cwd,
            Err(_) => PathBuf::from("."),
        };

        Self {
            capture_output: OutputCapture::Capture,
            max_parallel: 0,
            environment: Environment::new(),
            working_dir: None,
            project_root,
            cue_module_root: None,
            materialize_outputs: None,
            cache_dir: None,
            show_cache_path: false,
            backend_config: None,
            cli_backend: None,
            cache: None,
        }
    }
}
/// Runs individual tasks, task groups, sequences and whole task graphs.
pub struct TaskExecutor {
/// Settings this executor runs with.
config: ExecutorConfig,
/// Execution backend; held in an `Arc` so executors created by
/// `clone_with_config` share the same instance.
backend: Arc<dyn TaskBackend>,
}
impl TaskExecutor {
pub fn new(config: ExecutorConfig) -> Self {
Self::with_dagger_factory(config, None)
}
/// Creates an executor whose backend is built by
/// `create_backend_with_factory` from the backend settings in `config`, the
/// project root, and the optional `dagger_factory`.
pub fn with_dagger_factory(
    config: ExecutorConfig,
    dagger_factory: Option<BackendFactory>,
) -> Self {
    let backend_settings = config.backend_config.as_ref();
    let root = config.project_root.clone();
    let cli_choice = config.cli_backend.as_deref();
    let backend = create_backend_with_factory(backend_settings, root, cli_choice, dagger_factory);
    Self { config, backend }
}
fn with_shared_backend(config: ExecutorConfig, backend: Arc<dyn TaskBackend>) -> Self {
Self { config, backend }
}
/// Executes a single task, consulting and updating the action cache when one
/// is configured.
///
/// Flow:
/// 1. If `config.cache` is set, build the cache action for the task. On a
///    cache hit the stored result is replayed and returned immediately.
/// 2. Otherwise run the task: through the backend when its name is
///    `"dagger"`, else through the local non-hermetic executor.
/// 3. If a cache action was built, the task's effective cache policy allows
///    writes, and the task exited with code 0, record the result. A failed
///    cache write is only logged; it never fails the task.
///
/// # Errors
///
/// Propagates errors from building the cache action, the cache lookup,
/// replaying a cache hit, and from the task execution itself.
#[instrument(name = "execute_task", skip(self, task), fields(task_name = %name))]
pub async fn execute_task(&self, name: &str, task: &Task) -> Result<TaskResult> {
// `Some` only when caching is configured AND an action could be built AND
// the lookup missed; carries everything needed to record the result later.
let cache_handle: Option<(TaskCacheConfig, cuenv_cas::Digest, PathBuf)> =
if let Some(cache) = self.config.cache.clone() {
let workdir = self.workdir_for_task(task);
match super::cache::build_action(BuildActionInput {
task,
task_name: name,
environment: &self.config.environment,
cache: &cache,
workdir: &workdir,
project_root: self.project_root_for_task(task),
module_root: self
.config
.cue_module_root
.as_deref()
.unwrap_or(&self.config.project_root),
})
.await?
{
Some((_, action_digest)) => {
if let Some(cached) = super::cache::lookup(&cache, &action_digest, task)? {
tracing::debug!(task = %name, "action cache hit");
// Cache hit: replay the stored result without running the task.
return self.return_cache_hit(CacheHitInput {
name,
task,
cache: &cache,
workdir: &workdir,
cached: &cached,
});
}
tracing::debug!(task = %name, "action cache miss");
Some((cache, action_digest, workdir))
}
// No action was produced for this task, so it runs uncached.
None => None,
}
} else {
None
};
let start = std::time::Instant::now();
// Only the backend named "dagger" executes through the backend trait;
// everything else runs as a local process.
let result = if self.backend.name() == "dagger" {
let ctx = super::backend::TaskExecutionContext {
name,
task,
environment: &self.config.environment,
project_root: &self.config.project_root,
capture_output: self.config.capture_output,
};
self.backend.execute(&ctx).await?
} else {
self.execute_task_non_hermetic(name, task).await?
};
let duration_ms = start.elapsed().as_millis();
// Record only successful (exit code 0) runs, and only when the policy
// permits writing; a failed write is downgraded to a warning.
if let Some((cache, action_digest, workdir)) = cache_handle
&& super::cache::effective_policy(task).mode.allows_write()
&& result.exit_code == Some(0)
&& let Err(e) = super::cache::record(RecordInput {
cache: &cache,
action_digest: &action_digest,
workdir: &workdir,
task,
stdout: &result.stdout,
stderr: &result.stderr,
exit_code: 0,
duration_ms,
})
{
tracing::warn!(task = %name, error = %e, "cache write failed");
}
Ok(result)
}
/// Replays a cached action result as if the task had just run.
///
/// Materializes the cached hit into the working directory, emits the same
/// started / output / completed events a real run would produce (with the
/// command description suffixed by `(cached)`), and returns the stored
/// stdout, stderr and exit code as a [`TaskResult`].
///
/// # Errors
///
/// Propagates any error from materializing the cached result.
fn return_cache_hit(&self, input: CacheHitInput<'_>) -> Result<TaskResult> {
    let name = input.name;
    let task = input.task;
    let cached = input.cached;

    let (stdout, stderr, exit_code) =
        super::cache::materialize_hit(input.cache, input.workdir, cached)?;

    // Human-readable description of what would have been executed.
    let cmd_str = match &task.script {
        Some(script) => format!("[script: {} bytes] (cached)", script.len()),
        None if task.command.is_empty() => format!("{} (cached)", task.args.join(" ")),
        None => format!("{} {} (cached)", task.command, task.args.join(" ")),
    };
    let success = exit_code == 0;

    cuenv_events::emit_task_started!(name, cmd_str, false);
    emit_cached_output_events(name, "stdout", &stdout);
    emit_cached_output_events(name, "stderr", &stderr);
    cuenv_events::emit_task_completed!(
        name,
        success,
        exit_code,
        cached.execution_metadata.duration_ms
    );

    Ok(TaskResult {
        name: name.to_string(),
        exit_code: Some(exit_code),
        stdout,
        stderr,
        success,
    })
}
fn workdir_for_task(&self, task: &Task) -> PathBuf {
if let Some(ref dir) = task.directory {
self.config
.cue_module_root
.as_ref()
.unwrap_or(&self.config.project_root)
.join(dir)
} else if let Some(ref project_root) = task.project_root {
project_root.clone()
} else if let Some(ref source) = task.source {
if let Some(dir) = source.directory() {
self.config
.cue_module_root
.as_ref()
.unwrap_or(&self.config.project_root)
.join(dir)
} else if let Some(ref project_root) = task.project_root {
project_root.clone()
} else {
self.config
.cue_module_root
.clone()
.unwrap_or_else(|| self.config.project_root.clone())
}
} else if !task.hermetic {
if let Some(manager) = cuenv_workspaces::detect_from_command(&task.command) {
find_workspace_root(manager, &self.config.project_root)
} else {
self.config.project_root.clone()
}
} else {
self.config.project_root.clone()
}
}
/// Returns the task's own project root when it has one, otherwise the
/// executor's configured project root.
fn project_root_for_task<'a>(&'a self, task: &'a Task) -> &'a Path {
    match task.project_root.as_deref() {
        Some(own_root) => own_root,
        None => self.config.project_root.as_path(),
    }
}
async fn execute_task_non_hermetic(&self, name: &str, task: &Task) -> Result<TaskResult> {
if task.is_task_ref() && task.project_root.is_none() {
return Err(Error::configuration(format!(
"Task '{}' references another project's task ({}) but the reference could not be resolved.\n\
This usually means:\n\
- The referenced project doesn't exist or has no 'name' field in env.cue\n\
- The referenced task '{}' doesn't exist in that project\n\
- There was an error loading the referenced project's env.cue\n\
Run with RUST_LOG=debug for more details.",
name,
task.task_ref.as_deref().unwrap_or("unknown"),
task.task_ref
.as_deref()
.and_then(|r| r.split(':').next_back())
.unwrap_or("unknown")
)));
}
let workdir = self.workdir_for_task(task);
tracing::info!(
task = %name,
workdir = %workdir.display(),
hermetic = false,
"Executing non-hermetic task"
);
let cmd_str = if let Some(script) = &task.script {
format!("[script: {} bytes]", script.len())
} else if task.command.is_empty() {
task.args.join(" ")
} else {
format!("{} {}", task.command, task.args.join(" "))
};
cuenv_events::emit_task_started!(name, cmd_str, false);
let command_spec =
task.command_spec(|command| self.config.environment.resolve_command(command))?;
let mut cmd = Command::new(&command_spec.program);
cmd.args(&command_spec.args);
cmd.current_dir(&workdir);
let env_vars = self.config.environment.merge_with_system_hermetic();
for (k, v) in &env_vars {
cmd.env(k, v);
}
for (key, value) in &task.env {
if let Some(s) = value.as_str() {
if let Some(host_var) = super::output_refs::parse_passthrough(s) {
if let Ok(host_val) = std::env::var(host_var) {
cmd.env(key, host_val);
}
} else if !s.starts_with("cuenv:ref:") {
cmd.env(key, s);
}
} else if let Some(n) = value.as_i64() {
cmd.env(key, n.to_string());
} else if let Some(b) = value.as_bool() {
cmd.env(key, b.to_string());
}
}
if !env_vars.contains_key("FORCE_COLOR") {
cmd.env("FORCE_COLOR", "1");
}
if !env_vars.contains_key("CLICOLOR_FORCE") {
cmd.env("CLICOLOR_FORCE", "1");
}
if self.config.capture_output.should_capture() {
use tokio::io::{AsyncBufReadExt, BufReader};
let start_time = std::time::Instant::now();
#[cfg(unix)]
setup_process_group(&mut cmd);
let mut child = cmd
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| Error::Io {
source: e,
path: None,
operation: format!("spawn task {}", name),
})?;
let child_pid = child.id();
if let Some(pid) = child_pid {
global_registry().register(pid, name.to_string()).await;
}
let stdout_handle = child.stdout.take();
let stderr_handle = child.stderr.take();
let mut stdout_lines = Vec::new();
let mut stderr_lines = Vec::new();
let name_for_stdout = name.to_string();
let stdout_task = tokio::spawn(async move {
let mut lines = Vec::new();
if let Some(stdout) = stdout_handle {
let mut reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = reader.next_line().await {
cuenv_events::emit_task_output!(name_for_stdout, "stdout", line);
lines.push(line);
}
}
lines
});
let name_for_stderr = name.to_string();
let stderr_task = tokio::spawn(async move {
let mut lines = Vec::new();
if let Some(stderr) = stderr_handle {
let mut reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = reader.next_line().await {
cuenv_events::emit_task_output!(name_for_stderr, "stderr", line);
lines.push(line);
}
}
lines
});
let status = child.wait().await.map_err(|e| Error::Io {
source: e,
path: None,
operation: format!("wait for task {}", name),
})?;
if let Some(pid) = child_pid {
global_registry().unregister(pid).await;
}
if let Ok(lines) = stdout_task.await {
stdout_lines = lines;
}
if let Ok(lines) = stderr_task.await {
stderr_lines = lines;
}
let duration_ms = start_time.elapsed().as_millis() as u64;
let stdout = stdout_lines.join("\n");
let stderr = stderr_lines.join("\n");
let exit_code = status.code().unwrap_or(-1);
let success = status.success();
cuenv_events::emit_task_completed!(name, success, exit_code, duration_ms);
if !success {
tracing::warn!(task = %name, exit = exit_code, "Task failed");
tracing::error!(task = %name, "Task stdout:\n{}", stdout);
tracing::error!(task = %name, "Task stderr:\n{}", stderr);
}
Ok(TaskResult {
name: name.to_string(),
exit_code: Some(exit_code),
stdout,
stderr,
success,
})
} else {
#[cfg(unix)]
setup_process_group(&mut cmd);
let mut child = cmd
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.stdin(Stdio::inherit())
.spawn()
.map_err(|e| Error::Io {
source: e,
path: None,
operation: format!("spawn task {}", name),
})?;
let child_pid = child.id();
if let Some(pid) = child_pid {
global_registry().register(pid, name.to_string()).await;
}
let status = child.wait().await.map_err(|e| Error::Io {
source: e,
path: None,
operation: format!("wait for task {}", name),
})?;
if let Some(pid) = child_pid {
global_registry().unregister(pid).await;
}
let exit_code = status.code().unwrap_or(-1);
let success = status.success();
if !success {
tracing::warn!(task = %name, exit = exit_code, "Task failed");
}
Ok(TaskResult {
name: name.to_string(),
exit_code: Some(exit_code),
stdout: String::new(), stderr: String::new(),
success,
})
}
}
/// Executes one node of the task tree and returns the results of every task
/// that ran beneath it.
///
/// A leaf task yields a single-element vector; groups and sequences are
/// dispatched to the parallel and sequential executors respectively.
#[async_recursion]
pub async fn execute_node(
    &self,
    name: &str,
    node: &TaskNode,
    all_tasks: &Tasks,
) -> Result<Vec<TaskResult>> {
    match node {
        TaskNode::Sequence(steps) => self.execute_sequential(name, steps, all_tasks).await,
        TaskNode::Group(members) => self.execute_parallel(name, members, all_tasks).await,
        TaskNode::Task(leaf) => self
            .execute_task(name, leaf.as_ref())
            .await
            .map(|single| vec![single]),
    }
}
/// Executes a task definition; a thin alias that forwards to
/// [`Self::execute_node`] with the same arguments.
#[async_recursion]
pub async fn execute_definition(
    &self,
    name: &str,
    node: &TaskNode,
    all_tasks: &Tasks,
) -> Result<Vec<TaskResult>> {
    let delegated = self.execute_node(name, node, all_tasks);
    delegated.await
}
/// Runs the steps of a sequence one after another, stopping at the first
/// failure.
///
/// Step `i` is executed under the name `"{prefix}[{i}]"`. A step that is a
/// plain task and contains output references has them resolved against the
/// results of the steps that already ran; all other steps are executed as-is
/// without being copied. A group-started event is emitted only when output
/// is not being captured.
///
/// # Errors
///
/// Returns the step's own error, an output-reference resolution error, or a
/// task-failed error built from the first unsuccessful result.
async fn execute_sequential(
    &self,
    prefix: &str,
    sequence: &[TaskNode],
    all_tasks: &Tasks,
) -> Result<Vec<TaskResult>> {
    if !self.config.capture_output.should_capture() {
        cuenv_events::emit_task_group_started!(prefix, true, sequence.len());
    }

    let mut results = Vec::new();
    let mut seq_results: std::collections::HashMap<String, TaskResult> =
        std::collections::HashMap::new();

    for (i, step) in sequence.iter().enumerate() {
        let task_name = format!("{}[{}]", prefix, i);

        // Only build an owned copy when references actually need resolving;
        // otherwise borrow the original node instead of deep-cloning it.
        let resolved = match step {
            TaskNode::Task(task)
                if super::output_refs::has_output_refs(&task.args, &task.env) =>
            {
                let mut resolved_task = (**task).clone();
                let resolver = super::output_refs::OutputRefResolver {
                    task_name: &task_name,
                    results: &seq_results,
                };
                resolver.resolve(&mut resolved_task.args, &mut resolved_task.env)?;
                Some(TaskNode::Task(Box::new(resolved_task)))
            }
            _ => None,
        };
        let step = resolved.as_ref().unwrap_or(step);

        let task_results = self.execute_node(&task_name, step, all_tasks).await?;
        for result in &task_results {
            if !result.success {
                return Err(Error::task_failed(
                    &result.name,
                    result.exit_code.unwrap_or(-1),
                    &result.stdout,
                    &result.stderr,
                ));
            }
            // Make this result available to later steps' output references.
            seq_results.insert(result.name.clone(), result.clone());
        }
        results.extend(task_results);
    }

    Ok(results)
}
async fn execute_parallel(
&self,
prefix: &str,
group: &TaskGroup,
all_tasks: &Tasks,
) -> Result<Vec<TaskResult>> {
if let Some(default_task) = group.children.get("default") {
if !self.config.capture_output.should_capture() {
cuenv_events::emit_task_group_started!(prefix, true, 1_usize);
}
let task_name = format!("{}.default", prefix);
return self.execute_node(&task_name, default_task, all_tasks).await;
}
if !self.config.capture_output.should_capture() {
cuenv_events::emit_task_group_started!(prefix, false, group.children.len());
}
let mut join_set = JoinSet::new();
let all_tasks = Arc::new(all_tasks.clone());
let mut all_results = Vec::new();
let mut merge_results = |results: Vec<TaskResult>| -> Result<()> {
if let Some(failed) = results.iter().find(|r| !r.success) {
return Err(Error::task_failed(
&failed.name,
failed.exit_code.unwrap_or(-1),
&failed.stdout,
&failed.stderr,
));
}
all_results.extend(results);
Ok(())
};
for (name, child_node) in &group.children {
let task_name = format!("{}.{}", prefix, name);
let child_node = child_node.clone();
let all_tasks = Arc::clone(&all_tasks);
let executor = self.clone_with_config();
join_set.spawn(async move {
executor
.execute_node(&task_name, &child_node, &all_tasks)
.await
});
if self.config.max_parallel > 0
&& join_set.len() >= self.config.max_parallel
&& let Some(result) = join_set.join_next().await
{
match result {
Ok(Ok(results)) => merge_results(results)?,
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(Error::execution(format!("Task execution panicked: {}", e)));
}
}
}
}
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(results)) => merge_results(results)?,
Ok(Err(e)) => return Err(e),
Err(e) => {
return Err(Error::execution(format!("Task execution panicked: {}", e)));
}
}
}
Ok(all_results)
}
/// Executes every task of `graph`, one parallel group at a time.
///
/// The groups come from `graph.get_parallel_groups()` and are processed in
/// order; a group is fully finished before the next one starts. Within a
/// group each task has its output references resolved against the results
/// collected so far and is then spawned on its own executor clone. When
/// `max_parallel` is non-zero, at most that many tasks of a group are in
/// flight at once.
///
/// # Errors
///
/// Fails on the first task error, unsuccessful task result, or panicked
/// task; the remaining in-flight tasks of the group are aborted. Errors from
/// computing the groups or resolving output references are propagated.
#[instrument(name = "execute_graph", skip(self, graph), fields(task_count = graph.task_count()))]
pub async fn execute_graph(&self, graph: &TaskGraph) -> Result<Vec<TaskResult>> {
let parallel_groups = graph.get_parallel_groups()?;
let mut all_results = Vec::new();
// Results by task name, used to resolve output references of later tasks.
let mut results_map: std::collections::HashMap<String, TaskResult> =
std::collections::HashMap::new();
for mut group in parallel_groups {
let mut join_set = JoinSet::new();
// Keep going until nothing is left to spawn and nothing is still running.
while !group.is_empty() || !join_set.is_empty() {
// Spawn phase: tasks are taken from the END of the group vector.
while let Some(node) = group.pop() {
let mut task = node.task.clone();
let name = node.name.clone();
let resolver = super::output_refs::OutputRefResolver {
task_name: &name,
results: &results_map,
};
resolver.resolve(&mut task.args, &mut task.env)?;
let executor = self.clone_with_config();
join_set.spawn(async move { executor.execute_task(&name, &task).await });
// Stop spawning once the concurrency limit is reached (0 = no limit).
if self.config.max_parallel > 0 && join_set.len() >= self.config.max_parallel {
break;
}
}
// Collect phase: wait for exactly one task, then loop back to spawn more.
if let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(task_result)) => {
if !task_result.success {
join_set.abort_all();
return Err(Error::task_failed(
&task_result.name,
task_result.exit_code.unwrap_or(-1),
&task_result.stdout,
&task_result.stderr,
));
}
results_map.insert(task_result.name.clone(), task_result.clone());
all_results.push(task_result);
}
Ok(Err(e)) => {
join_set.abort_all();
return Err(e);
}
// The spawned task itself failed to join (reported as a panic).
Err(e) => {
join_set.abort_all();
return Err(Error::execution(format!(
"Task execution panicked: {}",
e
)));
}
}
}
}
}
Ok(all_results)
}
fn clone_with_config(&self) -> Self {
Self::with_shared_backend(self.config.clone(), self.backend.clone())
}
}
/// Borrowed arguments for `TaskExecutor::return_cache_hit`.
struct CacheHitInput<'a> {
/// Name the task is being executed under.
name: &'a str,
/// Task definition; used to describe the command in emitted events.
task: &'a Task,
/// Cache configuration the hit was found in.
cache: &'a TaskCacheConfig,
/// Working directory passed to `materialize_hit` when replaying the hit.
workdir: &'a Path,
/// The cached action result being replayed.
cached: &'a cuenv_cas::ActionResult,
}
/// Emits one task-output event per line of `content` on the given stream
/// (`"stdout"` or `"stderr"`), attributed to task `name`.
fn emit_cached_output_events(name: &str, stream: &'static str, content: &str) {
    content.lines().for_each(|line| {
        cuenv_events::emit_task_output!(name, stream, line);
    });
}
/// Walks upwards from `start` looking for the workspace root of `manager`.
///
/// `start` is canonicalized first (falling back to the path as given when
/// that fails). Each ancestor, starting with the directory itself, is tested
/// with the marker appropriate to the package manager:
/// - npm / Bun / Yarn: a `package.json` declaring workspaces,
/// - pnpm: a `pnpm-workspace.yaml` file,
/// - Cargo: a `Cargo.toml` with a workspace table,
/// - Deno: a `deno.json` declaring a workspace.
///
/// Returns the first matching ancestor, or `start` unchanged (not
/// canonicalized) when none matches.
fn find_workspace_root(manager: PackageManager, start: &Path) -> PathBuf {
    let origin = start.canonicalize().unwrap_or_else(|_| start.to_path_buf());

    origin
        .ancestors()
        .find(|dir| match manager {
            PackageManager::Npm
            | PackageManager::Bun
            | PackageManager::YarnClassic
            | PackageManager::YarnModern => package_json_has_workspaces(dir),
            PackageManager::Pnpm => dir.join("pnpm-workspace.yaml").exists(),
            PackageManager::Cargo => cargo_toml_has_workspace(dir),
            PackageManager::Deno => deno_json_has_workspace(dir),
        })
        .map_or_else(|| start.to_path_buf(), |root| root.to_path_buf())
}
/// Reports whether `dir/package.json` declares a non-empty workspace list.
///
/// Accepts both forms of the `workspaces` field: a non-empty array, or an
/// object whose `packages` entry is a non-empty array. A missing, unreadable
/// or unparsable file counts as "no workspaces".
fn package_json_has_workspaces(dir: &Path) -> bool {
    let Ok(raw) = std::fs::read_to_string(dir.join("package.json")) else {
        return false;
    };
    let Ok(manifest) = serde_json::from_str::<serde_json::Value>(&raw) else {
        return false;
    };
    let Some(workspaces) = manifest.get("workspaces") else {
        return false;
    };

    if let Some(globs) = workspaces.as_array() {
        return !globs.is_empty();
    }
    workspaces
        .as_object()
        .and_then(|fields| fields.get("packages"))
        .and_then(|packages| packages.as_array())
        .is_some_and(|globs| !globs.is_empty())
}
/// Reports whether `dir/Cargo.toml` defines a Cargo workspace.
///
/// A manifest counts as a workspace root when some line, ignoring leading
/// whitespace, starts a `[workspace]` table or one of its sub-tables
/// (`[workspace.dependencies]`, `[workspace.package]`, ...). Matching on the
/// start of a line means the text `[workspace]` inside a comment or string
/// value no longer counts. A missing or unreadable file yields `false`.
fn cargo_toml_has_workspace(dir: &Path) -> bool {
    let Ok(content) = std::fs::read_to_string(dir.join("Cargo.toml")) else {
        return false;
    };
    content
        .lines()
        .map(str::trim_start)
        .any(|line| line.starts_with("[workspace]") || line.starts_with("[workspace."))
}
/// Reports whether `dir/deno.json` declares a workspace.
///
/// The `workspace` field counts when it is a non-empty array or any object.
/// A missing, unreadable or unparsable file counts as "no workspace".
fn deno_json_has_workspace(dir: &Path) -> bool {
    let Ok(raw) = std::fs::read_to_string(dir.join("deno.json")) else {
        return false;
    };
    let Ok(manifest) = serde_json::from_str::<serde_json::Value>(&raw) else {
        return false;
    };
    let Some(workspace) = manifest.get("workspace") else {
        return false;
    };

    if let Some(members) = workspace.as_array() {
        !members.is_empty()
    } else {
        workspace.is_object()
    }
}
/// Builds a human-readable failure report for a task.
///
/// The report starts with a headline naming the task and its exit code
/// (`unknown` when there is none), followed by a blank line and either the
/// tail of the captured stdout/stderr (at most `max_output_lines` lines per
/// stream) or a hint explaining that no output was captured.
pub fn summarize_task_failure(result: &TaskResult, max_output_lines: usize) -> String {
    let code_text = match result.exit_code {
        Some(code) => code.to_string(),
        None => "unknown".to_string(),
    };
    let headline = format!(
        "Task '{}' failed with exit code {}.",
        result.name, code_text
    );

    let streams = format_failure_streams(result, max_output_lines);
    let detail = if streams.is_empty() {
        "No stdout/stderr were captured; rerun with RUST_LOG=debug to stream task logs."
            .to_string()
    } else {
        streams
    };

    format!("{headline}\n\n{detail}")
}
/// Renders the non-empty output streams of `result` (stdout first, then
/// stderr), each trimmed to its last `max_output_lines` lines, separated by a
/// blank line. Returns an empty string when both streams are empty.
fn format_failure_streams(result: &TaskResult, max_output_lines: usize) -> String {
    [("stdout", &result.stdout), ("stderr", &result.stderr)]
        .into_iter()
        .filter_map(|(label, text)| summarize_stream(label, text, max_output_lines))
        .collect::<Vec<_>>()
        .join("\n\n")
}
/// Renders the tail of one output stream under a labelled header.
///
/// Trailing whitespace is ignored; a stream that is empty afterwards yields
/// `None`. Otherwise the last `max_output_lines` lines are returned beneath a
/// header that is either `"{label}:"` or, when lines were dropped,
/// `"{label} (last N of TOTAL lines):"`.
fn summarize_stream(label: &str, content: &str, max_output_lines: usize) -> Option<String> {
    let trimmed = content.trim_end();
    if trimmed.is_empty() {
        return None;
    }

    let total = trimmed.lines().count();
    let dropped = total.saturating_sub(max_output_lines);

    let header = if total <= max_output_lines {
        format!("{label}:")
    } else {
        format!("{label} (last {max_output_lines} of {total} lines):")
    };
    let tail: Vec<&str> = trimmed.lines().skip(dropped).collect();

    Some(format!("{header}\n{}", tail.join("\n")))
}
/// Runs `command` with `args` in the given environment and returns its exit
/// code, without redacting any output.
///
/// Equivalent to [`execute_command_with_redaction`] with an empty secret
/// list.
pub async fn execute_command(
    command: &str,
    args: &[String],
    environment: &Environment,
) -> Result<i32> {
    let no_secrets: &[String] = &[];
    execute_command_with_redaction(command, args, environment, no_secrets).await
}
pub async fn execute_command_with_redaction(
command: &str,
args: &[String],
environment: &Environment,
secrets: &[String],
) -> Result<i32> {
use tokio::io::{AsyncBufReadExt, BufReader};
tracing::info!("Executing command: {} {:?}", command, args);
let mut cmd = Command::new(command);
cmd.args(args);
let env_vars = environment.merge_with_system_hermetic();
for (key, value) in env_vars {
cmd.env(key, value);
}
if secrets.is_empty() {
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::inherit());
cmd.stdin(Stdio::inherit());
let status = cmd.status().await.map_err(|e| {
Error::configuration(format!("Failed to execute command '{}': {}", command, e))
})?;
return Ok(status.code().unwrap_or(1));
}
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.stdin(Stdio::inherit());
let mut child = cmd.spawn().map_err(|e| {
Error::configuration(format!("Failed to execute command '{}': {}", command, e))
})?;
let stdout = child
.stdout
.take()
.ok_or_else(|| Error::execution("stdout pipe not available"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| Error::execution("stderr pipe not available"))?;
let mut sorted_secrets: Vec<&str> = secrets.iter().map(String::as_str).collect();
sorted_secrets.sort_by_key(|s| std::cmp::Reverse(s.len()));
let sorted_secrets: Vec<String> = sorted_secrets.into_iter().map(String::from).collect();
let secrets_clone = sorted_secrets.clone();
let stdout_task = tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let mut redacted = line;
for secret in &secrets_clone {
if secret.len() >= 4 {
redacted = redacted.replace(secret, "[REDACTED]");
}
}
cuenv_events::emit_stdout!(&redacted);
}
});
let stderr_task = tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let mut redacted = line;
for secret in &sorted_secrets {
if secret.len() >= 4 {
redacted = redacted.replace(secret, "[REDACTED]");
}
}
cuenv_events::emit_stderr!(&redacted);
}
});
let status = child.wait().await.map_err(|e| {
Error::configuration(format!("Failed to wait for command '{}': {}", command, e))
})?;
let _ = stdout_task.await;
let _ = stderr_task.await;
Ok(status.code().unwrap_or(1))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tasks::TaskDependency;
use crate::tasks::cache::TaskCacheConfig;
use cuenv_cas::{LocalActionCache, LocalCas};
use cuenv_events::{CuenvEventLayer, EventCategory, TaskEvent};
use cuenv_vcs::WalkHasher;
use tempfile::TempDir;
use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt;
#[tokio::test]
async fn test_executor_config_default() {
let config = ExecutorConfig::default();
assert!(config.capture_output.should_capture());
assert_eq!(config.max_parallel, 0);
assert!(config.environment.is_empty());
}
#[tokio::test]
async fn test_task_result() {
let result = TaskResult {
name: "test".to_string(),
exit_code: Some(0),
stdout: "output".to_string(),
stderr: String::new(),
success: true,
};
assert_eq!(result.name, "test");
assert_eq!(result.exit_code, Some(0));
assert!(result.success);
assert_eq!(result.stdout, "output");
}
#[tokio::test]
async fn test_execute_simple_task() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task = Task {
command: "echo".to_string(),
args: vec!["hello".to_string()],
description: Some("Hello task".to_string()),
..Default::default()
};
let result = executor.execute_task("test", &task).await.unwrap();
assert!(result.success);
assert_eq!(result.exit_code, Some(0));
assert!(result.stdout.contains("hello"));
}
#[tokio::test]
async fn test_execute_with_environment() {
let mut config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
config
.environment
.set("TEST_VAR".to_string(), "test_value".to_string());
let executor = TaskExecutor::new(config);
let task = Task {
command: "printenv".to_string(),
args: vec!["TEST_VAR".to_string()],
description: Some("Print env task".to_string()),
..Default::default()
};
let result = executor.execute_task("test", &task).await.unwrap();
assert!(result.success);
assert!(result.stdout.contains("test_value"));
}
#[tokio::test]
async fn test_execute_failing_task() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task = Task {
command: "false".to_string(),
description: Some("Failing task".to_string()),
..Default::default()
};
let result = executor.execute_task("test", &task).await.unwrap();
assert!(!result.success);
assert_eq!(result.exit_code, Some(1));
}
#[tokio::test]
async fn test_execute_script_task() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task = Task {
script: Some("echo hello from script".to_string()),
..Default::default()
};
let result = executor.execute_task("script", &task).await.unwrap();
assert!(result.success);
assert_eq!(result.exit_code, Some(0));
assert!(result.stdout.contains("hello from script"));
}
#[tokio::test]
async fn test_execute_script_task_with_shell_options() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task = Task {
script: Some("false\necho should-not-run".to_string()),
script_shell: Some(super::super::ScriptShell::Bash),
shell_options: Some(super::super::ShellOptions::default()),
..Default::default()
};
let result = executor
.execute_task("script.failfast", &task)
.await
.unwrap();
assert!(!result.success);
assert_eq!(result.exit_code, Some(1));
assert!(!result.stdout.contains("should-not-run"));
}
#[tokio::test]
async fn test_execute_script_task_rejects_pipefail_for_sh() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task = Task {
script: Some("echo hello".to_string()),
script_shell: Some(super::super::ScriptShell::Sh),
shell_options: Some(super::super::ShellOptions::default()),
..Default::default()
};
let err = executor.execute_task("script.sh", &task).await.unwrap_err();
assert!(
err.to_string()
.contains("shellOptions.pipefail with unsupported script shell 'sh'"),
"unexpected error: {err}"
);
}
#[tokio::test]
async fn test_execute_script_task_rejects_unsupported_shell_options() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task = Task {
script: Some("console.log('hello')".to_string()),
script_shell: Some(super::super::ScriptShell::Node),
shell_options: Some(super::super::ShellOptions::default()),
..Default::default()
};
let err = executor
.execute_task("script.node", &task)
.await
.unwrap_err();
assert!(
err.to_string().contains("unsupported script shell 'node'"),
"unexpected error: {err}"
);
}
#[tokio::test]
async fn test_execute_sequential_group() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let task1 = Task {
command: "echo".to_string(),
args: vec!["first".to_string()],
description: Some("First task".to_string()),
..Default::default()
};
let task2 = Task {
command: "echo".to_string(),
args: vec!["second".to_string()],
description: Some("Second task".to_string()),
..Default::default()
};
let sequence = vec![
TaskNode::Task(Box::new(task1)),
TaskNode::Task(Box::new(task2)),
];
let all_tasks = Tasks::new();
let node = TaskNode::Sequence(sequence);
let results = executor
.execute_node("seq", &node, &all_tasks)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].stdout.contains("first"));
assert!(results[1].stdout.contains("second"));
}
#[tokio::test]
async fn test_command_injection_prevention() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let malicious_task = Task {
command: "echo".to_string(),
args: vec!["hello".to_string(), "; rm -rf /".to_string()],
description: Some("Malicious task test".to_string()),
..Default::default()
};
let result = executor
.execute_task("malicious", &malicious_task)
.await
.unwrap();
assert!(result.success);
assert!(result.stdout.contains("hello ; rm -rf /"));
}
#[tokio::test]
async fn test_special_characters_in_args() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
let executor = TaskExecutor::new(config);
let special_chars = vec![
"$USER",
"$(whoami)",
"`whoami`",
"&& echo hacked",
"|| echo failed",
"> /tmp/hack",
"| cat",
];
for special_arg in special_chars {
let task = Task {
command: "echo".to_string(),
args: vec!["safe".to_string(), special_arg.to_string()],
description: Some("Special character test".to_string()),
..Default::default()
};
let result = executor.execute_task("special", &task).await.unwrap();
assert!(result.success);
assert!(result.stdout.contains("safe"));
assert!(result.stdout.contains(special_arg));
}
}
#[tokio::test]
async fn test_environment_variable_safety() {
let mut config = ExecutorConfig {
capture_output: OutputCapture::Capture,
..Default::default()
};
config
.environment
.set("DANGEROUS_VAR".to_string(), "; rm -rf /".to_string());
let executor = TaskExecutor::new(config);
let task = Task {
command: "printenv".to_string(),
args: vec!["DANGEROUS_VAR".to_string()],
description: Some("Environment variable safety test".to_string()),
..Default::default()
};
let result = executor.execute_task("env_test", &task).await.unwrap();
assert!(result.success);
assert!(result.stdout.contains("; rm -rf /"));
}
#[tokio::test]
async fn test_execute_graph_parallel_groups() {
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
max_parallel: 2,
..Default::default()
};
let executor = TaskExecutor::new(config);
let mut graph = TaskGraph::new();
let t1 = Task {
command: "echo".into(),
args: vec!["A".into()],
..Default::default()
};
let t2 = Task {
command: "echo".into(),
args: vec!["B".into()],
..Default::default()
};
graph.add_task("t1", t1).unwrap();
graph.add_task("t2", t2).unwrap();
let results = executor.execute_graph(&graph).await.unwrap();
assert_eq!(results.len(), 2);
let joined = results.iter().map(|r| r.stdout.clone()).collect::<String>();
assert!(joined.contains("A") && joined.contains("B"));
}
#[tokio::test]
async fn test_execute_graph_respects_dependency_levels() {
let tmp = TempDir::new().unwrap();
let root = tmp.path();
let config = ExecutorConfig {
capture_output: OutputCapture::Capture,
max_parallel: 2,
project_root: root.to_path_buf(),
..Default::default()
};
let executor = TaskExecutor::new(config);
let mut tasks = Tasks::new();
tasks.tasks.insert(
"dep".into(),
TaskNode::Task(Box::new(Task {
command: "sh".into(),
args: vec!["-c".into(), "sleep 0.2 && echo ok > marker.txt".into()],
..Default::default()
})),
);
tasks.tasks.insert(
"consumer".into(),
TaskNode::Task(Box::new(Task {
command: "sh".into(),
args: vec!["-c".into(), "cat marker.txt".into()],
depends_on: vec![TaskDependency::from_name("dep")],
..Default::default()
})),
);
let mut graph = TaskGraph::new();
graph.build_for_task("consumer", &tasks).unwrap();
let results = executor.execute_graph(&graph).await.unwrap();
assert_eq!(results.len(), 2);
let consumer = results.iter().find(|r| r.name == "consumer").unwrap();
assert!(consumer.success);
assert!(consumer.stdout.contains("ok"));
}
/// Runs the same cache-enabled task twice and expects the second run to emit
/// a task output event carrying the task's stdout line.
///
/// The first run happens before any event subscriber is installed; only the
/// second run (after `out.txt` has been deleted) is observed through the
/// event channel.
#[tokio::test]
async fn test_cache_hit_replays_task_output_events() {
    let workspace = TempDir::new().unwrap();
    let cache_root = TempDir::new().unwrap();
    let project_dir = workspace.path();
    std::fs::write(project_dir.join("input.txt"), "v1").unwrap();

    let cache = TaskCacheConfig {
        cas: Arc::new(LocalCas::open(cache_root.path()).unwrap()),
        action_cache: Arc::new(LocalActionCache::open(cache_root.path()).unwrap()),
        vcs_hasher: Arc::new(WalkHasher::new(project_dir)),
        vcs_hasher_root: project_dir.to_path_buf(),
        cuenv_version: "test".to_string(),
        runtime_identity_properties: std::collections::BTreeMap::new(),
        cache_disabled_reason: None,
    };
    let executor = TaskExecutor::new(ExecutorConfig {
        capture_output: OutputCapture::Capture,
        project_root: project_dir.to_path_buf(),
        cache: Some(cache),
        ..Default::default()
    });

    // Prints one line to stdout and produces a declared output file.
    let script = "printf 'hello\\n' && cat input.txt > out.txt".to_string();
    let task = Task {
        command: "sh".to_string(),
        args: vec!["-c".to_string(), script],
        inputs: vec![super::super::Input::Path("input.txt".to_string())],
        outputs: vec!["out.txt".to_string()],
        cache: Some(super::super::TaskCachePolicy {
            mode: super::super::TaskCacheMode::ReadWrite,
            max_age: None,
        }),
        ..Task::default()
    };

    // First run: no subscriber yet, so its events are not collected.
    executor.execute_task("cached", &task).await.unwrap();
    std::fs::remove_file(project_dir.join("out.txt")).unwrap();

    // Capture events for the second run only; the guard keeps the subscriber
    // installed until the end of the test.
    let (tx, mut rx) = mpsc::unbounded_channel();
    let subscriber = tracing_subscriber::registry().with(CuenvEventLayer::new(tx));
    let _guard = tracing::subscriber::set_default(subscriber);

    let second_run = executor.execute_task("cached", &task).await.unwrap();
    assert!(second_run.success);

    // Drain whatever is queued and stop at the first matching output event.
    let saw_output = std::iter::from_fn(|| rx.try_recv().ok()).any(|event| {
        matches!(
            event.category,
            EventCategory::Task(TaskEvent::Output { name, content, .. })
                if name == "cached" && content == "hello"
        )
    });
    assert!(
        saw_output,
        "expected cached task output event to be replayed"
    );
}
/// The failure summary should mention the task name, the numeric exit code,
/// and the captured stderr text.
#[test]
fn test_summarize_task_failure_with_exit_code() {
    let failed = TaskResult {
        name: String::from("build"),
        exit_code: Some(127),
        stdout: String::new(),
        stderr: String::from("command not found"),
        success: false,
    };

    let report = summarize_task_failure(&failed, 10);

    for expected in ["build", "127", "command not found"] {
        assert!(report.contains(expected));
    }
}
/// With no exit code recorded, the summary should still name the task and
/// contain the word "unknown".
#[test]
fn test_summarize_task_failure_no_exit_code() {
    let failed = TaskResult {
        name: String::from("killed"),
        exit_code: None,
        stdout: String::new(),
        stderr: String::new(),
        success: false,
    };

    let report = summarize_task_failure(&failed, 10);

    for expected in ["killed", "unknown"] {
        assert!(report.contains(expected));
    }
}
/// When both streams are empty, the summary should say so and point the user
/// at `RUST_LOG=debug`.
#[test]
fn test_summarize_task_failure_no_output() {
    let failed = TaskResult {
        name: String::from("silent"),
        exit_code: Some(1),
        stdout: String::new(),
        stderr: String::new(),
        success: false,
    };

    let report = summarize_task_failure(&failed, 10);

    for expected in ["No stdout/stderr", "RUST_LOG=debug"] {
        assert!(report.contains(expected));
    }
}
/// Feeds 50 lines of stdout with a 10-line limit and expects only the tail to
/// be kept, together with a "last 10 of 50 lines" marker.
#[test]
fn test_summarize_task_failure_truncates_long_output() {
    let fifty_lines: Vec<String> = (1..=50).map(|n| format!("line {}", n)).collect();
    let failed = TaskResult {
        name: String::from("verbose"),
        exit_code: Some(1),
        stdout: fifty_lines.join("\n"),
        stderr: String::new(),
        success: false,
    };

    let report = summarize_task_failure(&failed, 10);

    assert!(report.contains("last 10 of 50 lines"));
    assert!(report.contains("line 50"));
    // The trailing newline anchors the match to the exact first line rather
    // than any line whose number merely starts with 1.
    assert!(!report.contains("line 1\n"));
}
/// Empty and whitespace-only stream content should produce no summary.
#[test]
fn test_summarize_stream_empty() {
    for blank in ["", " ", "\n\n"] {
        assert!(summarize_stream("test", blank, 10).is_none());
    }
}
/// Content shorter than the line limit should be shown under its label
/// without the word "last" appearing.
#[test]
fn test_summarize_stream_short() {
    let rendered = summarize_stream("stdout", "line 1\nline 2", 10).unwrap();

    for expected in ["stdout:", "line 1", "line 2"] {
        assert!(rendered.contains(expected));
    }
    assert!(!rendered.contains("last"));
}
#[test]
fn test_format_failure_streams_both() {
let result = TaskResult {
name: "test".to_string(),
exit_code: Some(1),
stdout: "stdout content".to_string(),
stderr: "stderr content".to_string(),
success: false,
};
let formatted = format_failure_streams(&result, 10);
assert!(formatted.contains("stdout:"));
assert!(formatted.contains("stderr:"));
assert!(formatted.contains("stdout content"));
assert!(formatted.contains("stderr content"));
}
/// Starting from a nested package directory, the npm workspace root should be
/// the directory whose `package.json` declares `workspaces`.
#[test]
fn test_find_workspace_root_with_npm() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("package.json");
    std::fs::write(manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let member_dir = workspace.path().join("packages").join("subpkg");
    std::fs::create_dir_all(&member_dir).unwrap();

    // The expectation is compared in canonical form.
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(find_workspace_root(PackageManager::Npm, &member_dir), expected);
}
/// Starting from a nested package directory, the pnpm workspace root should be
/// the directory containing `pnpm-workspace.yaml`.
#[test]
fn test_find_workspace_root_with_pnpm() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("pnpm-workspace.yaml");
    std::fs::write(manifest, "packages:\n - 'packages/*'").unwrap();

    let member_dir = workspace.path().join("packages").join("app");
    std::fs::create_dir_all(&member_dir).unwrap();

    // The expectation is compared in canonical form.
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(find_workspace_root(PackageManager::Pnpm, &member_dir), expected);
}
/// Starting from a nested crate directory, the Cargo workspace root should be
/// the directory whose `Cargo.toml` has a `[workspace]` table.
#[test]
fn test_find_workspace_root_with_cargo() {
    let workspace = TempDir::new().unwrap();
    let manifest = workspace.path().join("Cargo.toml");
    std::fs::write(manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let member_dir = workspace.path().join("crates").join("core");
    std::fs::create_dir_all(&member_dir).unwrap();

    // The expectation is compared in canonical form.
    let expected = workspace.path().canonicalize().unwrap();
    assert_eq!(find_workspace_root(PackageManager::Cargo, &member_dir), expected);
}
/// With no workspace manifest in the starting directory, the lookup is
/// expected to hand back the starting path itself (not canonicalized).
#[test]
fn test_find_workspace_root_no_workspace() {
    let scratch = TempDir::new().unwrap();
    let start = scratch.path();

    let found = find_workspace_root(PackageManager::Npm, start);

    assert_eq!(found, start.to_path_buf());
}
/// A `package.json` whose `workspaces` is a non-empty array counts as a workspace.
#[test]
fn test_package_json_has_workspaces_array() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": ["packages/*"]}"#).unwrap();

    let detected = package_json_has_workspaces(dir.path());
    assert!(detected);
}
/// A `package.json` whose `workspaces` is an object with a `packages` list
/// counts as a workspace.
#[test]
fn test_package_json_has_workspaces_object() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": {"packages": ["packages/*"]}}"#).unwrap();

    let detected = package_json_has_workspaces(dir.path());
    assert!(detected);
}
/// A `package.json` without a `workspaces` key is not a workspace.
#[test]
fn test_package_json_no_workspaces() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("package.json");
    std::fs::write(&manifest, r#"{"name": "test"}"#).unwrap();

    let detected = package_json_has_workspaces(dir.path());
    assert!(!detected);
}
/// An empty `workspaces` array is treated the same as having no workspaces.
#[test]
fn test_package_json_empty_workspaces() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("package.json");
    std::fs::write(&manifest, r#"{"workspaces": []}"#).unwrap();

    let detected = package_json_has_workspaces(dir.path());
    assert!(!detected);
}
/// A directory with no `package.json` at all is not a workspace.
#[test]
fn test_package_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = package_json_has_workspaces(empty_dir.path());
    assert!(!detected);
}
/// A `Cargo.toml` containing a `[workspace]` table is detected as a workspace.
#[test]
fn test_cargo_toml_has_workspace() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("Cargo.toml");
    std::fs::write(&manifest, "[workspace]\nmembers = [\"crates/*\"]").unwrap();

    let detected = cargo_toml_has_workspace(dir.path());
    assert!(detected);
}
/// A `Cargo.toml` with only a `[package]` table is not a workspace.
#[test]
fn test_cargo_toml_no_workspace() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("Cargo.toml");
    std::fs::write(&manifest, "[package]\nname = \"test\"").unwrap();

    let detected = cargo_toml_has_workspace(dir.path());
    assert!(!detected);
}
/// A directory with no `Cargo.toml` at all is not a workspace.
#[test]
fn test_cargo_toml_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = cargo_toml_has_workspace(empty_dir.path());
    assert!(!detected);
}
/// A `deno.json` whose `workspace` is a non-empty array counts as a workspace.
#[test]
fn test_deno_json_has_workspace_array() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("deno.json");
    std::fs::write(&manifest, r#"{"workspace": ["./packages/*"]}"#).unwrap();

    let detected = deno_json_has_workspace(dir.path());
    assert!(detected);
}
/// A `deno.json` whose `workspace` is an object with a `members` list counts
/// as a workspace.
#[test]
fn test_deno_json_has_workspace_object() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("deno.json");
    std::fs::write(&manifest, r#"{"workspace": {"members": ["./packages/*"]}}"#).unwrap();

    let detected = deno_json_has_workspace(dir.path());
    assert!(detected);
}
/// A `deno.json` without a `workspace` key is not a workspace.
#[test]
fn test_deno_json_no_workspace() {
    let dir = TempDir::new().unwrap();
    let manifest = dir.path().join("deno.json");
    std::fs::write(&manifest, r#"{"name": "test"}"#).unwrap();

    let detected = deno_json_has_workspace(dir.path());
    assert!(!detected);
}
/// A directory with no `deno.json` at all is not a workspace.
#[test]
fn test_deno_json_missing() {
    let empty_dir = TempDir::new().unwrap();

    let detected = deno_json_has_workspace(empty_dir.path());
    assert!(!detected);
}
/// Builds an `ExecutorConfig` with most fields set explicitly and checks that
/// every one of them reads back as written, while the fields left to
/// `Default::default()` stay unset.
#[test]
fn test_executor_config_with_fields() {
    let config = ExecutorConfig {
        capture_output: OutputCapture::Capture,
        max_parallel: 4,
        working_dir: Some(PathBuf::from("/tmp")),
        project_root: PathBuf::from("/project"),
        cue_module_root: Some(PathBuf::from("/project/cue.mod")),
        materialize_outputs: Some(PathBuf::from("/outputs")),
        cache_dir: Some(PathBuf::from("/cache")),
        show_cache_path: true,
        cli_backend: Some("host".to_string()),
        ..Default::default()
    };

    // Explicitly set fields.
    assert!(config.capture_output.should_capture());
    assert_eq!(config.max_parallel, 4);
    assert_eq!(config.working_dir, Some(PathBuf::from("/tmp")));
    assert_eq!(config.project_root, PathBuf::from("/project"));
    assert_eq!(
        config.cue_module_root,
        Some(PathBuf::from("/project/cue.mod"))
    );
    assert_eq!(config.materialize_outputs, Some(PathBuf::from("/outputs")));
    assert_eq!(config.cache_dir, Some(PathBuf::from("/cache")));
    assert!(config.show_cache_path);
    assert_eq!(config.cli_backend.as_deref(), Some("host"));

    // Fields filled in by the struct-update syntax must keep their defaults.
    assert!(config.backend_config.is_none());
    assert!(config.cache.is_none());
}
/// Cloning a `TaskResult` should copy every field unchanged.
#[test]
fn test_task_result_clone() {
    let original = TaskResult {
        name: String::from("test"),
        exit_code: Some(0),
        stdout: String::from("output"),
        stderr: String::from("error"),
        success: true,
    };

    // Destructure the clone so each field is compared against its source.
    let TaskResult {
        name,
        exit_code,
        stdout,
        stderr,
        success,
    } = original.clone();

    assert_eq!(name, original.name);
    assert_eq!(exit_code, original.exit_code);
    assert_eq!(stdout, original.stdout);
    assert_eq!(stderr, original.stderr);
    assert_eq!(success, original.success);
}
/// Pins the value of `TASK_FAILURE_SNIPPET_LINES` so a change to it has to be
/// made deliberately.
#[test]
fn test_task_failure_snippet_lines_constant() {
    let snippet_lines = TASK_FAILURE_SNIPPET_LINES;
    assert_eq!(snippet_lines, 20);
}
}