use crate::state::{HookExecutionState, StateManager, compute_instance_hash};
use crate::types::{ExecutionStatus, Hook, HookExecutionConfig, HookResult};
use crate::{Error, Result};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::{Duration, Instant};
use tokio::process::Command;
use tokio::time::timeout;
use tracing::{debug, error, info, warn};
/// Runs hooks and tracks their execution state.
///
/// Pairs the execution settings in `config` with the [`StateManager`] used to
/// persist and look up [`HookExecutionState`] records.
#[derive(Debug)]
pub struct HookExecutor {
// Execution settings: default timeout, fail-fast flag, optional state directory.
config: HookExecutionConfig,
// Saves, loads and lists hook execution state.
state_manager: StateManager,
}
impl HookExecutor {
/// Builds an executor from `config`.
///
/// State is kept in `config.state_dir` when it is set; otherwise the
/// directory returned by [`StateManager::default_state_dir`] is used.
///
/// # Errors
///
/// Propagates the error from [`StateManager::default_state_dir`] when no
/// explicit state directory is configured and the default cannot be resolved.
pub fn new(config: HookExecutionConfig) -> Result<Self> {
    let state_dir = match config.state_dir.clone() {
        Some(explicit) => explicit,
        None => StateManager::default_state_dir()?,
    };
    Ok(Self {
        state_manager: StateManager::new(state_dir),
        config,
    })
}
/// Builds an executor from [`HookExecutionConfig::default`].
///
/// When the `CUENV_STATE_DIR` environment variable is set (and valid
/// Unicode), it overrides the state directory of the default configuration.
///
/// # Errors
///
/// Returns whatever [`HookExecutor::new`] returns for the resulting config.
pub fn with_default_config() -> Result<Self> {
    let mut config = HookExecutionConfig::default();
    let override_dir = std::env::var("CUENV_STATE_DIR").ok().map(PathBuf::from);
    if override_dir.is_some() {
        config.state_dir = override_dir;
    }
    Self::new(config)
}
pub async fn execute_hooks_background(
&self,
directory_path: PathBuf,
config_hash: String,
hooks: Vec<Hook>,
) -> Result<String> {
use std::process::{Command, Stdio};
if hooks.is_empty() {
return Ok("No hooks to execute".to_string());
}
let instance_hash = compute_instance_hash(&directory_path, &config_hash);
let total_hooks = hooks.len();
let previous_env =
if let Ok(Some(existing_state)) = self.state_manager.load_state(&instance_hash).await {
if existing_state.status == ExecutionStatus::Completed {
Some(existing_state.environment_vars.clone())
} else {
existing_state.previous_env
}
} else {
None
};
let mut state = HookExecutionState::new(
directory_path.clone(),
instance_hash.clone(),
config_hash.clone(),
hooks.clone(),
);
state.previous_env = previous_env;
self.state_manager.save_state(&state).await?;
self.state_manager
.create_directory_marker(&directory_path, &instance_hash)
.await?;
info!(
"Starting background execution of {} hooks for directory: {}",
total_hooks,
directory_path.display()
);
let pid_file = self
.state_manager
.get_state_file_path(&instance_hash)
.with_extension("pid");
if pid_file.exists() {
if let Ok(pid_str) = std::fs::read_to_string(&pid_file)
&& let Ok(pid) = pid_str.trim().parse::<usize>()
{
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
let mut system = System::new();
let process_pid = Pid::from(pid);
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[process_pid]),
false,
ProcessRefreshKind::nothing(),
);
if system.process(process_pid).is_some() {
info!("Supervisor already running for directory with PID {}", pid);
return Ok(format!(
"Supervisor already running for {} hooks (PID: {})",
total_hooks, pid
));
}
}
std::fs::remove_file(&pid_file).ok();
}
let state_dir = self.state_manager.get_state_dir();
let hooks_file = state_dir.join(format!("{}_hooks.json", instance_hash));
let config_file = state_dir.join(format!("{}_config.json", instance_hash));
let hooks_json = serde_json::to_string(&hooks)
.map_err(|e| Error::serialization(format!("Failed to serialize hooks: {}", e)))?;
std::fs::write(&hooks_file, &hooks_json).map_err(|e| Error::Io {
source: e,
path: Some(hooks_file.clone().into_boxed_path()),
operation: "write".to_string(),
})?;
let config_json = serde_json::to_string(&self.config)
.map_err(|e| Error::serialization(format!("Failed to serialize config: {}", e)))?;
std::fs::write(&config_file, &config_json).map_err(|e| Error::Io {
source: e,
path: Some(config_file.clone().into_boxed_path()),
operation: "write".to_string(),
})?;
let current_exe = if let Ok(exe_path) = std::env::var("CUENV_EXECUTABLE") {
PathBuf::from(exe_path)
} else {
std::env::current_exe()
.map_err(|e| Error::process(format!("Failed to get current exe: {}", e)))?
};
let mut cmd = Command::new(¤t_exe);
cmd.arg("__hook-supervisor") .arg("--directory")
.arg(directory_path.to_string_lossy().to_string())
.arg("--instance-hash")
.arg(&instance_hash)
.arg("--config-hash")
.arg(&config_hash)
.arg("--hooks-file")
.arg(hooks_file.to_string_lossy().to_string())
.arg("--config-file")
.arg(config_file.to_string_lossy().to_string())
.stdin(Stdio::null());
let temp_dir = std::env::temp_dir();
let log_file = std::fs::File::create(temp_dir.join("cuenv_supervisor.log")).ok();
let err_file = std::fs::File::create(temp_dir.join("cuenv_supervisor_err.log")).ok();
if let Some(log) = log_file {
cmd.stdout(Stdio::from(log));
} else {
cmd.stdout(Stdio::null());
}
if let Some(err) = err_file {
cmd.stderr(Stdio::from(err));
} else {
cmd.stderr(Stdio::null());
}
if let Ok(state_dir) = std::env::var("CUENV_STATE_DIR") {
cmd.env("CUENV_STATE_DIR", state_dir);
}
if let Ok(approval_file) = std::env::var("CUENV_APPROVAL_FILE") {
cmd.env("CUENV_APPROVAL_FILE", approval_file);
}
if let Ok(rust_log) = std::env::var("RUST_LOG") {
cmd.env("RUST_LOG", rust_log);
}
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
const DETACHED_PROCESS: u32 = 0x00000008;
const CREATE_NEW_PROCESS_GROUP: u32 = 0x00000200;
cmd.creation_flags(DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP);
}
let _child = cmd
.spawn()
.map_err(|e| Error::process(format!("Failed to spawn supervisor: {}", e)))?;
info!("Spawned supervisor process for hook execution");
Ok(format!(
"Started execution of {} hooks in background",
total_hooks
))
}
/// Looks up the first active state whose directory equals `directory_path`.
///
/// Returns `Ok(None)` when no listed state matches.
///
/// # Errors
///
/// Propagates the error from listing active states.
pub async fn get_execution_status(
    &self,
    directory_path: &Path,
) -> Result<Option<HookExecutionState>> {
    let active = self.state_manager.list_active_states().await?;
    Ok(active
        .into_iter()
        .find(|candidate| candidate.directory_path == directory_path))
}
/// Loads the state stored for the instance derived from `directory_path`
/// and `config_hash`.
///
/// # Errors
///
/// Propagates the error from loading the state.
pub async fn get_execution_status_for_instance(
    &self,
    directory_path: &Path,
    config_hash: &str,
) -> Result<Option<HookExecutionState>> {
    let key = compute_instance_hash(directory_path, config_hash);
    let loaded = self.state_manager.load_state(&key).await?;
    Ok(loaded)
}
/// Marker-based status lookup for `directory_path`.
///
/// Returns `Ok(None)` without loading any state when the directory has no
/// active marker or the marker yields no instance hash. Otherwise the state
/// for the marker's instance is loaded; if it is missing, or it is complete
/// and should no longer be displayed, the marker is removed (best effort) and
/// `Ok(None)` is returned.
///
/// # Errors
///
/// Propagates the error from loading the state.
pub async fn get_fast_status(
    &self,
    directory_path: &Path,
) -> Result<Option<HookExecutionState>> {
    if !self.state_manager.has_active_marker(directory_path) {
        return Ok(None);
    }
    let Some(instance_hash) = self
        .state_manager
        .get_marker_instance_hash(directory_path)
        .await
    else {
        return Ok(None);
    };

    let loaded = self.state_manager.load_state(&instance_hash).await?;
    let still_relevant = loaded
        .as_ref()
        .is_some_and(|s| !s.is_complete() || s.should_display_completed());
    if still_relevant {
        return Ok(loaded);
    }

    // Nothing left to show for this directory: drop its marker, ignoring failures.
    self.state_manager
        .remove_directory_marker(directory_path)
        .await
        .ok();
    Ok(None)
}
/// Returns a shared reference to the executor's [`StateManager`].
#[must_use]
pub fn state_manager(&self) -> &StateManager {
&self.state_manager
}
/// Synchronous counterpart of the marker-based status lookup.
///
/// Returns `Ok(None)` when the directory has no active marker, when the
/// marker yields no instance hash, when no state is stored for that instance,
/// or when the stored state is complete and should no longer be displayed.
/// Unlike the async variant, this function does not remove the marker.
///
/// # Errors
///
/// Propagates the error from loading the state.
pub fn get_fast_status_sync(
    &self,
    directory_path: &Path,
) -> Result<Option<HookExecutionState>> {
    if !self.state_manager.has_active_marker(directory_path) {
        return Ok(None);
    }
    let Some(instance_hash) = self
        .state_manager
        .get_marker_instance_hash_sync(directory_path)
    else {
        return Ok(None);
    };
    let loaded = self.state_manager.load_state_sync(&instance_hash)?;
    Ok(loaded.filter(|s| !s.is_complete() || s.should_display_completed()))
}
pub async fn wait_for_completion(
&self,
directory_path: &Path,
config_hash: &str,
timeout_seconds: Option<u64>,
) -> Result<HookExecutionState> {
let instance_hash = compute_instance_hash(directory_path, config_hash);
let poll_interval = Duration::from_millis(500);
let start_time = Instant::now();
loop {
if let Some(state) = self.state_manager.load_state(&instance_hash).await? {
if state.is_complete() {
return Ok(state);
}
} else {
return Err(Error::state_not_found(&instance_hash));
}
if let Some(timeout) = timeout_seconds
&& start_time.elapsed().as_secs() >= timeout
{
return Err(Error::Timeout { seconds: timeout });
}
tokio::time::sleep(poll_interval).await;
}
}
/// Cancels the execution for the instance derived from `directory_path` and
/// `config_hash`.
///
/// If the instance has a readable PID file, the named process is sent
/// `SIGTERM` (when it still exists) and the PID file is removed. Afterwards
/// the stored state, if present and not yet complete, is marked as cancelled
/// with `reason` and saved.
///
/// Returns `Ok(true)` when a state was marked as cancelled, `Ok(false)` when
/// there was no state or it was already complete.
///
/// # Errors
///
/// Propagates errors from loading or saving the state.
pub async fn cancel_execution(
    &self,
    directory_path: &Path,
    config_hash: &str,
    reason: Option<String>,
) -> Result<bool> {
    let instance_hash = compute_instance_hash(directory_path, config_hash);

    let pid_file = self
        .state_manager
        .get_state_file_path(&instance_hash)
        .with_extension("pid");
    if pid_file.exists()
        && let Ok(pid_str) = std::fs::read_to_string(&pid_file)
        && let Ok(pid) = pid_str.trim().parse::<usize>()
    {
        use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, Signal, System};

        let mut system = System::new();
        let process_pid = Pid::from(pid);
        system.refresh_processes_specifics(
            ProcessesToUpdate::Some(&[process_pid]),
            false,
            ProcessRefreshKind::nothing(),
        );

        match system.process(process_pid) {
            // `kill_with` yields `None` for an unsupported signal and
            // `Some(sent)` otherwise; only `Some(true)` means it was delivered.
            Some(process) => match process.kill_with(Signal::Term) {
                Some(true) => info!("Sent SIGTERM to supervisor process PID {}", pid),
                Some(false) => {
                    warn!("Failed to send SIGTERM to supervisor process PID {}", pid);
                }
                None => warn!(
                    "SIGTERM is not supported on this platform; supervisor process PID {} was not signalled",
                    pid
                ),
            },
            None => info!(
                "Supervisor process PID {} not found (may have already exited)",
                pid
            ),
        }
        // Best effort: the PID file is removed whether or not the signal was sent.
        std::fs::remove_file(&pid_file).ok();
    }

    if let Some(mut state) = self.state_manager.load_state(&instance_hash).await?
        && !state.is_complete()
    {
        state.mark_cancelled(reason);
        self.state_manager.save_state(&state).await?;
        info!(
            "Cancelled execution for directory: {}",
            directory_path.display()
        );
        return Ok(true);
    }
    Ok(false)
}
/// Removes completed states that finished more than `older_than` ago.
///
/// Only states that are complete and carry a `finished_at` timestamp earlier
/// than `now - older_than` are removed. Returns the number of removed states.
///
/// # Errors
///
/// Propagates errors from listing or removing states; removal stops at the
/// first failure.
pub async fn cleanup_old_states(&self, older_than: chrono::Duration) -> Result<usize> {
    let candidates = self.state_manager.list_active_states().await?;
    let cutoff = chrono::Utc::now() - older_than;

    let mut cleaned_count = 0;
    for state in candidates {
        let expired =
            state.is_complete() && state.finished_at.is_some_and(|finished| finished < cutoff);
        if !expired {
            continue;
        }
        self.state_manager
            .remove_state(&state.instance_hash)
            .await?;
        cleaned_count += 1;
    }

    if cleaned_count > 0 {
        info!("Cleaned up {} old execution states", cleaned_count);
    }
    Ok(cleaned_count)
}
/// Runs one hook in the foreground using the configured default timeout.
///
/// # Errors
///
/// Propagates the error from `execute_hook_with_timeout`.
pub async fn execute_single_hook(&self, hook: Hook) -> Result<HookResult> {
    execute_hook_with_timeout(hook, &self.config.default_timeout_seconds).await
}
}
/// Runs `hooks` one after another, recording progress in `state`.
///
/// Before each hook the persisted state is re-read; if it has been marked
/// [`ExecutionStatus::Cancelled`], execution stops, `state` is replaced by the
/// persisted record and nothing further is saved, so the cancellation is not
/// overwritten. Each hook runs with `config.default_timeout_seconds`. For
/// hooks with `source == Some(true)` and non-empty stdout, that stdout is
/// evaluated as a shell script and the resulting environment changes are
/// merged into `state.environment_vars`. The state is saved after every hook
/// and once more at the end.
///
/// With `config.fail_fast`, the first failing hook stops the run. If the
/// status is still `Running` after the loop, it is set to `Completed` and
/// `finished_at` is stamped.
///
/// # Errors
///
/// Propagates errors from saving the state.
pub async fn execute_hooks(
    hooks: Vec<Hook>,
    _directory_path: &Path,
    config: &HookExecutionConfig,
    state_manager: &StateManager,
    state: &mut HookExecutionState,
) -> Result<()> {
    let hook_count = hooks.len();
    debug!("execute_hooks called with {} hooks", hook_count);
    if hook_count == 0 {
        debug!("No hooks to execute");
        return Ok(());
    }

    debug!("Starting to iterate over {} hooks", hook_count);
    for (index, hook) in hooks.into_iter().enumerate() {
        debug!(
            "Processing hook {}/{}: command={}",
            index + 1,
            state.total_hooks,
            hook.command
        );

        debug!("Checking if execution was cancelled");
        let persisted = state_manager.load_state(&state.instance_hash).await;
        if let Ok(Some(current_state)) = persisted {
            debug!("Loaded state: status = {:?}", current_state.status);
            if current_state.status == ExecutionStatus::Cancelled {
                debug!("Execution was cancelled, stopping");
                // Adopt the persisted (cancelled) record and return without
                // saving: falling through would see the in-memory status still
                // `Running`, mark the run `Completed` and overwrite the
                // cancellation on disk.
                *state = current_state;
                return Ok(());
            }
        }

        let timeout_seconds = config.default_timeout_seconds;
        state.mark_hook_running(index);

        match execute_hook_with_timeout(hook.clone(), &timeout_seconds).await {
            Ok(hook_result) => {
                if hook.source.unwrap_or(false) {
                    if hook_result.stdout.is_empty() {
                        warn!(
                            "Source hook produced empty stdout. Stderr content:\n{}",
                            hook_result.stderr
                        );
                    } else {
                        debug!(
                            "Evaluating source hook output for environment variables (success={})",
                            hook_result.success
                        );
                        match evaluate_shell_environment(
                            &hook_result.stdout,
                            &state.environment_vars,
                        )
                        .await
                        {
                            Ok((env_vars, removed_keys)) => {
                                debug!(
                                    "Captured {} environment variables from source hook ({} removed)",
                                    env_vars.len(),
                                    removed_keys.len()
                                );
                                // Apply additions/changes first, then removals.
                                state.environment_vars.extend(env_vars);
                                for key in &removed_keys {
                                    state.environment_vars.remove(key);
                                }
                            }
                            Err(e) => {
                                warn!("Failed to evaluate source hook output: {}", e);
                            }
                        }
                    }
                }

                // Read the flag before the result is moved into the state.
                let hook_failed = !hook_result.success;
                state.record_hook_result(index, hook_result);
                if hook_failed && config.fail_fast {
                    warn!(
                        "Hook {} failed and fail_fast is enabled, stopping",
                        index + 1
                    );
                    break;
                }
            }
            Err(e) => {
                let error_msg = format!("Hook execution error: {}", e);
                state.record_hook_result(
                    index,
                    HookResult::failure(
                        hook,
                        None,
                        String::new(),
                        error_msg.clone(),
                        0,
                        error_msg,
                    ),
                );
                if config.fail_fast {
                    warn!("Hook {} failed with error, stopping", index + 1);
                    break;
                }
            }
        }

        state_manager.save_state(state).await?;
    }

    // Still `Running` here means nothing recorded a different terminal status.
    if state.status == ExecutionStatus::Running {
        state.status = ExecutionStatus::Completed;
        state.finished_at = Some(chrono::Utc::now());
        info!(
            "All hooks completed successfully for directory: {}",
            state.directory_path.display()
        );
    }
    state_manager.save_state(state).await?;
    Ok(())
}
/// Runs a source hook and returns `prior_env` with the hook's environment
/// changes applied.
///
/// The hook's stdout is evaluated as a shell script; variables it adds or
/// changes are inserted into a copy of `prior_env`, then variables it removed
/// are deleted from that copy.
///
/// # Errors
///
/// Returns a configuration error when `hook.source` is not `Some(true)`, and
/// propagates errors from running the hook or evaluating its output.
pub async fn capture_source_environment(
    hook: Hook,
    prior_env: &HashMap<String, String>,
    timeout_seconds: u64,
) -> Result<HashMap<String, String>> {
    if hook.source != Some(true) {
        return Err(Error::configuration(
            "capture_source_environment requires a source hook",
        ));
    }

    let outcome = execute_hook_with_timeout(hook, &timeout_seconds).await?;
    let (changed, removed) = evaluate_shell_environment(&outcome.stdout, prior_env).await?;

    let mut environment = prior_env.clone();
    environment.extend(changed);
    removed.iter().for_each(|key| {
        environment.remove(key);
    });
    Ok(environment)
}
/// Locates the `env` executable.
///
/// Returns the first `<dir>/env` that is a file, searching the directories of
/// `PATH` in order, or `"/usr/bin/env"` when none is found.
fn find_env_command() -> String {
    let search_path = std::env::var_os("PATH").unwrap_or_default();
    std::env::split_paths(&search_path)
        .map(|dir| dir.join("env"))
        .find(|candidate| candidate.is_file())
        .map_or_else(
            || "/usr/bin/env".to_string(),
            |found| found.to_string_lossy().into_owned(),
        )
}
async fn detect_shell() -> String {
if is_shell_capable("bash").await {
return "bash".to_string();
}
if is_shell_capable("zsh").await {
return "zsh".to_string();
}
"sh".to_string()
}
/// Reports whether `shell` can be launched and successfully runs a probe
/// script that uses the `;&` terminator inside a `case` statement.
///
/// Returns `false` when the shell cannot be executed or exits unsuccessfully.
async fn is_shell_capable(shell: &str) -> bool {
    let probe = Command::new(shell)
        .args(["-c", "case x in x) true ;& y) true ;; esac"])
        .output()
        .await;
    probe.is_ok_and(|output| output.status.success())
}
async fn evaluate_shell_environment(
shell_script: &str,
prior_env: &HashMap<String, String>,
) -> Result<(HashMap<String, String>, Vec<String>)> {
const DELIMITER: &str = "__CUENV_ENV_START__";
debug!(
"Evaluating shell script to extract environment ({} bytes)",
shell_script.len()
);
tracing::trace!("Raw shell script from hook:\n{}", shell_script);
let mut shell = detect_shell().await;
for line in shell_script.lines() {
if let Some(path) = line.strip_prefix("BASH='")
&& let Some(end) = path.find('\'')
{
let bash_path = &path[..end];
let path = PathBuf::from(bash_path);
if path.exists() {
debug!("Detected Nix bash in script: {}", bash_path);
shell = bash_path.to_string();
break;
}
}
}
debug!("Using shell: {}", shell);
let env_cmd = find_env_command();
let mut cmd_before = Command::new(&shell);
cmd_before.arg("-c");
cmd_before.arg(format!("{env_cmd} -0"));
cmd_before.stdout(Stdio::piped());
cmd_before.stderr(Stdio::piped());
for (key, value) in prior_env {
cmd_before.env(key, value);
}
let output_before = cmd_before
.output()
.await
.map_err(|e| Error::configuration(format!("Failed to get initial environment: {}", e)))?;
let env_before_output = String::from_utf8_lossy(&output_before.stdout);
let mut env_before = HashMap::new();
for line in env_before_output.split('\0') {
if let Some((key, value)) = line.split_once('=') {
env_before.insert(key.to_string(), value.to_string());
}
}
let filtered_lines: Vec<&str> = shell_script
.lines()
.filter(|line| {
let trimmed = line.trim();
if trimmed.is_empty() {
return false;
}
if trimmed.starts_with("✓")
|| trimmed.starts_with("sh:")
|| trimmed.starts_with("bash:")
{
return false;
}
true
})
.collect();
let filtered_script = filtered_lines.join("\n");
tracing::trace!("Filtered shell script:\n{}", filtered_script);
let mut cmd = Command::new(shell);
cmd.arg("-c");
let script = format!(
"{}\necho -ne '\\0{}\\0'; {env_cmd} -0",
filtered_script, DELIMITER
);
cmd.arg(script);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
for (key, value) in prior_env {
cmd.env(key, value);
}
let output = cmd.output().await.map_err(|e| {
Error::configuration(format!("Failed to evaluate shell environment: {}", e))
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
warn!(
"Shell script evaluation finished with error (exit code {:?}): {}",
output.status.code(),
stderr
);
}
let stdout_bytes = &output.stdout;
let delimiter_bytes = format!("\0{}\0", DELIMITER).into_bytes();
let env_start_index = stdout_bytes
.windows(delimiter_bytes.len())
.position(|window| window == delimiter_bytes);
let env_output_bytes = if let Some(idx) = env_start_index {
&stdout_bytes[idx + delimiter_bytes.len()..]
} else {
debug!("Environment delimiter not found in hook output");
let len = stdout_bytes.len();
let start = len.saturating_sub(1000);
let tail = String::from_utf8_lossy(&stdout_bytes[start..]);
warn!(
"Delimiter missing. Tail of stdout (last 1000 bytes):\n{}",
tail
);
&[]
};
let env_output = String::from_utf8_lossy(env_output_bytes);
let mut env_delta = HashMap::new();
let mut post_env_keys = std::collections::HashSet::new();
let is_skip_key = |key: &str| -> bool {
key.starts_with("BASH_FUNC_")
|| key == "PS1"
|| key == "PS2"
|| key == "_"
|| key == "PWD"
|| key == "OLDPWD"
|| key == "SHLVL"
|| key.starts_with("BASH")
};
for line in env_output.split('\0') {
if line.is_empty() {
continue;
}
if let Some((key, value)) = line.split_once('=') {
if is_skip_key(key) {
continue;
}
if !key.is_empty() {
post_env_keys.insert(key.to_string());
}
if !key.is_empty() && env_before.get(key) != Some(&value.to_string()) {
env_delta.insert(key.to_string(), value.to_string());
}
}
}
let removed_keys: Vec<String> = prior_env
.keys()
.filter(|key| !is_skip_key(key) && !post_env_keys.contains(key.as_str()))
.cloned()
.collect();
if env_delta.is_empty() && removed_keys.is_empty() && !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(Error::configuration(format!(
"Shell script evaluation failed and no environment captured. Error: {}",
stderr
)));
}
debug!(
"Evaluated shell script and extracted {} new/changed environment variables ({} removed)",
env_delta.len(),
removed_keys.len()
);
Ok((env_delta, removed_keys))
}
/// Runs `hook` as a child process, capturing its output, with a time limit.
///
/// The command runs in `hook.dir` when set. For source hooks
/// (`hook.source == Some(true)`) the `SHELL` variable is set to the detected
/// shell. Every outcome is reported as `Ok`:
/// * exit status success → [`HookResult::success`],
/// * non-zero exit → [`HookResult::failure`] with the captured output,
/// * failure to run the command → [`HookResult::failure`] with empty output,
/// * no completion within `timeout_seconds` → [`HookResult::timeout`]; the
///   child is killed when the pending output future is dropped.
async fn execute_hook_with_timeout(hook: Hook, timeout_seconds: &u64) -> Result<HookResult> {
    let start_time = Instant::now();
    let is_source = hook.source.unwrap_or(false);
    debug!(
        "Executing hook: {} {} (source: {})",
        hook.command,
        hook.args.join(" "),
        is_source
    );

    let mut cmd = Command::new(&hook.command);
    cmd.args(&hook.args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        // Without this, a hook that times out keeps running after its output
        // future is dropped.
        .kill_on_drop(true);
    if let Some(dir) = &hook.dir {
        cmd.current_dir(dir);
    }
    if is_source {
        cmd.env("SHELL", detect_shell().await);
    }

    let execution_result = timeout(Duration::from_secs(*timeout_seconds), cmd.output()).await;
    // Saturate instead of truncating if the elapsed time ever exceeds u64 ms.
    let duration_ms = u64::try_from(start_time.elapsed().as_millis()).unwrap_or(u64::MAX);

    match execution_result {
        Ok(Ok(output)) => {
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            if output.status.success() {
                debug!("Hook completed successfully in {}ms", duration_ms);
                Ok(HookResult::success(
                    hook,
                    output.status,
                    stdout,
                    stderr,
                    duration_ms,
                ))
            } else {
                warn!("Hook failed with exit code: {:?}", output.status.code());
                Ok(HookResult::failure(
                    hook,
                    Some(output.status),
                    stdout,
                    stderr,
                    duration_ms,
                    format!("Command exited with status: {}", output.status),
                ))
            }
        }
        Ok(Err(io_error)) => {
            error!("Failed to execute hook: {}", io_error);
            Ok(HookResult::failure(
                hook,
                None,
                String::new(),
                String::new(),
                duration_ms,
                format!("Failed to execute command: {}", io_error),
            ))
        }
        Err(_timeout_error) => {
            warn!("Hook timed out after {} seconds", timeout_seconds);
            Ok(HookResult::timeout(
                hook,
                String::new(),
                String::new(),
                *timeout_seconds,
            ))
        }
    }
}
#[cfg(test)]
#[expect(
clippy::print_stderr,
reason = "Tests may use eprintln! to report skip conditions"
)]
mod tests {
use super::*;
use crate::types::Hook;
use tempfile::TempDir;
/// Resolves the `cuenv` binary used by tests that spawn a supervisor.
///
/// Returns the path from `CUENV_EXECUTABLE` when that variable is set.
/// Otherwise looks for `target/debug/cuenv` two directories above
/// `CARGO_MANIFEST_DIR`; when it exists, `CUENV_EXECUTABLE` is set to it and
/// the path is returned. Returns `None` when no binary is found.
fn setup_cuenv_executable() -> Option<PathBuf> {
    // Read the variable once instead of checking `is_ok()` and re-reading it.
    if let Ok(configured) = std::env::var("CUENV_EXECUTABLE") {
        return Some(PathBuf::from(configured));
    }

    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let workspace_root = manifest_dir.parent()?.parent()?;
    let cuenv_binary = workspace_root.join("target/debug/cuenv");
    if !cuenv_binary.exists() {
        return None;
    }

    #[expect(
        unsafe_code,
        reason = "Test helper setting env var in controlled test environment"
    )]
    // SAFETY: `set_var` requires that no other thread accesses the environment
    // concurrently. NOTE(review): tests run on multiple threads by default —
    // confirm nothing else touches the environment while this runs.
    unsafe {
        std::env::set_var("CUENV_EXECUTABLE", &cuenv_binary);
    }
    Some(cuenv_binary)
}
/// An executor built from an explicit config keeps that config's timeout.
#[tokio::test]
async fn test_hook_executor_creation() {
    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 60,
        fail_fast: true,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();
    assert_eq!(executor.config.default_timeout_seconds, 60);
}
/// `echo hello` succeeds and its stdout is captured.
#[tokio::test]
async fn test_execute_single_hook_success() {
    let executor = HookExecutor::with_default_config().unwrap();
    let echo_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args: vec![String::from("hello")],
        dir: None,
        inputs: Vec::new(),
        source: None,
    };
    let outcome = executor.execute_single_hook(echo_hook).await.unwrap();
    assert!(outcome.success);
    assert!(outcome.stdout.contains("hello"));
}
/// `false` yields an unsuccessful result with a non-zero exit status.
#[tokio::test]
async fn test_execute_single_hook_failure() {
    let executor = HookExecutor::with_default_config().unwrap();
    let failing_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("false"),
        args: Vec::new(),
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let outcome = executor.execute_single_hook(failing_hook).await.unwrap();
    assert!(!outcome.success);
    assert!(outcome.exit_status.is_some());
    assert_ne!(outcome.exit_status.unwrap(), 0);
}
/// A 10-second sleep under a 1-second timeout is reported as timed out.
#[tokio::test]
async fn test_execute_single_hook_timeout() {
    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 1,
        fail_fast: true,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();
    let sleepy_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("sleep"),
        args: vec![String::from("10")],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let outcome = executor.execute_single_hook(sleepy_hook).await.unwrap();
    assert!(!outcome.success);
    assert!(outcome.error.as_ref().unwrap().contains("timed out"));
}
/// Starting background execution reports the hook count and leaves a state
/// record for the instance.
#[tokio::test]
async fn test_background_execution() {
    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 30,
        fail_fast: true,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();

    let directory_path = PathBuf::from("/test/directory");
    let config_hash = String::from("test_hash");
    let echo_hook = |message: &str| Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args: vec![message.to_string()],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let hooks = vec![echo_hook("hook1"), echo_hook("hook2")];

    let message = executor
        .execute_hooks_background(directory_path.clone(), config_hash.clone(), hooks)
        .await
        .unwrap();
    assert!(message.contains("Started execution of 2 hooks"));

    tokio::time::sleep(Duration::from_millis(100)).await;

    let status = executor
        .get_execution_status_for_instance(&directory_path, &config_hash)
        .await
        .unwrap();
    assert!(status.is_some());
    let recorded = status.unwrap();
    assert_eq!(recorded.total_hooks, 2);
    assert_eq!(recorded.directory_path, directory_path);
}
/// A plain `echo` with one argument runs and echoes that argument.
#[tokio::test]
async fn test_command_validation() {
    let executor = HookExecutor::with_default_config().unwrap();
    let echo_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args: vec![String::from("test message")],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let attempt = executor.execute_single_hook(echo_hook).await;
    assert!(attempt.is_ok(), "Echo command should succeed");
    let outcome = attempt.unwrap();
    assert!(outcome.stdout.contains("test message"));
}
/// Cancelling a running background execution marks its state as cancelled.
///
/// Skipped when the `cuenv` binary is unavailable or the supervisor does not
/// reach the running state within about two seconds.
#[tokio::test]
async fn test_cancellation() {
    if setup_cuenv_executable().is_none() {
        eprintln!("Skipping test_cancellation: cuenv binary not found");
        return;
    }

    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 30,
        fail_fast: false,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();

    let directory_path = PathBuf::from("/test/cancel");
    let config_hash = String::from("cancel_test");
    let long_running = Hook {
        order: 100,
        propagate: false,
        command: String::from("sleep"),
        args: vec![String::from("10")],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    executor
        .execute_hooks_background(
            directory_path.clone(),
            config_hash.clone(),
            vec![long_running],
        )
        .await
        .unwrap();

    // Poll up to 20 times, 100 ms apart, for the supervisor to report `Running`.
    let mut started = false;
    for _attempt in 0..20 {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let polled = executor
            .get_execution_status_for_instance(&directory_path, &config_hash)
            .await;
        if let Ok(Some(state)) = polled
            && state.status == ExecutionStatus::Running
        {
            started = true;
            break;
        }
    }
    if !started {
        eprintln!("Warning: Supervisor didn't start in time, skipping cancellation test");
        return;
    }

    let cancelled = executor
        .cancel_execution(
            &directory_path,
            &config_hash,
            Some(String::from("User cancelled")),
        )
        .await
        .unwrap();
    assert!(cancelled);

    let final_state = executor
        .get_execution_status_for_instance(&directory_path, &config_hash)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(final_state.status, ExecutionStatus::Cancelled);
}
/// Roughly 100 KB of echoed output is captured in full.
#[tokio::test]
async fn test_large_output_handling() {
    let executor = HookExecutor::with_default_config().unwrap();
    let filler = "x".repeat(1000);
    let args: Vec<String> = (0..100)
        .map(|i| format!("Line {}: {}", i, filler))
        .collect();
    let noisy_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args,
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let outcome = executor.execute_single_hook(noisy_hook).await.unwrap();
    assert!(outcome.success);
    assert!(outcome.stdout.len() > 50_000);
}
/// A finished execution is removed by `cleanup_old_states` with a zero age.
///
/// Skipped when the `cuenv` binary is unavailable, when no state appears
/// within about two seconds, or when completion is not observed in time.
#[tokio::test]
async fn test_state_cleanup() {
    if setup_cuenv_executable().is_none() {
        eprintln!("Skipping test_state_cleanup: cuenv binary not found");
        return;
    }

    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 30,
        fail_fast: false,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();

    let directory_path = PathBuf::from("/test/cleanup");
    let config_hash = String::from("cleanup_test");
    let quick_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args: vec![String::from("test")],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    executor
        .execute_hooks_background(
            directory_path.clone(),
            config_hash.clone(),
            vec![quick_hook],
        )
        .await
        .unwrap();

    // Poll up to 20 times, 100 ms apart, for the state record to exist.
    let mut state_exists = false;
    for _attempt in 0..20 {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let current = executor
            .get_execution_status_for_instance(&directory_path, &config_hash)
            .await
            .unwrap();
        if current.is_some() {
            state_exists = true;
            break;
        }
    }
    if !state_exists {
        eprintln!("Warning: State never created, skipping cleanup test");
        return;
    }

    let waited = executor
        .wait_for_completion(&directory_path, &config_hash, Some(15))
        .await;
    if let Err(e) = waited {
        eprintln!(
            "Warning: wait_for_completion timed out: {}, skipping test",
            e
        );
        return;
    }

    let cleaned = executor
        .cleanup_old_states(chrono::Duration::seconds(0))
        .await
        .unwrap();
    assert_eq!(cleaned, 1);

    let remaining = executor
        .get_execution_status_for_instance(&directory_path, &config_hash)
        .await
        .unwrap();
    assert!(remaining.is_none());
}
/// No state exists before execution starts; one exists right after.
#[tokio::test]
async fn test_execution_state_tracking() {
    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 30,
        fail_fast: true,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();

    let directory_path = PathBuf::from("/test/directory");
    let config_hash = String::from("hash");

    let before = executor
        .get_execution_status_for_instance(&directory_path, &config_hash)
        .await
        .unwrap();
    assert!(before.is_none());

    let quick_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args: vec![String::from("test")],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    executor
        .execute_hooks_background(
            directory_path.clone(),
            config_hash.clone(),
            vec![quick_hook],
        )
        .await
        .unwrap();

    let after = executor
        .get_execution_status_for_instance(&directory_path, &config_hash)
        .await
        .unwrap();
    assert!(after.is_some());
}
/// `pwd` runs in the hook's `dir` when it exists; with a nonexistent `dir`
/// the output never reports that directory.
#[tokio::test]
async fn test_working_directory_handling() {
    const MISSING_DIR: &str = "/nonexistent/directory/that/does/not/exist";

    let executor = HookExecutor::with_default_config().unwrap();
    let scratch = TempDir::new().unwrap();
    let pwd_in = |dir: String| Hook {
        order: 100,
        propagate: false,
        command: String::from("pwd"),
        args: Vec::new(),
        dir: Some(dir),
        inputs: Vec::new(),
        source: None,
    };

    let valid = executor
        .execute_single_hook(pwd_in(scratch.path().to_string_lossy().to_string()))
        .await
        .unwrap();
    assert!(valid.success);
    assert!(valid.stdout.contains(scratch.path().to_str().unwrap()));

    let invalid = executor
        .execute_single_hook(pwd_in(MISSING_DIR.to_string()))
        .await;
    if let Ok(output) = invalid {
        assert!(!output.stdout.contains(MISSING_DIR));
    }
}
/// Captures stdout of a successful hook and the exit status of a failing one.
#[tokio::test]
async fn test_hook_execution_with_complex_output() {
    let executor = HookExecutor::with_default_config().unwrap();

    let echo_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("echo"),
        args: vec![String::from("stdout output")],
        dir: None,
        inputs: Vec::new(),
        source: None,
    };
    let echoed = executor.execute_single_hook(echo_hook).await.unwrap();
    assert!(echoed.success);
    assert!(echoed.stdout.contains("stdout output"));

    let failing_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("false"),
        args: Vec::new(),
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let failed = executor.execute_single_hook(failing_hook).await.unwrap();
    assert!(!failed.success);
    assert!(failed.exit_status.is_some());
}
/// `StateManager::get_state_dir` returns the directory it was built with.
#[tokio::test]
async fn test_state_dir_getter() {
    use crate::state::StateManager;

    let scratch = TempDir::new().unwrap();
    let expected = scratch.path().to_path_buf();
    let manager = StateManager::new(expected.clone());
    assert_eq!(manager.get_state_dir(), expected.as_path());
}
/// A timed-out hook reports failure, a timeout message naming the limit, no
/// exit status, and a duration close to the configured timeout.
#[tokio::test]
async fn test_hook_timeout_behavior() {
    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 1,
        fail_fast: true,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();

    let slow_hook = Hook {
        order: 100,
        propagate: false,
        command: String::from("sleep"),
        args: vec![String::from("30")],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let outcome = executor.execute_single_hook(slow_hook).await.unwrap();

    assert!(!outcome.success, "Hook should fail due to timeout");
    assert!(
        outcome.error.is_some(),
        "Should have error message on timeout"
    );

    let message = outcome.error.as_ref().unwrap();
    assert!(
        message.contains("timed out"),
        "Error should mention timeout: {}",
        message
    );
    assert!(
        message.contains('1'),
        "Error should mention timeout duration: {}",
        message
    );

    assert!(
        outcome.exit_status.is_none(),
        "Exit status should be None for timed out process"
    );

    let elapsed_ms = outcome.duration_ms;
    assert!(elapsed_ms >= 1000, "Duration should be at least 1 second");
    assert!(
        elapsed_ms < 5000,
        "Duration should not be much longer than timeout"
    );
}
/// A hook that prints and then hangs is still reported as timed out.
#[tokio::test]
async fn test_hook_timeout_with_partial_output() {
    let scratch = TempDir::new().unwrap();
    let executor = HookExecutor::new(HookExecutionConfig {
        default_timeout_seconds: 1,
        fail_fast: true,
        state_dir: Some(scratch.path().to_path_buf()),
    })
    .unwrap();

    let chatty_then_stuck = Hook {
        order: 100,
        propagate: false,
        command: String::from("bash"),
        args: vec![
            String::from("-c"),
            String::from("echo 'started'; sleep 30"),
        ],
        dir: None,
        inputs: vec![],
        source: Some(false),
    };
    let outcome = executor
        .execute_single_hook(chatty_then_stuck)
        .await
        .unwrap();

    assert!(!outcome.success, "Hook should timeout");
    assert!(
        outcome.error.as_ref().unwrap().contains("timed out"),
        "Should indicate timeout"
    );
}
#[tokio::test]
async fn test_concurrent_hook_isolation() {
use std::sync::Arc;
use tokio::task::JoinSet;
let temp_dir = TempDir::new().unwrap();
let config = HookExecutionConfig {
default_timeout_seconds: 30,
fail_fast: false,
state_dir: Some(temp_dir.path().to_path_buf()),
};
let executor = Arc::new(HookExecutor::new(config).unwrap());
let mut join_set = JoinSet::new();
for i in 0..5 {
let executor = executor.clone();
let unique_id = format!("hook_{}", i);
join_set.spawn(async move {
let hook = Hook {
order: 100,
propagate: false,
command: "bash".to_string(),
args: vec![
"-c".to_string(),
format!(
"echo 'ID:{}'; sleep 0.1; echo 'DONE:{}'",
unique_id, unique_id
),
],
dir: None,
inputs: Vec::new(),
source: Some(false),
};
let result = executor.execute_single_hook(hook).await.unwrap();
(i, result)
});
}
let mut results = Vec::new();
while let Some(result) = join_set.join_next().await {
results.push(result.unwrap());
}
assert_eq!(results.len(), 5, "All 5 hooks should complete");
for (i, result) in results {
assert!(result.success, "Hook {} should succeed", i);
let expected_id = format!("hook_{}", i);
assert!(
result.stdout.contains(&format!("ID:{}", expected_id)),
"Hook {} output should contain its ID. Got: {}",
i,
result.stdout
);
assert!(
result.stdout.contains(&format!("DONE:{}", expected_id)),
"Hook {} output should contain its DONE marker. Got: {}",
i,
result.stdout
);
for j in 0..5 {
if j != i {
let other_id = format!("hook_{}", j);
assert!(
!result.stdout.contains(&format!("ID:{}", other_id)),
"Hook {} output should not contain hook {} ID",
i,
j
);
}
}
}
}
/// Feeds scripts that export multiline, Unicode and quote/equals-laden values
/// to `evaluate_shell_environment` and requires each evaluation to succeed.
///
/// The per-variable value checks are conditional: they only run when the
/// variable is present in the returned map, so a missing variable does not
/// fail this test.
#[tokio::test]
async fn test_environment_capture_special_chars() {
    let no_prior_env = HashMap::new();

    // Scenario 1: a value spanning several lines.
    {
        let multiline_script = r"
export MULTILINE_VAR='line1
line2
line3'
";
        let evaluated = evaluate_shell_environment(multiline_script, &no_prior_env).await;
        assert!(evaluated.is_ok(), "Should parse multiline env vars");
        let (captured, _unset) = evaluated.unwrap();
        if let Some(value) = captured.get("MULTILINE_VAR") {
            assert!(
                value.contains("line1"),
                "Should contain first line: {}",
                value
            );
            assert!(
                value.contains("line2"),
                "Should contain second line: {}",
                value
            );
        }
    }

    // Scenario 2: non-ASCII values (CJK text, emoji, accented characters).
    {
        let unicode_script = r"
export UNICODE_VAR='Hello 世界 🌍 émoji'
export CHINESE_VAR='中文测试'
export JAPANESE_VAR='日本語テスト'
";
        let evaluated = evaluate_shell_environment(unicode_script, &no_prior_env).await;
        assert!(evaluated.is_ok(), "Should parse unicode env vars");
        let (captured, _unset) = evaluated.unwrap();
        if let Some(value) = captured.get("UNICODE_VAR") {
            assert!(
                value.contains("世界"),
                "Should preserve Chinese characters: {}",
                value
            );
            assert!(value.contains("🌍"), "Should preserve emoji: {}", value);
        }
    }

    // Scenario 3: nested quotes, colon-separated paths and embedded `=` signs.
    {
        let special_chars_script = r#"
export QUOTED_VAR="value with 'single' and \"double\" quotes"
export PATH_VAR="/usr/local/bin:/usr/bin:/bin"
export EQUALS_VAR="key=value=another"
"#;
        let evaluated = evaluate_shell_environment(special_chars_script, &no_prior_env).await;
        assert!(evaluated.is_ok(), "Should parse special chars");
        let (captured, _unset) = evaluated.unwrap();
        if let Some(value) = captured.get("EQUALS_VAR") {
            assert!(
                value.contains("key=value=another"),
                "Should preserve equals signs: {}",
                value
            );
        }
    }
}
/// Exercises `evaluate_shell_environment` with boundary values: empty and
/// whitespace-only exports, and a single 10000-character value.
///
/// For the empty/whitespace script only successful evaluation is required.
/// For the long value, `LONG_VAR` must be present in the returned map at its
/// full length; a missing variable fails the test instead of silently
/// skipping the length check.
#[tokio::test]
async fn test_environment_capture_edge_cases() {
    let empty_script = r"
export EMPTY_VAR=''
export SPACE_VAR=' '
";
    let result = evaluate_shell_environment(empty_script, &HashMap::new()).await;
    assert!(result.is_ok(), "Should handle empty/whitespace values");

    let long_value = "x".repeat(10000);
    let long_script = format!("export LONG_VAR='{}'", long_value);
    let result = evaluate_shell_environment(&long_script, &HashMap::new()).await;
    assert!(result.is_ok(), "Should handle very long values");
    let (env_vars, _removed) = result.unwrap();

    // Require the variable to be captured: a conditional check here would let
    // the test pass even if the long value were dropped entirely.
    let value = env_vars
        .get("LONG_VAR")
        .expect("LONG_VAR should be present in the captured environment");
    assert_eq!(value.len(), 10000, "Should preserve full length");
}
/// Checks that `evaluate_shell_environment` evaluates a script on top of the
/// supplied prior environment.
///
/// Two scenarios are covered: a script that rewrites a prior variable in terms
/// of its old value (the new value must be returned and contain the old one),
/// and a script that unsets a prior variable (the key must be reported in the
/// removed list and be absent from the returned variables).
#[tokio::test]
async fn test_environment_prior_env_chaining() {
    // Scenario 1: extend a variable that already exists in the prior env.
    let seeded_env = HashMap::from([(
        "CUENV_TEST_PRIOR".to_string(),
        "original_value".to_string(),
    )]);
    let extend_script = r#"export CUENV_TEST_PRIOR="extended_${CUENV_TEST_PRIOR}""#;

    let evaluated = evaluate_shell_environment(extend_script, &seeded_env).await;
    assert!(
        evaluated.is_ok(),
        "Should evaluate with prior_env: {:?}",
        evaluated.as_ref().err()
    );
    let (changed, _unset) = evaluated.unwrap();

    let value = match changed.get("CUENV_TEST_PRIOR") {
        Some(value) => value,
        None => panic!("CUENV_TEST_PRIOR should be in env_vars delta since it was modified"),
    };
    assert!(
        value.contains("extended_"),
        "Value should contain extended_ prefix: {}",
        value
    );
    assert!(
        value.contains("original_value"),
        "Value should contain original_value from prior_env: {}",
        value
    );

    // Scenario 2: unset a variable that exists in the prior env.
    let seeded_env = HashMap::from([("CUENV_TEST_REMOVE".to_string(), "bar".to_string())]);
    let unset_script = "unset CUENV_TEST_REMOVE";

    let evaluated = evaluate_shell_environment(unset_script, &seeded_env).await;
    assert!(evaluated.is_ok(), "Should evaluate unset script");
    let (changed, removed) = evaluated.unwrap();

    assert!(
        !changed.contains_key("CUENV_TEST_REMOVE"),
        "Unset variable should not appear in env_vars"
    );
    let removed_key = "CUENV_TEST_REMOVE".to_string();
    assert!(
        removed.contains(&removed_key),
        "Unset variable should appear in removed_keys: {:?}",
        removed
    );
}
#[tokio::test]
async fn test_working_directory_isolation() {
let executor = HookExecutor::with_default_config().unwrap();
let temp_dir1 = TempDir::new().unwrap();
let temp_dir2 = TempDir::new().unwrap();
std::fs::write(temp_dir1.path().join("marker.txt"), "dir1").unwrap();
std::fs::write(temp_dir2.path().join("marker.txt"), "dir2").unwrap();
let hook1 = Hook {
order: 100,
propagate: false,
command: "cat".to_string(),
args: vec!["marker.txt".to_string()],
dir: Some(temp_dir1.path().to_string_lossy().to_string()),
inputs: vec![],
source: None,
};
let hook2 = Hook {
order: 100,
propagate: false,
command: "cat".to_string(),
args: vec!["marker.txt".to_string()],
dir: Some(temp_dir2.path().to_string_lossy().to_string()),
inputs: vec![],
source: None,
};
let result1 = executor.execute_single_hook(hook1).await.unwrap();
let result2 = executor.execute_single_hook(hook2).await.unwrap();
assert!(result1.success, "Hook 1 should succeed");
assert!(result2.success, "Hook 2 should succeed");
assert!(
result1.stdout.contains("dir1"),
"Hook 1 should read from dir1: {}",
result1.stdout
);
assert!(
result2.stdout.contains("dir2"),
"Hook 2 should read from dir2: {}",
result2.stdout
);
}
/// A hook writing one line to stdout and one to stderr must succeed, with each
/// line showing up in the matching field of the result.
#[tokio::test]
async fn test_stderr_capture() {
    let executor = HookExecutor::with_default_config().unwrap();

    let script = "echo 'to stdout'; echo 'to stderr' >&2".to_string();
    let outcome = executor
        .execute_single_hook(Hook {
            order: 100,
            propagate: false,
            command: "bash".to_string(),
            args: vec!["-c".to_string(), script],
            dir: None,
            inputs: vec![],
            source: None,
        })
        .await
        .unwrap();

    assert!(outcome.success, "Hook should succeed");
    assert!(
        outcome.stdout.contains("to stdout"),
        "Should capture stdout: {}",
        outcome.stdout
    );
    assert!(
        outcome.stderr.contains("to stderr"),
        "Should capture stderr: {}",
        outcome.stderr
    );
}
/// A hook whose stdout contains a NUL byte between two words must still
/// succeed, and both words must be visible in the captured stdout.
#[tokio::test]
async fn test_binary_output_handling() {
    let executor = HookExecutor::with_default_config().unwrap();

    // bash receives the `\x00` escape and `printf` emits a NUL byte between
    // the two words.
    let nul_printer = Hook {
        order: 100,
        propagate: false,
        command: "bash".to_string(),
        args: vec!["-c".to_string(), "printf 'hello\\x00world'".to_string()],
        dir: None,
        inputs: vec![],
        source: None,
    };

    let outcome = executor.execute_single_hook(nul_printer).await.unwrap();

    assert!(outcome.success, "Hook should succeed");
    let has_both_words = outcome.stdout.contains("hello") && outcome.stdout.contains("world");
    assert!(
        has_both_words,
        "Should contain text parts: {}",
        outcome.stdout
    );
}
/// A `source: Some(true)` hook whose stdout is an `export` statement must make
/// that variable appear in the map returned by `capture_source_environment`.
#[tokio::test]
async fn test_capture_source_environment_returns_resulting_env() {
    // The hook prints a shell `export` line instead of modifying anything
    // itself.
    let exporting_hook = Hook {
        order: 100,
        propagate: false,
        command: "bash".to_string(),
        args: vec![
            "-c".to_string(),
            "printf '%s\n' 'export CUENV_RUNTIME_TEST=from_runtime'".to_string(),
        ],
        dir: None,
        inputs: vec![],
        source: Some(true),
    };
    let no_prior_env = HashMap::new();

    let environment = capture_source_environment(exporting_hook, &no_prior_env, 5)
        .await
        .unwrap();

    let expected = "from_runtime".to_string();
    assert_eq!(environment.get("CUENV_RUNTIME_TEST"), Some(&expected));
}
}