use std::collections::VecDeque;
use std::io;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use serde::Serialize;
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::timeout;
const STDERR_RING_CAPACITY: usize = 4 * 1024;
const SPAWN_RETRY_BACKOFF_MS: &[u64] = &[10, 50, 200, 1_000];
#[cfg(unix)]
fn is_transient_spawn_errno(err: &io::Error) -> bool {
let Some(errno) = err.raw_os_error() else {
return false;
};
errno == libc::EAGAIN
|| errno == libc::ENOMEM
|| errno == libc::EMFILE
|| errno == libc::ETXTBSY
}
#[cfg(not(unix))]
fn is_transient_spawn_errno(_err: &io::Error) -> bool {
false
}
#[cfg(all(test, unix))]
fn test_force_eagain_remaining() -> u32 {
use std::sync::atomic::{AtomicU32, Ordering};
static REMAINING: AtomicU32 = AtomicU32::new(u32::MAX);
static INIT: std::sync::Once = std::sync::Once::new();
INIT.call_once(|| {
let n = std::env::var("AI_MEMORY_TEST_FORCE_SPAWN_EAGAIN")
.ok()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(0);
REMAINING.store(n, Ordering::SeqCst);
});
let cur = REMAINING.load(Ordering::SeqCst);
if cur == 0 {
return 0;
}
REMAINING.fetch_sub(1, Ordering::SeqCst);
cur
}
async fn spawn_with_transient_retry<F>(mut spawn_once: F) -> io::Result<Child>
where
F: FnMut() -> io::Result<Child>,
{
let total_attempts = SPAWN_RETRY_BACKOFF_MS.len() + 1;
let mut last_err: Option<io::Error> = None;
for attempt in 0..total_attempts {
if attempt > 0 {
let sleep_ms = SPAWN_RETRY_BACKOFF_MS[attempt - 1];
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
}
#[cfg(all(test, unix))]
{
if test_force_eagain_remaining() > 0 {
last_err = Some(io::Error::from_raw_os_error(libc::EAGAIN));
continue;
}
}
match spawn_once() {
Ok(child) => return Ok(child),
Err(e) if is_transient_spawn_errno(&e) => {
tracing::debug!(
attempt = attempt + 1,
max = total_attempts,
errno = ?e.raw_os_error(),
"hooks: transient spawn errno, retrying after backoff"
);
last_err = Some(e);
continue;
}
Err(e) => return Err(e),
}
}
Err(last_err.unwrap_or_else(|| io::Error::other("spawn retries exhausted with no error")))
}
type StderrRing = Arc<Mutex<VecDeque<u8>>>;
fn spawn_stderr_drain(mut stderr: ChildStderr, ring: StderrRing) -> JoinHandle<()> {
tokio::spawn(async move {
let mut chunk = [0u8; 4 * 1024];
loop {
match stderr.read(&mut chunk).await {
Ok(0) => break, Ok(n) => {
let mut guard = ring.lock().await;
guard.extend(&chunk[..n]);
while guard.len() > STDERR_RING_CAPACITY {
guard.pop_front();
}
}
Err(_) => break, }
}
})
}
fn redact_stderr_tail(tail: &str) -> String {
const SECRET_KEYWORDS: &[&str] = &[
"secret",
"password",
"passwd",
"token",
"api_key",
"apikey",
"bearer",
"private_key",
"private-key",
" auth",
"credential",
"cookie",
"x-amz-",
"aws_",
"ssh-rsa",
"ssh-ed25519",
"begin private",
"begin rsa",
"begin ec",
"begin openssh",
];
tail.lines()
.map(|line| {
let lower = line.to_ascii_lowercase();
if let Some(eq_pos) = line.find('=') {
let prefix: &str = &line[..eq_pos];
if !prefix.is_empty()
&& prefix
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_')
&& prefix
.chars()
.next()
.is_some_and(|c| c.is_ascii_alphabetic() || c == '_')
{
return format!("{prefix}=<redacted>");
}
}
if SECRET_KEYWORDS.iter().any(|kw| lower.contains(kw)) {
return "<redacted: matched secret-keyword filter>".to_string();
}
line.to_string()
})
.collect::<Vec<_>>()
.join("\n")
}
fn warn_stderr_tail(command: &std::path::Path, stage: &str, tail: &str) {
if !tail.is_empty() {
let redacted = redact_stderr_tail(tail);
tracing::warn!(
command = %command.display(),
stage,
stderr_tail = %redacted,
"hooks: daemon child stderr at failure (redacted — env-var-shaped values + secret-keyword lines stripped)"
);
}
}
async fn snapshot_stderr_ring(ring: &StderrRing) -> String {
let guard = ring.lock().await;
let bytes: Vec<u8> = guard.iter().copied().collect();
String::from_utf8_lossy(&bytes).into_owned()
}
use super::config::HookConfig;
use super::decision::HookDecision;
use super::events::HookEvent;
fn parse_decision_line(line: &str) -> Result<HookDecision> {
HookDecision::parse(line).map_err(|e| ExecutorError::Decode {
reason: e.to_string(),
})
}
#[derive(Debug)]
pub enum ExecutorError {
Spawn { command: String, source: io::Error },
Io(io::Error),
ChildExit { code: Option<i32>, stderr: String },
Decode { reason: String },
Timeout { ms: u64 },
DaemonUnavailable { attempts: u32 },
GovernanceRefused { command: String, reason: String },
}
impl std::fmt::Display for ExecutorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExecutorError::Spawn { command, source } => {
write!(f, "hook spawn failed for {command}: {source}")
}
ExecutorError::Io(e) => write!(f, "hook io error: {e}"),
ExecutorError::ChildExit { code, stderr: _ } => {
let code_str = code.map_or_else(|| "<signaled>".into(), |c| c.to_string());
write!(
f,
"hook child exited (code {code_str}); see operator log for redacted stderr"
)
}
ExecutorError::Decode { reason } => {
write!(f, "hook decision decode failed: {reason}")
}
ExecutorError::Timeout { ms } => {
write!(f, "hook timed out after {ms}ms")
}
ExecutorError::DaemonUnavailable { attempts } => {
write!(
f,
"hook daemon unavailable after {attempts} reconnect attempts"
)
}
ExecutorError::GovernanceRefused { command, reason } => {
write!(
f,
"hook spawn refused by governance for {command}: {reason}"
)
}
}
}
}
impl std::error::Error for ExecutorError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ExecutorError::Spawn { source, .. } | ExecutorError::Io(source) => Some(source),
_ => None,
}
}
}
impl From<io::Error> for ExecutorError {
fn from(value: io::Error) -> Self {
ExecutorError::Io(value)
}
}
pub type Result<T> = std::result::Result<T, ExecutorError>;
pub trait HookExecutor: Send + Sync {
fn fire<'a>(
&'a self,
event: HookEvent,
payload: Value,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<HookDecision>> + Send + 'a>>;
fn metrics(&self) -> ExecutorMetrics;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorMetrics {
pub events_fired: u64,
pub events_dropped: u64,
pub mean_latency_us: u64,
}
#[derive(Debug, Default)]
struct MetricsCounters {
fired: AtomicU64,
dropped: AtomicU64,
latency_sum_us: AtomicU64,
latency_n: AtomicU64,
}
impl MetricsCounters {
fn record_fire(&self, latency: Duration) {
self.fired.fetch_add(1, Ordering::Relaxed);
let us = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX);
self.latency_sum_us.fetch_add(us, Ordering::Relaxed);
self.latency_n.fetch_add(1, Ordering::Relaxed);
}
fn record_drop(&self) {
self.dropped.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> ExecutorMetrics {
let fired = self.fired.load(Ordering::Relaxed);
let dropped = self.dropped.load(Ordering::Relaxed);
let n = self.latency_n.load(Ordering::Relaxed);
let sum = self.latency_sum_us.load(Ordering::Relaxed);
let mean_latency_us = if n == 0 { 0 } else { sum / n };
ExecutorMetrics {
events_fired: fired,
events_dropped: dropped,
mean_latency_us,
}
}
}
#[derive(Debug, Serialize)]
struct FireEnvelope<'a> {
event: HookEvent,
payload: &'a Value,
}
pub struct ExecExecutor {
config: HookConfig,
metrics: MetricsCounters,
}
#[cfg(unix)]
fn os_str_lossless_for_wire_check(s: &std::ffi::OsStr) -> String {
use std::os::unix::ffi::OsStrExt;
String::from_utf8_lossy(s.as_bytes()).into_owned()
}
#[cfg(not(unix))]
fn os_str_lossless_for_wire_check(s: &std::ffi::OsStr) -> String {
s.to_string_lossy().into_owned()
}
impl ExecExecutor {
#[must_use]
pub fn new(config: HookConfig) -> Self {
Self {
config,
metrics: MetricsCounters::default(),
}
}
async fn fire_inner(&self, event: HookEvent, payload: Value) -> Result<HookDecision> {
let binary = os_str_lossless_for_wire_check(self.config.command.as_os_str());
let command_str = self.config.command.display().to_string();
let spawn_action = crate::governance::agent_action::AgentAction::ProcessSpawn {
binary,
args: Vec::new(),
};
if let Err(refusal) = crate::governance::wire_check::check(&spawn_action) {
return Err(ExecutorError::GovernanceRefused {
command: command_str,
reason: refusal.reason,
});
}
let envelope = FireEnvelope {
event,
payload: &payload,
};
let envelope_bytes = serde_json::to_vec(&envelope).map_err(|e| ExecutorError::Decode {
reason: format!("envelope encode: {e}"),
})?;
let command_path = self.config.command.clone();
let child = spawn_with_transient_retry(|| {
Command::new(&command_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
})
.await
.map_err(|source| ExecutorError::Spawn {
command: self.config.command.display().to_string(),
source,
})?;
let started = Instant::now();
let deadline = Duration::from_millis(u64::from(self.config.timeout_ms));
let command_str = self.config.command.display().to_string();
let driven = timeout(
deadline,
drive_exec_child(child, envelope_bytes, &command_str),
)
.await;
match driven {
Ok(Ok(decision)) => {
self.metrics.record_fire(started.elapsed());
Ok(decision)
}
Ok(Err(e)) => {
self.metrics.record_fire(started.elapsed());
Err(e)
}
Err(_elapsed) => {
self.metrics.record_drop();
Err(ExecutorError::Timeout {
ms: u64::from(self.config.timeout_ms),
})
}
}
}
}
impl HookExecutor for ExecExecutor {
fn fire<'a>(
&'a self,
event: HookEvent,
payload: Value,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<HookDecision>> + Send + 'a>>
{
Box::pin(self.fire_inner(event, payload))
}
fn metrics(&self) -> ExecutorMetrics {
self.metrics.snapshot()
}
}
async fn drive_exec_child(
mut child: Child,
envelope: Vec<u8>,
command: &str,
) -> Result<HookDecision> {
if let Some(mut stdin) = child.stdin.take() {
stdin.write_all(&envelope).await?;
stdin.write_all(b"\n").await?;
stdin.shutdown().await?;
}
let output = child.wait_with_output().await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
return Err(ExecutorError::ChildExit {
code: output.status.code(),
stderr,
});
}
if !output.stderr.is_empty() {
let trimmed_len = output.stderr.len().min(STDERR_RING_CAPACITY);
let start = output.stderr.len() - trimmed_len;
let stderr_tail = String::from_utf8_lossy(&output.stderr[start..]);
tracing::debug!(
command,
stderr_bytes = output.stderr.len(),
stderr = %stderr_tail,
"hooks: exec child wrote stderr on success path"
);
}
let stdout = String::from_utf8_lossy(&output.stdout);
let decision_line = stdout
.lines()
.filter(|l| !l.trim().is_empty())
.next_back()
.unwrap_or("");
parse_decision_line(decision_line)
}
pub struct DaemonExecutor {
config: HookConfig,
conn: Mutex<Option<DaemonConnection>>,
metrics: MetricsCounters,
}
struct DaemonConnection {
child: Child,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
stderr_buf: StderrRing,
stderr_task: JoinHandle<()>,
}
impl DaemonConnection {
async fn stderr_tail(&self) -> String {
snapshot_stderr_ring(&self.stderr_buf).await
}
}
impl Drop for DaemonConnection {
fn drop(&mut self) {
self.stderr_task.abort();
}
}
impl DaemonExecutor {
#[must_use]
pub fn new(config: HookConfig) -> Self {
Self {
config,
conn: Mutex::new(None),
metrics: MetricsCounters::default(),
}
}
async fn fire_inner(&self, event: HookEvent, payload: Value) -> Result<HookDecision> {
let envelope = FireEnvelope {
event,
payload: &payload,
};
let mut envelope_bytes =
serde_json::to_vec(&envelope).map_err(|e| ExecutorError::Decode {
reason: format!("envelope encode: {e}"),
})?;
envelope_bytes.push(b'\n');
let started = Instant::now();
let deadline = Duration::from_millis(u64::from(self.config.timeout_ms));
let driven = timeout(deadline, self.exchange(&envelope_bytes)).await;
match driven {
Ok(Ok(decision)) => {
self.metrics.record_fire(started.elapsed());
Ok(decision)
}
Ok(Err(e)) => {
self.metrics.record_fire(started.elapsed());
Err(e)
}
Err(_elapsed) => {
self.metrics.record_drop();
let mut guard = self.conn.lock().await;
if let Some(conn) = guard.as_ref() {
let tail = conn.stderr_tail().await;
if !tail.is_empty() {
tracing::warn!(
command = %self.config.command.display(),
timeout_ms = self.config.timeout_ms,
stderr_tail = %tail,
"hooks: daemon child stderr at timeout"
);
}
}
*guard = None;
Err(ExecutorError::Timeout {
ms: u64::from(self.config.timeout_ms),
})
}
}
}
async fn exchange(&self, envelope: &[u8]) -> Result<HookDecision> {
let mut guard = self.conn.lock().await;
if guard.is_none() {
*guard = Some(self.connect_with_backoff().await?);
}
let conn = guard.as_mut().expect("connection just inserted");
if let Err(e) = conn.stdin.write_all(envelope).await {
let tail = conn.stderr_tail().await;
warn_stderr_tail(&self.config.command, "stdin write", &tail);
*guard = None;
return Err(ExecutorError::Io(e));
}
if let Err(e) = conn.stdin.flush().await {
let tail = conn.stderr_tail().await;
warn_stderr_tail(&self.config.command, "stdin flush", &tail);
*guard = None;
return Err(ExecutorError::Io(e));
}
let mut line = String::new();
match conn.stdout.read_line(&mut line).await {
Ok(0) => {
let tail = conn.stderr_tail().await;
let stderr = if tail.is_empty() {
"daemon child closed stdout".into()
} else {
format!("daemon child closed stdout; stderr tail: {tail}")
};
*guard = None;
Err(ExecutorError::ChildExit { code: None, stderr })
}
Ok(_) => match parse_decision_line(&line) {
Ok(d) => Ok(d),
Err(e) => {
let tail = conn.stderr_tail().await;
warn_stderr_tail(&self.config.command, "framing error", &tail);
*guard = None;
Err(e)
}
},
Err(e) => {
let tail = conn.stderr_tail().await;
warn_stderr_tail(&self.config.command, "stdout read", &tail);
*guard = None;
Err(ExecutorError::Io(e))
}
}
}
async fn connect_with_backoff(&self) -> Result<DaemonConnection> {
const MAX_ATTEMPTS: u32 = 5;
const BASE_BACKOFF_MS: u64 = 100;
const MAX_BACKOFF_MS: u64 = 5_000;
let mut last_err: Option<ExecutorError> = None;
for attempt in 0..MAX_ATTEMPTS {
if attempt > 0 {
let pow = 1u64 << (attempt - 1);
let backoff_ms = (BASE_BACKOFF_MS.saturating_mul(pow)).min(MAX_BACKOFF_MS);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
match self.spawn_one().await {
Ok(conn) => return Ok(conn),
Err(e) => {
tracing::warn!(
attempt = attempt + 1,
max = MAX_ATTEMPTS,
error = %e,
"hooks: daemon spawn attempt failed"
);
last_err = Some(e);
}
}
}
Err(last_err.unwrap_or(ExecutorError::DaemonUnavailable {
attempts: MAX_ATTEMPTS,
}))
}
async fn spawn_one(&self) -> Result<DaemonConnection> {
let command_str = self.config.command.display().to_string();
let spawn_action = crate::governance::agent_action::AgentAction::ProcessSpawn {
binary: command_str.clone(),
args: Vec::new(),
};
if let Err(refusal) = crate::governance::wire_check::check(&spawn_action) {
return Err(ExecutorError::GovernanceRefused {
command: command_str,
reason: refusal.reason,
});
}
let command_path = self.config.command.clone();
let mut child = spawn_with_transient_retry(|| {
Command::new(&command_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
})
.await
.map_err(|source| ExecutorError::Spawn {
command: self.config.command.display().to_string(),
source,
})?;
let stdin = child.stdin.take().ok_or_else(|| {
ExecutorError::Io(io::Error::new(
io::ErrorKind::BrokenPipe,
"child stdin not piped",
))
})?;
let stdout = child.stdout.take().ok_or_else(|| {
ExecutorError::Io(io::Error::new(
io::ErrorKind::BrokenPipe,
"child stdout not piped",
))
})?;
let stderr = child.stderr.take().ok_or_else(|| {
ExecutorError::Io(io::Error::new(
io::ErrorKind::BrokenPipe,
"child stderr not piped",
))
})?;
let stderr_buf: StderrRing =
Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_RING_CAPACITY)));
let stderr_task = spawn_stderr_drain(stderr, Arc::clone(&stderr_buf));
Ok(DaemonConnection {
child,
stdin,
stdout: BufReader::new(stdout),
stderr_buf,
stderr_task,
})
}
}
impl HookExecutor for DaemonExecutor {
fn fire<'a>(
&'a self,
event: HookEvent,
payload: Value,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<HookDecision>> + Send + 'a>>
{
Box::pin(self.fire_inner(event, payload))
}
fn metrics(&self) -> ExecutorMetrics {
self.metrics.snapshot()
}
}
impl Drop for DaemonExecutor {
fn drop(&mut self) {
if let Ok(mut guard) = self.conn.try_lock() {
if let Some(conn) = guard.as_mut() {
let _ = conn.child.start_kill();
}
}
}
}
pub struct ExecutorRegistry {
cache: Vec<(HookConfig, Arc<dyn HookExecutor>)>,
}
impl ExecutorRegistry {
#[must_use]
pub fn new() -> Self {
Self { cache: Vec::new() }
}
#[must_use]
pub fn from_hooks(hooks: &[HookConfig]) -> Self {
let mut me = Self::new();
for h in hooks {
let _ = me.get(h);
}
me
}
pub fn get(&mut self, hook: &HookConfig) -> Arc<dyn HookExecutor> {
if let Some((_, existing)) = self.cache.iter().find(|(cfg, _)| cfg == hook) {
return Arc::clone(existing);
}
let executor: Arc<dyn HookExecutor> = match hook.mode {
super::config::HookMode::Exec => Arc::new(ExecExecutor::new(hook.clone())),
super::config::HookMode::Daemon => Arc::new(DaemonExecutor::new(hook.clone())),
};
self.cache.push((hook.clone(), Arc::clone(&executor)));
executor
}
pub fn metrics(&self) -> Vec<(HookConfig, ExecutorMetrics)> {
self.cache
.iter()
.map(|(cfg, ex)| (cfg.clone(), ex.metrics()))
.collect()
}
#[must_use]
pub fn len(&self) -> usize {
self.cache.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.cache.is_empty()
}
}
impl Default for ExecutorRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hooks::config::HookMode;
fn cfg(mode: HookMode) -> HookConfig {
HookConfig {
event: HookEvent::PostStore,
command: std::path::PathBuf::from("/bin/true"),
priority: 0,
timeout_ms: 1_000,
mode,
enabled: true,
namespace: "*".into(),
fail_mode: crate::hooks::config::FailMode::Open,
}
}
#[test]
fn parse_decision_line_allow_default_on_empty() {
assert_eq!(parse_decision_line("").unwrap(), HookDecision::Allow);
assert_eq!(parse_decision_line(" ").unwrap(), HookDecision::Allow);
assert_eq!(parse_decision_line("{}").unwrap(), HookDecision::Allow);
}
#[test]
fn redact_stderr_strips_env_var_assignments() {
let raw = "AWS_SECRET_ACCESS_KEY=AKIA1234567890ABCDEF\nDATABASE_URL=postgres://u:p@h/db\nGITHUB_TOKEN=ghp_abcdef\nuser-message-fine\n";
let red = redact_stderr_tail(raw);
assert!(!red.contains("AKIA1234567890ABCDEF"));
assert!(!red.contains("ghp_abcdef"));
assert!(red.contains("AWS_SECRET_ACCESS_KEY=<redacted>"));
assert!(red.contains("GITHUB_TOKEN=<redacted>"));
assert!(red.contains("user-message-fine"));
}
#[test]
fn redact_stderr_drops_secret_keyword_lines() {
let raw = "Authorization: Bearer eyJ.fake.jwt\nset-cookie: session=abc\nmsg=normal\n";
let red = redact_stderr_tail(raw);
assert!(!red.contains("Bearer eyJ.fake.jwt"));
assert!(!red.contains("session=abc"));
}
#[test]
fn child_exit_display_excludes_stderr_content() {
let err = ExecutorError::ChildExit {
code: Some(1),
stderr: "AWS_SECRET_ACCESS_KEY=AKIA-secret-content".to_string(),
};
let s = err.to_string();
assert!(!s.contains("AKIA-secret-content"));
assert!(!s.contains("AWS_SECRET_ACCESS_KEY"));
assert!(s.contains("code 1"));
}
#[test]
fn parse_decision_line_allow_explicit() {
let d = parse_decision_line(r#"{"action":"allow"}"#).unwrap();
assert_eq!(d, HookDecision::Allow);
}
#[test]
fn parse_decision_line_deny_with_default_code() {
let d = parse_decision_line(r#"{"action":"deny","reason":"nope"}"#).unwrap();
match d {
HookDecision::Deny { reason, code } => {
assert_eq!(reason, "nope");
assert_eq!(code, 403);
}
other => panic!("expected Deny, got {other:?}"),
}
}
#[test]
fn parse_decision_line_deny_with_explicit_code() {
let d = parse_decision_line(r#"{"action":"deny","reason":"x","code":429}"#).unwrap();
match d {
HookDecision::Deny { code, .. } => assert_eq!(code, 429),
_ => panic!("expected Deny"),
}
}
#[test]
fn parse_decision_line_unknown_action_wraps_to_decode() {
let err = parse_decision_line(r#"{"action":"explode"}"#).unwrap_err();
match err {
ExecutorError::Decode { reason } => {
assert!(
reason.contains("unknown action"),
"decode reason should name the failure: {reason}"
);
}
other => panic!("expected Decode, got {other:?}"),
}
}
#[test]
fn parse_decision_line_modify_now_recognised() {
let d = parse_decision_line(r#"{"action":"modify","delta":{"priority":7}}"#).unwrap();
match d {
HookDecision::Modify(m) => assert_eq!(m.delta.priority, Some(7)),
other => panic!("expected Modify, got {other:?}"),
}
}
#[test]
fn metrics_counters_track_fired_dropped_and_mean() {
let m = MetricsCounters::default();
m.record_fire(Duration::from_micros(100));
m.record_fire(Duration::from_micros(300));
m.record_drop();
let snap = m.snapshot();
assert_eq!(snap.events_fired, 2);
assert_eq!(snap.events_dropped, 1);
assert_eq!(snap.mean_latency_us, 200);
}
#[test]
fn metrics_counters_zero_when_no_fires() {
let snap = MetricsCounters::default().snapshot();
assert_eq!(snap.events_fired, 0);
assert_eq!(snap.events_dropped, 0);
assert_eq!(snap.mean_latency_us, 0);
}
#[test]
fn registry_caches_per_hook_config() {
let mut reg = ExecutorRegistry::new();
let a = cfg(HookMode::Exec);
let b = cfg(HookMode::Exec);
let e1 = reg.get(&a);
let e2 = reg.get(&b);
assert_eq!(reg.len(), 1, "equal HookConfigs must dedupe");
assert!(Arc::ptr_eq(&e1, &e2), "same Arc on cache hit");
}
#[test]
fn registry_distinct_executors_for_distinct_modes() {
let mut reg = ExecutorRegistry::new();
let exec_cfg = cfg(HookMode::Exec);
let mut daemon_cfg = cfg(HookMode::Daemon);
daemon_cfg.priority = 99;
reg.get(&exec_cfg);
reg.get(&daemon_cfg);
assert_eq!(reg.len(), 2);
}
#[test]
fn registry_from_hooks_prepopulates() {
let hooks = vec![cfg(HookMode::Exec), {
let mut d = cfg(HookMode::Daemon);
d.priority = 1;
d
}];
let reg = ExecutorRegistry::from_hooks(&hooks);
assert_eq!(reg.len(), 2);
}
#[test]
fn registry_metrics_starts_at_zero() {
let mut reg = ExecutorRegistry::new();
let _ = reg.get(&cfg(HookMode::Exec));
let metrics = reg.metrics();
assert_eq!(metrics.len(), 1);
assert_eq!(metrics[0].1.events_fired, 0);
assert_eq!(metrics[0].1.events_dropped, 0);
}
#[test]
fn executor_error_display_formats_each_variant() {
let cases: Vec<ExecutorError> = vec![
ExecutorError::Spawn {
command: "/bin/x".into(),
source: io::Error::new(io::ErrorKind::NotFound, "no"),
},
ExecutorError::Io(io::Error::new(io::ErrorKind::Other, "boom")),
ExecutorError::ChildExit {
code: Some(42),
stderr: "stderr msg".into(),
},
ExecutorError::Decode {
reason: "bad json".into(),
},
ExecutorError::Timeout { ms: 1234 },
ExecutorError::DaemonUnavailable { attempts: 5 },
ExecutorError::GovernanceRefused {
command: "/usr/bin/cargo".into(),
reason: "R004: cargo forbidden on low-disk".into(),
},
];
for e in cases {
let s = e.to_string();
assert!(!s.is_empty(), "Display empty for {e:?}");
}
}
#[test]
fn executor_error_governance_refused_display_names_both_fields() {
let err = ExecutorError::GovernanceRefused {
command: "/opt/homebrew/bin/cargo".into(),
reason: "R004 cargo forbidden".into(),
};
let s = err.to_string();
assert!(
s.contains("/opt/homebrew/bin/cargo"),
"Display must surface the command path: {s}"
);
assert!(
s.contains("R004 cargo forbidden"),
"Display must surface the rule's reason: {s}"
);
assert!(
s.contains("refused by governance"),
"Display must carry the governance-refusal marker: {s}"
);
}
#[test]
fn executor_error_governance_refused_source_is_none() {
let err = ExecutorError::GovernanceRefused {
command: "/bin/x".into(),
reason: "denied".into(),
};
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn executor_error_from_io_error_wraps_into_io_variant() {
let io_err = io::Error::new(io::ErrorKind::PermissionDenied, "nope");
let err: ExecutorError = io_err.into();
assert!(matches!(err, ExecutorError::Io(_)));
let s = err.to_string();
assert!(s.contains("hook io error"));
assert!(std::error::Error::source(&err).is_some());
}
#[test]
fn executor_error_spawn_source_chain() {
let io_err = io::Error::new(io::ErrorKind::NotFound, "no such cmd");
let err = ExecutorError::Spawn {
command: "/bin/missing".into(),
source: io_err,
};
assert!(std::error::Error::source(&err).is_some());
let s = err.to_string();
assert!(s.contains("/bin/missing"));
assert!(s.contains("no such cmd"));
}
#[test]
fn executor_error_child_exit_with_signaled_code() {
let err = ExecutorError::ChildExit {
code: None,
stderr: "killed".into(),
};
let s = err.to_string();
assert!(s.contains("<signaled>"));
assert!(!s.contains("killed"));
assert!(s.contains("see operator log for redacted stderr"));
}
#[test]
fn executor_error_child_exit_stderr_is_truncated_for_display() {
let big = "x".repeat(1024);
let err = ExecutorError::ChildExit {
code: Some(1),
stderr: big,
};
let s = err.to_string();
assert!(s.len() < 1024);
}
#[test]
fn executor_error_decode_display_carries_reason() {
let err = ExecutorError::Decode {
reason: "bad parse".into(),
};
let s = err.to_string();
assert!(s.contains("decode failed"));
assert!(s.contains("bad parse"));
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn executor_error_timeout_display_carries_ms() {
let err = ExecutorError::Timeout { ms: 5000 };
let s = err.to_string();
assert!(s.contains("5000ms"));
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn executor_error_daemon_unavailable_carries_attempts() {
let err = ExecutorError::DaemonUnavailable { attempts: 7 };
let s = err.to_string();
assert!(s.contains("7"));
assert!(s.contains("reconnect attempts"));
assert!(std::error::Error::source(&err).is_none());
}
#[test]
fn executor_metrics_serialize_to_json() {
let m = ExecutorMetrics {
events_fired: 42,
events_dropped: 1,
mean_latency_us: 250,
};
let json = serde_json::to_string(&m).unwrap();
assert!(json.contains("\"events_fired\":42"));
assert!(json.contains("\"events_dropped\":1"));
assert!(json.contains("\"mean_latency_us\":250"));
}
#[test]
fn metrics_counters_overflow_safe_latency() {
let m = MetricsCounters::default();
m.record_fire(Duration::from_micros(u64::MAX));
m.record_fire(Duration::from_micros(0));
let snap = m.snapshot();
assert_eq!(snap.events_fired, 2);
}
#[test]
fn registry_default_is_empty_and_default_eq_new() {
let reg = ExecutorRegistry::default();
assert!(reg.is_empty());
assert_eq!(reg.len(), 0);
}
#[test]
fn parse_decision_line_modify_invalid_delta_wraps_to_decode() {
let err = parse_decision_line(r#"{"action":"modify","delta":99}"#).unwrap_err();
assert!(matches!(err, ExecutorError::Decode { .. }));
}
#[test]
fn parse_decision_line_array_payload_wraps_to_decode() {
let err = parse_decision_line(r#"[1,2,3]"#).unwrap_err();
match err {
ExecutorError::Decode { reason } => assert!(reason.contains("object")),
other => panic!("expected Decode, got {other:?}"),
}
}
#[cfg(unix)]
#[test]
fn issue_1207_is_transient_spawn_errno_classification() {
for errno in [libc::EAGAIN, libc::ENOMEM, libc::EMFILE, libc::ETXTBSY] {
let err = io::Error::from_raw_os_error(errno);
assert!(
is_transient_spawn_errno(&err),
"errno {errno} should be transient"
);
}
for errno in [libc::ENOENT, libc::EACCES, libc::ENOEXEC] {
let err = io::Error::from_raw_os_error(errno);
assert!(
!is_transient_spawn_errno(&err),
"errno {errno} must NOT be classified as transient"
);
}
assert!(!is_transient_spawn_errno(&io::Error::other("oops")));
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn issue_1207_spawn_retry_first_attempt_succeeds() {
let started = Instant::now();
let child = spawn_with_transient_retry(|| {
Command::new(if std::path::Path::new("/bin/true").exists() {
"/bin/true"
} else {
"/usr/bin/true"
})
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
})
.await
.expect("first-attempt spawn should succeed");
drop(child);
assert!(
started.elapsed() < Duration::from_millis(10),
"first-attempt success must not pay any backoff"
);
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn issue_1207_spawn_retry_non_transient_errno_surfaces_immediately() {
let started = Instant::now();
let err = spawn_with_transient_retry(|| {
Command::new("/nonexistent/path/to/binary-xyz-1207")
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
})
.await
.expect_err("spawn of /nonexistent should fail");
assert_eq!(err.raw_os_error(), Some(libc::ENOENT));
assert!(
started.elapsed() < Duration::from_millis(10),
"non-transient errno must surface immediately, took {:?}",
started.elapsed()
);
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn issue_1207_spawn_retry_recovers_from_transient_eagain() {
use std::cell::Cell;
let attempt_count: Cell<u32> = Cell::new(0);
let started = Instant::now();
let child = spawn_with_transient_retry(|| {
let n = attempt_count.get();
attempt_count.set(n + 1);
if n < 2 {
Err(io::Error::from_raw_os_error(libc::EAGAIN))
} else {
Command::new(if std::path::Path::new("/bin/true").exists() {
"/bin/true"
} else {
"/usr/bin/true"
})
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.kill_on_drop(true)
.spawn()
}
})
.await
.expect("spawn should succeed after 2 EAGAIN retries");
drop(child);
assert_eq!(
attempt_count.get(),
3,
"closure called 3 times (2 EAGAIN + 1 success)"
);
let elapsed = started.elapsed();
assert!(
elapsed >= Duration::from_millis(55),
"should pay at least 10+50=60ms backoff, got {elapsed:?}"
);
assert!(
elapsed < Duration::from_millis(1_500),
"should not pay the full 1.26s ladder when success comes on attempt 3, got {elapsed:?}"
);
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn issue_1207_spawn_retry_exhaustion_surfaces_last_error() {
use std::cell::Cell;
let attempt_count: Cell<u32> = Cell::new(0);
let err = spawn_with_transient_retry(|| {
attempt_count.set(attempt_count.get() + 1);
Err::<Child, _>(io::Error::from_raw_os_error(libc::EMFILE))
})
.await
.expect_err("all attempts return EMFILE → helper must surface");
assert_eq!(err.raw_os_error(), Some(libc::EMFILE));
assert_eq!(
attempt_count.get(),
(SPAWN_RETRY_BACKOFF_MS.len() + 1) as u32,
"closure must be called total_attempts times before exhaustion"
);
}
}