ai_memory/hooks/executor.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// v0.7 Track G — Task G3: subprocess hook executor (exec + daemon modes).
5//
6// G1 (PR #554) shipped the `hooks.toml` schema + SIGHUP hot-reload
7// plumbing. G2 (PR #563) attached payload structs to every variant
8// of `HookEvent`. G3 wires the runtime: given a `HookConfig` and a
9// JSON payload, fire the configured subprocess and parse a
10// `HookDecision` back from its stdout.
11//
12// # Two modes
13//
14// * [`ExecExecutor`] — `tokio::process::Command` per fire. The
15// payload is written to stdin; stdin is then closed; the child's
16// stdout is read to EOF and parsed as a single JSON object.
17// Cheapest model to reason about; ideal for cold or low-rate
18// events (`pre_governance_decision`, `pre_archive`).
19//
20// * [`DaemonExecutor`] — one long-lived child per `HookConfig`.
21// Frames newline-delimited JSON over stdin/stdout (NDJSON, see
22// "Framing choice" below). Cheap per-fire amortized cost; the
23// right pick for hot events (`post_recall`, `post_search`) where
24// we need to preserve the v0.6.3 50ms recall budget. Reconnects
25// on child crash with exponential backoff (100ms → 5s, capped at
26// 5 attempts).
27//
28// # Framing choice — NDJSON
29//
30// Newline-delimited JSON. One JSON object per line, no embedded
31// newlines (`serde_json::to_writer` + `b'\n'`). Picked over
32// length-prefixed for three reasons:
33//
34// 1. Hook authors can `read_line()` from any language stdlib —
35// no varint or 4-byte-BE length to decode.
36// 2. The same stdio pipe is greppable / pipeable for debugging:
37// `tail -f /var/log/hook.log | jq` Just Works.
38// 3. Our payloads (`MemoryDelta`, `RecallResult`) never embed
39// raw newlines — `serde_json` encodes `\n` inside strings as
40// `\\n`, so the framing invariant holds without escaping
41// gymnastics on either side of the pipe.
42//
43// The trade-off is a single malformed line corrupts the rest of
44// the stream — but on the daemon path we already reconnect on
45// any framing error, so the operator-visible behaviour is the same
46// as a child crash: log + exponential backoff + retry.
47//
48// # Backpressure
49//
50// The daemon executor wraps each in-flight fire in a `tokio::time::timeout`
51// keyed off `HookConfig.timeout_ms`. If the child can't keep up, the
52// per-fire deadline trips and we drop the request with a `tracing::warn!`
53// + `events_dropped` counter bump. "Oldest first" is a property of the
54// single-flight serialization: each fire holds the connection mutex for
55// at most `timeout_ms`, so the queue ahead of a slow fire is bounded by
// `timeout_ms × queue_depth` — which is exactly the deadline-drain shape
// the G3 task brief asked for.
58//
59// # G4 update — HookDecision lifted into `src/hooks/decision.rs`
60//
61// G3 shipped a local `Allow + Deny` stub of `HookDecision` here so
62// the executor had something to deserialize against. G4 replaces
63// the stub with the full four-variant enum
64// (`Allow / Modify(MemoryDelta) / Deny / AskUser`) in the
65// dedicated `decision.rs` module. This file now imports the
66// canonical type and routes parse errors through the executor's
67// `Decode` variant — failure modes the operator sees (warning
68// log + degrade-to-Allow on the dispatcher path) are unchanged.
69//
70// # Cross-references (G5/G6 shipped post-G3)
71//
72// * G5 chain ordering / first-deny-wins lives in `hooks/chain.rs`.
73// The executor here is the per-config fire primitive; `chain.rs`
74// composes a sequence of executors into the per-event chain.
75// * G6 per-event-class deadlines + multiplier path live in
76// `hooks/timeouts.rs`. This executor still honours
77// `HookConfig.timeout_ms` as the per-fire ceiling; the class
78// deadlines in `timeouts.rs` resolve the budget the chain hands
79// each fire (via the class-multiplier table).
80// * G7-G11 firing at the actual memory operation points is wired
81// through the per-event hook sites under `hooks/recall.rs`,
82// `hooks/pre_store/`, and `hooks/post_reflect/`, composed via the
83// chain entry points in `hooks/chain.rs`.
84
85use std::collections::VecDeque;
86use std::io;
87use std::process::Stdio;
88use std::sync::Arc;
89use std::sync::atomic::{AtomicU64, Ordering};
90use std::time::{Duration, Instant};
91
92use serde::Serialize;
93use serde_json::Value;
94use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
95use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
96use tokio::sync::Mutex;
97use tokio::task::JoinHandle;
98use tokio::time::timeout;
99
/// Capacity of the per-child stderr ring buffer (in bytes). Sized to
/// `4 KiB` — large enough to surface a typical Python/Node traceback
/// (the diagnostic shape operators reach for first), small enough to
/// keep the per-daemon overhead irrelevant. The drain task appends each
/// chunk it reads and then pops from the front until the length is back
/// at or under this cap, so the invariant is "the most recent 4 KiB of
/// stderr is always retained". The exec path also uses this value to cap
/// the stderr slice it logs on the success path.
const STDERR_RING_CAPACITY: usize = 4 * 1024;
107
108// ---------------------------------------------------------------------------
109// Spawn-retry backoff (issue #1207) — handle transient fork(2) errnos
110// ---------------------------------------------------------------------------
111//
112// Under parallel test load on macOS (and on any kernel under fork/FD
113// pressure), `fork+exec` of a hook child can fail with one of three
114// transient errnos:
115//
116// * `EAGAIN` — kernel can't allocate a new process / hit RLIMIT_NPROC
117// (libc::EAGAIN: 35 on macOS, 11 on Linux).
118// * `ENOMEM` — kernel running low on memory (libc::ENOMEM = 12).
119// * `EMFILE` — process FD table full (libc::EMFILE = 24).
120//
// The pre-existing `ETXTBSY` (26) retry is handled by the same loop
// because it has the same shape: transient, kernel-mediated, resolved
// by a few-ms backoff. Issue #1207 added EAGAIN/ENOMEM/EMFILE to the
124// same retry loop so the executor stops surfacing transient kernel
125// pressure to callers as a hard `ExecutorError::Spawn` — which under
126// `FailMode::Open` silently degrades to `ChainResult::Allow` and
127// masquerades as a passing test when the child never actually ran
128// (the v0.7 G8 `on_index_eviction_fires_with_full_payload` flake).
129
/// Spawn-retry backoff ladder, in milliseconds. Each entry is the sleep
/// taken BEFORE the corresponding retry; the first attempt has no
/// pre-sleep, so the ladder's length plus one is the total attempt count
/// (five attempts for the four rungs below).
/// Total wall-clock budget across all retries is ~1.26s
/// (10 + 50 + 200 + 1_000 ms), which keeps the executor well under the
/// typical hook `timeout_ms` ceiling (5_000ms in the v0.7 G8 test fixture).
const SPAWN_RETRY_BACKOFF_MS: &[u64] = &[10, 50, 200, 1_000];
136
137/// Returns `true` iff the io::Error is a transient errno the kernel
138/// will likely recover from on a short backoff. On non-unix platforms
139/// always returns `false` (Windows doesn't expose these errnos; the
140/// caller still gets the original error via the standard Spawn path).
141#[cfg(unix)]
142fn is_transient_spawn_errno(err: &io::Error) -> bool {
143 let Some(errno) = err.raw_os_error() else {
144 return false;
145 };
146 // libc::EAGAIN, ENOMEM, EMFILE, ETXTBSY (the v0.6 race) — all
147 // transient under parallel-load conditions. ETXTBSY is the
148 // exec-races-write window that PR #563 originally guarded.
149 errno == libc::EAGAIN
150 || errno == libc::ENOMEM
151 || errno == libc::EMFILE
152 || errno == libc::ETXTBSY
153}
154
/// Non-unix counterpart of the transient-errno classifier: never reports
/// an error as transient, so a failed spawn on these platforms is not
/// retried on the strength of this check.
#[cfg(not(unix))]
fn is_transient_spawn_errno(_err: &io::Error) -> bool {
    false
}
159
/// Fault-injection hook for `spawn_with_transient_retry` unit tests.
///
/// On first call per process the budget is read from the
/// `AI_MEMORY_TEST_FORCE_SPAWN_EAGAIN` env var (missing or unparsable
/// values mean `0`). Every call with a non-zero budget consumes one unit
/// and returns the budget as it was BEFORE the decrement; once the
/// budget is exhausted the function returns `0` forever. Callers treat a
/// non-zero return as "synthesize EAGAIN for this attempt".
///
/// The function only exists under `cfg(all(test, unix))`, so release
/// builds contain neither the helper nor the env-var read.
///
/// The decrement is a single atomic read-modify-write. A separate
/// `load` followed by `fetch_sub` lets two concurrent test threads both
/// observe `1`, both subtract, and wrap the counter to `u32::MAX` —
/// which would force EAGAIN on practically every later spawn in the
/// process.
#[cfg(all(test, unix))]
fn test_force_eagain_remaining() -> u32 {
    use std::sync::atomic::{AtomicU32, Ordering};
    // Placeholder until `INIT` runs; never observed because `call_once`
    // completes before the counter is touched below.
    static REMAINING: AtomicU32 = AtomicU32::new(0);
    // Lazy-init from env on first call per process.
    static INIT: std::sync::Once = std::sync::Once::new();
    INIT.call_once(|| {
        let n = std::env::var("AI_MEMORY_TEST_FORCE_SPAWN_EAGAIN")
            .ok()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        REMAINING.store(n, Ordering::SeqCst);
    });
    // `checked_sub` refuses to go below zero, so the update fails (and we
    // report 0) once the budget is spent; on success `fetch_update`
    // yields the pre-decrement value.
    REMAINING
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
            left.checked_sub(1)
        })
        .unwrap_or(0)
}
192
193/// Spawn a child with bounded retry on transient kernel errnos. The
194/// closure is invoked at least once and at most `SPAWN_RETRY_BACKOFF_MS.len() + 1`
195/// times. Between attempts we sleep per the backoff ladder. After
196/// exhausting retries we surface the original `io::Error` verbatim so
197/// the caller can wrap it in the appropriate `ExecutorError::Spawn`
198/// (or another variant) and operators still see the precise errno.
199///
200/// The closure is `FnMut(): io::Result<Child>` rather than
201/// `FnOnce` because each retry needs to invoke `Command::spawn()`
202/// afresh — `Command` is consumed by `spawn()` so the caller wraps
203/// the build-and-spawn pair in the closure.
204async fn spawn_with_transient_retry<F>(mut spawn_once: F) -> io::Result<Child>
205where
206 F: FnMut() -> io::Result<Child>,
207{
208 let total_attempts = SPAWN_RETRY_BACKOFF_MS.len() + 1;
209 let mut last_err: Option<io::Error> = None;
210 for attempt in 0..total_attempts {
211 if attempt > 0 {
212 // Backoff index is (attempt - 1) since attempt 0 has no
213 // pre-sleep. attempt 1 → BACKOFF_MS[0] = 10ms, etc.
214 let sleep_ms = SPAWN_RETRY_BACKOFF_MS[attempt - 1];
215 tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
216 }
217
218 // Test-only fault injection — synthesize EAGAIN N times then
219 // pass through. Compiled out of release builds entirely. Gated
220 // on `unix` because `libc::EAGAIN` lives in the cfg(unix)-only
221 // `libc` dep; on Windows the fault injection is a no-op (the
222 // transient-spawn-errno classifier is unix-only too, so there
223 // is nothing to fault-inject for).
224 #[cfg(all(test, unix))]
225 {
226 if test_force_eagain_remaining() > 0 {
227 last_err = Some(io::Error::from_raw_os_error(libc::EAGAIN));
228 continue;
229 }
230 }
231
232 match spawn_once() {
233 Ok(child) => return Ok(child),
234 Err(e) if is_transient_spawn_errno(&e) => {
235 tracing::debug!(
236 attempt = attempt + 1,
237 max = total_attempts,
238 errno = ?e.raw_os_error(),
239 "hooks: transient spawn errno, retrying after backoff"
240 );
241 last_err = Some(e);
242 continue;
243 }
244 Err(e) => return Err(e),
245 }
246 }
247 Err(last_err.unwrap_or_else(|| io::Error::other("spawn retries exhausted with no error")))
248}
249
/// Bounded ring buffer used by the daemon-mode stderr drain. Wrapped
/// in an `Arc<Mutex<…>>` so the drain task and the executor (which
/// snapshots the contents on timeout / drop) can both reach it without
/// fighting over ownership of the underlying `ChildStderr` handle.
/// The `Mutex` is `tokio::sync::Mutex`, so both sides `.await` the lock.
/// The alias itself does not enforce the bound; the drain task trims the
/// deque after each append.
type StderrRing = Arc<Mutex<VecDeque<u8>>>;
255
256/// Spawn a background task that drains `stderr` into `ring`, bounding
257/// the retained bytes at [`STDERR_RING_CAPACITY`]. The task exits when
258/// the child closes its stderr (EOF) or on any read error — both
259/// terminal cases the executor will rediscover on the next exchange,
260/// so silent task exit here does not desync the caller.
261///
262/// Returning the `JoinHandle` lets the caller await/abort on shutdown
263/// if it wants to (the daemon executor stores it on the connection so
264/// the buffered bytes are still snapshot-able after the child dies).
265fn spawn_stderr_drain(mut stderr: ChildStderr, ring: StderrRing) -> JoinHandle<()> {
266 tokio::spawn(async move {
267 // 4 KiB read buffer matches the ring capacity — one read at
268 // most fills the ring, so the worst case is a single
269 // pop-from-front pass per chunk.
270 let mut chunk = [0u8; 4 * 1024];
271 loop {
272 match stderr.read(&mut chunk).await {
273 Ok(0) => break, // EOF — child closed stderr.
274 Ok(n) => {
275 let mut guard = ring.lock().await;
276 guard.extend(&chunk[..n]);
277 // Drop oldest bytes until we're back under cap.
278 // VecDeque's pop_front is O(1) amortised so this
279 // stays cheap even on a flood.
280 while guard.len() > STDERR_RING_CAPACITY {
281 guard.pop_front();
282 }
283 }
284 Err(_) => break, // Read error — surface as silent EOF; the
285 // executor will rediscover the failure on
286 // the next stdout exchange.
287 }
288 }
289 })
290}
291
/// Scrubs secret-looking content out of a child's stderr tail before it
/// is written to the operator log.
///
/// Each input line is handled independently, in this order:
///
/// 1. If everything before the first `=` is a shell-identifier
///    (`[A-Za-z_][A-Za-z0-9_]*`), the line becomes `NAME=<redacted>`.
/// 2. Otherwise, if the ASCII-lowercased line contains any entry of the
///    keyword list, the whole line is replaced by a fixed marker.
/// 3. Otherwise the line is kept verbatim.
///
/// Lines are re-joined with `\n`; a trailing newline in the input is not
/// reproduced. The filter deliberately errs on the side of redacting too
/// much.
fn redact_stderr_tail(tail: &str) -> String {
    const SECRET_KEYWORDS: &[&str] = &[
        "secret",
        "password",
        "passwd",
        "token",
        "api_key",
        "apikey",
        "bearer",
        "private_key",
        "private-key",
        " auth",
        "credential",
        "cookie",
        "x-amz-",
        "aws_",
        "ssh-rsa",
        "ssh-ed25519",
        "begin private",
        "begin rsa",
        "begin ec",
        "begin openssh",
    ];

    /// `true` when `name` is non-empty, starts with an ASCII letter or
    /// `_`, and continues with ASCII alphanumerics or `_` only.
    fn is_env_name(name: &str) -> bool {
        let mut chars = name.chars();
        match chars.next() {
            Some(first) if first.is_ascii_alphabetic() || first == '_' => {
                chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
            }
            _ => false,
        }
    }

    let mut scrubbed: Vec<String> = Vec::new();
    for line in tail.lines() {
        let cleaned = match line.split_once('=') {
            // Rule 1: `NAME=value` at the start of the line.
            Some((name, _value)) if is_env_name(name) => format!("{name}=<redacted>"),
            _ => {
                let lowered = line.to_ascii_lowercase();
                if SECRET_KEYWORDS.iter().any(|kw| lowered.contains(kw)) {
                    // Rule 2: keyword hit anywhere in the line.
                    "<redacted: matched secret-keyword filter>".to_string()
                } else {
                    // Rule 3: nothing suspicious.
                    line.to_string()
                }
            }
        };
        scrubbed.push(cleaned);
    }
    scrubbed.join("\n")
}
351
352/// Emit a WARN log carrying the redacted stderr tail iff non-empty.
353/// Free function (rather than a method) so it can be called from the
354/// `exchange` failure arms without re-borrowing `&self` or the
355/// connection guard — the borrow-checker won't let us hold `conn`
356/// (a `&mut DaemonConnection` from `guard.as_mut()`) and call a
357/// method on `&self` that would also need to read `self.config` while
358/// `conn` is live.
359fn warn_stderr_tail(command: &std::path::Path, stage: &str, tail: &str) {
360 if !tail.is_empty() {
361 let redacted = redact_stderr_tail(tail);
362 tracing::warn!(
363 command = %command.display(),
364 stage,
365 stderr_tail = %redacted,
366 "hooks: daemon child stderr at failure (redacted — env-var-shaped values + secret-keyword lines stripped)"
367 );
368 }
369}
370
371/// Snapshot the current contents of a stderr ring as a UTF-8 string.
372/// Lossy decoding so a stray non-UTF-8 byte (a hook author's binary
373/// dump, an emoji split mid-sequence by the ring's pop_front) doesn't
374/// suppress the diagnostic — operators get the closest readable
375/// rendering of what the child actually wrote.
376async fn snapshot_stderr_ring(ring: &StderrRing) -> String {
377 let guard = ring.lock().await;
378 let bytes: Vec<u8> = guard.iter().copied().collect();
379 String::from_utf8_lossy(&bytes).into_owned()
380}
381
382use super::config::HookConfig;
383use super::decision::HookDecision;
384use super::events::HookEvent;
385
386/// Adapter from the G4 strict-parse path into the executor's
387/// existing `Decode` error surface. Keeps `drive_exec_child` /
388/// `exchange` callers using one error type while letting the
389/// dispatcher (G5) reach for `DecisionParseError`'s named
390/// variants when it wants to log the precise failure mode.
391fn parse_decision_line(line: &str) -> Result<HookDecision> {
392 HookDecision::parse(line).map_err(|e| ExecutorError::Decode {
393 reason: e.to_string(),
394 })
395}
396
397// ---------------------------------------------------------------------------
398// HookExecutor trait
399// ---------------------------------------------------------------------------
400
/// Failure modes of the hook executor layer.
///
/// `Display` and `Error` are written by hand below instead of derived
/// (no `thiserror` in this crate's hot dependency tree).
#[derive(Debug)]
pub enum ExecutorError {
    /// The configured `command` could not be started (missing binary,
    /// permissions, …). `source` is the underlying OS error.
    Spawn { command: String, source: io::Error },
    /// Reading from or writing to the child's stdio pipes failed.
    Io(io::Error),
    /// The child exited non-zero, or closed stdout without a decision.
    /// `code` is `None` when no exit code is available. `stderr` holds
    /// the child's raw stderr and is intentionally NOT rendered by
    /// `Display`.
    ChildExit { code: Option<i32>, stderr: String },
    /// The child's output could not be parsed as a [`HookDecision`]
    /// (also used when the outgoing envelope fails to encode).
    Decode { reason: String },
    /// The per-fire deadline (`HookConfig.timeout_ms`) passed before the
    /// child produced a decision.
    Timeout { ms: u64 },
    /// The daemon child crashed or stayed unreachable after the
    /// reconnect budget was used up.
    DaemonUnavailable { attempts: u32 },
    /// v0.7.0 (issue #691 fold-1) — the governance pre-action wire hook
    /// refused the `ProcessSpawn` action. `reason` is the explanation
    /// attached to the refusal. Kept distinct from `Spawn` / `Io` so the
    /// chain runner can apply its cascade policy to it separately.
    GovernanceRefused { command: String, reason: String },
}

impl std::fmt::Display for ExecutorError {
    /// Renders a one-line, caller-safe description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Spawn { command, source } => {
                write!(f, "hook spawn failed for {command}: {source}")
            }
            Self::Io(err) => write!(f, "hook io error: {err}"),
            // P2 (#628 agent-5): never print the captured stderr here.
            // This text flows into `ChainResult::Deny.reason`, which is
            // returned to the JSON-RPC caller; echoing raw stderr would
            // let a hostile hook (`printenv >&2; exit 1`) exfiltrate
            // credentials.
            Self::ChildExit { code, .. } => {
                let shown = match code {
                    Some(c) => c.to_string(),
                    None => String::from("<signaled>"),
                };
                write!(
                    f,
                    "hook child exited (code {shown}); see operator log for redacted stderr"
                )
            }
            Self::Decode { reason } => write!(f, "hook decision decode failed: {reason}"),
            Self::Timeout { ms } => write!(f, "hook timed out after {ms}ms"),
            Self::DaemonUnavailable { attempts } => write!(
                f,
                "hook daemon unavailable after {attempts} reconnect attempts"
            ),
            Self::GovernanceRefused { command, reason } => write!(
                f,
                "hook spawn refused by governance for {command}: {reason}"
            ),
        }
    }
}

impl std::error::Error for ExecutorError {
    /// Exposes the wrapped `io::Error` for `Spawn` and `Io`; every other
    /// variant has no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &io::Error = match self {
            Self::Spawn { source, .. } => source,
            Self::Io(inner) => inner,
            Self::ChildExit { .. }
            | Self::Decode { .. }
            | Self::Timeout { .. }
            | Self::DaemonUnavailable { .. }
            | Self::GovernanceRefused { .. } => return None,
        };
        Some(cause)
    }
}

impl From<io::Error> for ExecutorError {
    /// Lets `?` turn a bare `io::Error` into [`ExecutorError::Io`].
    fn from(value: io::Error) -> Self {
        Self::Io(value)
    }
}
488
/// `Result` alias used across the executor surface: the error type is
/// fixed to [`ExecutorError`]. Within this module it shadows the prelude
/// `Result`, so other error types must be spelled out in full (e.g.
/// `io::Result<T>` or `std::result::Result<T, E>`).
pub type Result<T> = std::result::Result<T, ExecutorError>;
491
/// Trait every executor implementation satisfies. `fire` is the
/// single hot-path method G5 will iterate over when stitching
/// chains together.
///
/// `Send + Sync` is mandatory: the registry hands out
/// `Arc<dyn HookExecutor>` and the chain runner (G5) drives fires
/// from arbitrary tokio worker threads.
pub trait HookExecutor: Send + Sync {
    /// Fire the hook for `event` with `payload`. Returns the
    /// child's [`HookDecision`] or an [`ExecutorError`] on
    /// spawn / IO / decode / timeout failure.
    ///
    /// This is `async` via the `BoxFuture` shape because trait
    /// objects + `async fn in trait` is still rough on stable
    /// (the auto-trait inference for `Send` doesn't carry across
    /// the dyn boundary). `BoxFuture<'_, Result<HookDecision>>`
    /// is the same shape `tower::Service` settled on.
    ///
    /// The returned future is `Send` and borrows `self` for `'a`, so the
    /// executor must outlive the in-flight fire. `event` and `payload`
    /// are taken by value.
    fn fire<'a>(
        &'a self,
        event: HookEvent,
        payload: Value,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<HookDecision>> + Send + 'a>>;

    /// Snapshot of executor metrics. Surfaced by `ai-memory doctor
    /// --tokens --hooks` (see `src/cli/doctor.rs`). Returns a plain
    /// `Copy` value, not a live view of the counters.
    fn metrics(&self) -> ExecutorMetrics;
}
519
520// ---------------------------------------------------------------------------
521// ExecutorMetrics — backpressure observability
522// ---------------------------------------------------------------------------
523
/// Per-executor metrics surfaced by `ai-memory doctor`.
///
/// These are *snapshots*; the executor accumulates raw counters
/// internally and projects to this struct on demand. See
/// [`MetricsCounters`] for the live atomics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorMetrics {
    /// Number of fires whose latency was recorded. On the exec path this
    /// includes fires that ended in a child error, but not fires that hit
    /// the deadline.
    pub events_fired: u64,
    /// Number of fires dropped (on the exec path: fires that exceeded
    /// `timeout_ms`).
    pub events_dropped: u64,
    /// Integer mean of the recorded latencies in microseconds; `0` when
    /// nothing has been recorded yet.
    pub mean_latency_us: u64,
}
535
/// Live atomic counters behind [`ExecutorMetrics`]. All fields start at
/// zero (`Default`) and are only ever incremented through the methods in
/// the `impl` block.
#[derive(Debug, Default)]
struct MetricsCounters {
    /// Count of recorded fires.
    fired: AtomicU64,
    /// Count of dropped fires.
    dropped: AtomicU64,
    /// Sum of recorded fire latencies, in microseconds.
    latency_sum_us: AtomicU64,
    /// Number of latency samples in `latency_sum_us`; incremented together
    /// with `fired`.
    latency_n: AtomicU64,
}
543
544impl MetricsCounters {
545 fn record_fire(&self, latency: Duration) {
546 self.fired.fetch_add(1, Ordering::Relaxed);
547 let us = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX);
548 self.latency_sum_us.fetch_add(us, Ordering::Relaxed);
549 self.latency_n.fetch_add(1, Ordering::Relaxed);
550 }
551
552 fn record_drop(&self) {
553 self.dropped.fetch_add(1, Ordering::Relaxed);
554 }
555
556 fn snapshot(&self) -> ExecutorMetrics {
557 let fired = self.fired.load(Ordering::Relaxed);
558 let dropped = self.dropped.load(Ordering::Relaxed);
559 let n = self.latency_n.load(Ordering::Relaxed);
560 let sum = self.latency_sum_us.load(Ordering::Relaxed);
561 let mean_latency_us = if n == 0 { 0 } else { sum / n };
562 ExecutorMetrics {
563 events_fired: fired,
564 events_dropped: dropped,
565 mean_latency_us,
566 }
567 }
568}
569
570// ---------------------------------------------------------------------------
571// FireEnvelope — wire shape sent to the child
572// ---------------------------------------------------------------------------
573
/// JSON envelope written to a hook subprocess on every fire.
///
/// Keeping `event` separate from `payload` lets the child route
/// without re-parsing the payload bag — useful for daemon mode,
/// where one child might subscribe to several events.
#[derive(Debug, Serialize)]
struct FireEnvelope<'a> {
    /// Which hook event is being fired.
    event: HookEvent,
    /// Event payload, borrowed from the caller so serializing the
    /// envelope does not clone the JSON value.
    payload: &'a Value,
}
584
585// ---------------------------------------------------------------------------
586// ExecExecutor — subprocess per fire
587// ---------------------------------------------------------------------------
588
/// Subprocess-per-fire executor. Spawns a fresh child for every
/// event; closes stdin to signal "no more input"; reads stdout to
/// EOF and parses a single [`HookDecision`].
///
/// Cheapest mental model. Right pick for low-rate events. Hot
/// events should configure `mode = "daemon"` instead.
pub struct ExecExecutor {
    /// Hook configuration; `command` and `timeout_ms` are the fields the
    /// exec path reads.
    config: HookConfig,
    /// Fire / drop / latency counters for this executor.
    metrics: MetricsCounters,
}
599
/// SEC-17 (Cluster D, issue #767) — renders an `OsStr` (the raw command
/// path) as the `String` handed to the governance wire check as
/// `AgentAction::ProcessSpawn.binary`.
///
/// On Unix the raw bytes are decoded with `String::from_utf8_lossy`:
/// valid UTF-8 passes through unchanged, and each invalid sequence is
/// replaced with U+FFFD REPLACEMENT CHARACTER.
///
/// NOTE(review): despite the function's name this is NOT lossless for
/// non-UTF-8 input. U+FFFD encodes as three bytes and a truncated
/// multi-byte prefix collapses into a single replacement character, so
/// the byte length is not preserved and distinct byte strings can
/// produce the same `String`. A matcher rule must not rely on byte
/// offsets or on invalid bytes being distinguishable.
#[cfg(unix)]
fn os_str_lossless_for_wire_check(s: &std::ffi::OsStr) -> String {
    use std::os::unix::ffi::OsStrExt;
    let raw: &[u8] = s.as_bytes();
    let decoded = String::from_utf8_lossy(raw);
    decoded.into_owned()
}
624
/// Non-Unix fallback — `to_string_lossy` is the best the platform
/// surface exposes. On Windows the path is WTF-8 internally so the
/// lossy conversion is generally a no-op for legitimate paths.
/// Any part that is not valid Unicode is replaced with U+FFFD, so —
/// despite the function's name — the result is not guaranteed to
/// round-trip back to the original `OsStr`.
#[cfg(not(unix))]
fn os_str_lossless_for_wire_check(s: &std::ffi::OsStr) -> String {
    s.to_string_lossy().into_owned()
}
632
633impl ExecExecutor {
634 #[must_use]
635 pub fn new(config: HookConfig) -> Self {
636 Self {
637 config,
638 metrics: MetricsCounters::default(),
639 }
640 }
641
642 async fn fire_inner(&self, event: HookEvent, payload: Value) -> Result<HookDecision> {
643 // v0.7.0 (issue #691 fold-1) — wire the ProcessSpawn governance
644 // gate BEFORE the Command::new(...).spawn() call. The closure
645 // installed by bootstrap_serve consults the governance_rules
646 // table for a refusal verdict (e.g. R004 — cargo forbidden on
647 // low-disk system). Refusal short-circuits cleanly with a
648 // typed ExecutorError::GovernanceRefused so the chain runner
649 // can apply the cascade policy without confusing it with a
650 // legitimate spawn IO failure.
651 //
652 // SEC-13 / SEC-17 (Cluster D, issue #767) — build the binary
653 // identifier from the raw `OsStr` (NOT
654 // `display().to_string()`, which is lossy on non-UTF-8 path
655 // injections). The lossy conversion is reserved for log
656 // surfaces / error messages where readability outweighs the
657 // fidelity loss. The matcher engine sees the full OS-string
658 // representation so a non-UTF-8 path injection does not
659 // sneak past a substring-match rule. We pass an empty argv
660 // here because the hook executor never spawns the child with
661 // positional args (HookConfig has no `args` field); the
662 // matcher's optional `args_contain` field is a no-op for the
663 // hooks path but ready for the next caller (CLI shell-out,
664 // skill-export, etc.) that adds positional args.
665 let binary = os_str_lossless_for_wire_check(self.config.command.as_os_str());
666 let command_str = self.config.command.display().to_string();
667 let spawn_action = crate::governance::agent_action::AgentAction::ProcessSpawn {
668 binary,
669 args: Vec::new(),
670 };
671 if let Err(refusal) = crate::governance::wire_check::check(&spawn_action) {
672 return Err(ExecutorError::GovernanceRefused {
673 command: command_str,
674 reason: refusal.reason,
675 });
676 }
677 let envelope = FireEnvelope {
678 event,
679 payload: &payload,
680 };
681 let envelope_bytes = serde_json::to_vec(&envelope).map_err(|e| ExecutorError::Decode {
682 reason: format!("envelope encode: {e}"),
683 })?;
684
685 // Spawn-retry-with-backoff (issue #1207). Covers:
686 // * Linux/macOS ETXTBSY ("Text file busy") — exec() races
687 // with another process that still holds the file open for
688 // write. The pre-#1207 ETXTBSY loop is now folded into
689 // `spawn_with_transient_retry`.
690 // * EAGAIN/ENOMEM/EMFILE — kernel under fork/FD pressure
691 // under parallel test load. The v0.7 G8 flake
692 // (`on_index_eviction_fires_with_full_payload`) was the
693 // trigger; pre-#1207 the executor surfaced EAGAIN to the
694 // caller, `FailMode::Open` masked it as `ChainResult::Allow`,
695 // and the child never actually ran.
696 let command_path = self.config.command.clone();
697 let child = spawn_with_transient_retry(|| {
698 Command::new(&command_path)
699 .stdin(Stdio::piped())
700 .stdout(Stdio::piped())
701 .stderr(Stdio::piped())
702 .kill_on_drop(true)
703 .spawn()
704 })
705 .await
706 .map_err(|source| ExecutorError::Spawn {
707 command: self.config.command.display().to_string(),
708 source,
709 })?;
710
711 let started = Instant::now();
712 let deadline = Duration::from_millis(u64::from(self.config.timeout_ms));
713 let command_str = self.config.command.display().to_string();
714
715 let driven = timeout(
716 deadline,
717 drive_exec_child(child, envelope_bytes, &command_str),
718 )
719 .await;
720
721 match driven {
722 Ok(Ok(decision)) => {
723 self.metrics.record_fire(started.elapsed());
724 Ok(decision)
725 }
726 Ok(Err(e)) => {
727 // Even on child error, we count the latency we
728 // actually paid — the budget is a wall-clock figure.
729 self.metrics.record_fire(started.elapsed());
730 Err(e)
731 }
732 Err(_elapsed) => {
733 self.metrics.record_drop();
734 // The Child has been moved into drive_exec_child; that
735 // future is dropped on timeout, which fires the
736 // `kill_on_drop` knob set above and reaps the process.
737 Err(ExecutorError::Timeout {
738 ms: u64::from(self.config.timeout_ms),
739 })
740 }
741 }
742 }
743}
744
745impl HookExecutor for ExecExecutor {
746 fn fire<'a>(
747 &'a self,
748 event: HookEvent,
749 payload: Value,
750 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<HookDecision>> + Send + 'a>>
751 {
752 Box::pin(self.fire_inner(event, payload))
753 }
754
755 fn metrics(&self) -> ExecutorMetrics {
756 self.metrics.snapshot()
757 }
758}
759
760async fn drive_exec_child(
761 mut child: Child,
762 envelope: Vec<u8>,
763 command: &str,
764) -> Result<HookDecision> {
765 // Write the envelope, then close stdin so the child knows it
766 // can finish. `take()` drops the handle on the way out.
767 if let Some(mut stdin) = child.stdin.take() {
768 stdin.write_all(&envelope).await?;
769 stdin.write_all(b"\n").await?;
770 stdin.shutdown().await?;
771 }
772
773 let output = child.wait_with_output().await?;
774 if !output.status.success() {
775 let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
776 return Err(ExecutorError::ChildExit {
777 code: output.status.code(),
778 stderr,
779 });
780 }
781
782 // H9 fix — surface stderr on the success path too. Operators
783 // debugging a flapping hook need to see whatever diagnostics the
784 // child wrote even when the decision parses cleanly; silently
785 // dropping stderr here was the v0.7.0 review #628 blocker. Cap
786 // the logged slice at the same `STDERR_RING_CAPACITY` the daemon
787 // path uses so an over-eager hook can't pin tracing's allocator.
788 if !output.stderr.is_empty() {
789 let trimmed_len = output.stderr.len().min(STDERR_RING_CAPACITY);
790 let start = output.stderr.len() - trimmed_len;
791 let stderr_tail = String::from_utf8_lossy(&output.stderr[start..]);
792 tracing::debug!(
793 command,
794 stderr_bytes = output.stderr.len(),
795 stderr = %stderr_tail,
796 "hooks: exec child wrote stderr on success path"
797 );
798 }
799
800 let stdout = String::from_utf8_lossy(&output.stdout);
801 // The child may write multiple newlines; the decision is the
802 // last non-empty line. Hooks that print debug output to stdout
803 // before their decision Just Work this way.
804 let decision_line = stdout
805 .lines()
806 .filter(|l| !l.trim().is_empty())
807 .next_back()
808 .unwrap_or("");
809 parse_decision_line(decision_line)
810}
811
812// ---------------------------------------------------------------------------
813// DaemonExecutor — long-lived child + NDJSON framing
814// ---------------------------------------------------------------------------
815
/// Per-`HookConfig` long-lived child. One executor owns one child
/// at a time; fires are serialized through a single connection
/// mutex (NDJSON request → NDJSON response). On framing error or
/// child exit, the connection is dropped and the next fire
/// reconnects with exponential backoff.
pub struct DaemonExecutor {
    /// Hook definition this executor serves; supplies the command
    /// path to spawn and the per-fire `timeout_ms`.
    config: HookConfig,
    /// Live connection to the child. `None` until the first fire
    /// connects, and again after any teardown (timeout, I/O error,
    /// framing error, child EOF).
    conn: Mutex<Option<DaemonConnection>>,
    /// Fire / drop counters, exposed as a snapshot via `metrics()`.
    metrics: MetricsCounters,
}
826
/// Handles for one spawned daemon child: the process itself, its
/// stdin / stdout pipes, and the background stderr capture.
struct DaemonConnection {
    /// The spawned process; `DaemonExecutor`'s `Drop` calls
    /// `start_kill` on it.
    child: Child,
    /// Write half: each fire writes one newline-terminated envelope.
    stdin: ChildStdin,
    /// Buffered read half: each fire reads one decision line.
    stdout: BufReader<ChildStdout>,
    /// Bounded ring of the most recent [`STDERR_RING_CAPACITY`] bytes
    /// the child wrote to stderr. Populated by `stderr_drain` in the
    /// background; snapshotted by the executor on timeout / error /
    /// drop so operators see the child's diagnostics next to the
    /// failure that surfaced them.
    stderr_buf: StderrRing,
    /// Handle to the stderr-drain task. Held so we can `abort()` it
    /// when the connection is torn down — without this the task
    /// would linger until the kernel closes stderr after the child's
    /// reaping, which is racy under stress.
    stderr_task: JoinHandle<()>,
}
843
844impl DaemonConnection {
845 /// Snapshot the buffered stderr (lossy UTF-8) for inclusion in a
846 /// failure log. Only called on the slow / failure paths; the
847 /// happy path never touches the ring.
848 async fn stderr_tail(&self) -> String {
849 snapshot_stderr_ring(&self.stderr_buf).await
850 }
851}
852
853impl Drop for DaemonConnection {
854 fn drop(&mut self) {
855 // Abort the drain task — the ChildStderr it owns is going
856 // out of scope and the OS will close the pipe; no point
857 // waiting for the EOF roundtrip.
858 self.stderr_task.abort();
859 }
860}
861
862impl DaemonExecutor {
863 #[must_use]
864 pub fn new(config: HookConfig) -> Self {
865 Self {
866 config,
867 conn: Mutex::new(None),
868 metrics: MetricsCounters::default(),
869 }
870 }
871
872 async fn fire_inner(&self, event: HookEvent, payload: Value) -> Result<HookDecision> {
873 let envelope = FireEnvelope {
874 event,
875 payload: &payload,
876 };
877 let mut envelope_bytes =
878 serde_json::to_vec(&envelope).map_err(|e| ExecutorError::Decode {
879 reason: format!("envelope encode: {e}"),
880 })?;
881 envelope_bytes.push(b'\n');
882
883 let started = Instant::now();
884 let deadline = Duration::from_millis(u64::from(self.config.timeout_ms));
885
886 let driven = timeout(deadline, self.exchange(&envelope_bytes)).await;
887 match driven {
888 Ok(Ok(decision)) => {
889 self.metrics.record_fire(started.elapsed());
890 Ok(decision)
891 }
892 Ok(Err(e)) => {
893 self.metrics.record_fire(started.elapsed());
894 Err(e)
895 }
896 Err(_elapsed) => {
897 self.metrics.record_drop();
898 // Drop the connection so the next fire reconnects;
899 // a slow daemon may still write the response into
900 // the pipe after we've moved on, which would desync
901 // subsequent fires.
902 //
903 // H9 fix — before tearing down the connection, snapshot
904 // the buffered stderr so the operator log carries
905 // whatever the child was complaining about right up
906 // to the deadline. Without this, a hook that's
907 // genuinely failing inside its handler (panic
908 // backtrace on stderr, but no decision on stdout)
909 // would just look like a generic "Timeout" with no
910 // diagnostic context.
911 let mut guard = self.conn.lock().await;
912 if let Some(conn) = guard.as_ref() {
913 let tail = conn.stderr_tail().await;
914 if !tail.is_empty() {
915 tracing::warn!(
916 command = %self.config.command.display(),
917 timeout_ms = self.config.timeout_ms,
918 stderr_tail = %tail,
919 "hooks: daemon child stderr at timeout"
920 );
921 }
922 }
923 *guard = None;
924 Err(ExecutorError::Timeout {
925 ms: u64::from(self.config.timeout_ms),
926 })
927 }
928 }
929 }
930
931 /// Write one envelope, read one decision line. On any IO /
932 /// framing error the connection is dropped before returning so
933 /// the next fire goes through `connect_with_backoff`.
934 async fn exchange(&self, envelope: &[u8]) -> Result<HookDecision> {
935 let mut guard = self.conn.lock().await;
936 if guard.is_none() {
937 *guard = Some(self.connect_with_backoff().await?);
938 }
939 // Safe: just inserted if missing.
940 let conn = guard.as_mut().expect("connection just inserted");
941
942 if let Err(e) = conn.stdin.write_all(envelope).await {
943 // H9 fix — snapshot buffered stderr before tearing down
944 // so the operator log carries the child's diagnostics.
945 let tail = conn.stderr_tail().await;
946 warn_stderr_tail(&self.config.command, "stdin write", &tail);
947 *guard = None;
948 return Err(ExecutorError::Io(e));
949 }
950 if let Err(e) = conn.stdin.flush().await {
951 let tail = conn.stderr_tail().await;
952 warn_stderr_tail(&self.config.command, "stdin flush", &tail);
953 *guard = None;
954 return Err(ExecutorError::Io(e));
955 }
956
957 let mut line = String::new();
958 match conn.stdout.read_line(&mut line).await {
959 Ok(0) => {
960 // EOF — child closed its stdout (likely crashed).
961 // H9 fix — surface buffered stderr in the error so
962 // the operator sees the child's last words instead
963 // of a generic "closed stdout".
964 let tail = conn.stderr_tail().await;
965 let stderr = if tail.is_empty() {
966 "daemon child closed stdout".into()
967 } else {
968 format!("daemon child closed stdout; stderr tail: {tail}")
969 };
970 *guard = None;
971 Err(ExecutorError::ChildExit { code: None, stderr })
972 }
973 Ok(_) => match parse_decision_line(&line) {
974 Ok(d) => Ok(d),
975 Err(e) => {
976 // Framing error — reset the connection so the
977 // next fire doesn't read into a half-consumed
978 // envelope. Log buffered stderr at WARN so the
979 // operator can correlate the bad framing with
980 // any hook-side panic that produced it.
981 let tail = conn.stderr_tail().await;
982 warn_stderr_tail(&self.config.command, "framing error", &tail);
983 *guard = None;
984 Err(e)
985 }
986 },
987 Err(e) => {
988 let tail = conn.stderr_tail().await;
989 warn_stderr_tail(&self.config.command, "stdout read", &tail);
990 *guard = None;
991 Err(ExecutorError::Io(e))
992 }
993 }
994 }
995
996 /// Spawn the child with exponential backoff (100ms → 5s, max 5
997 /// attempts). Returns the connected handles or
998 /// [`ExecutorError::DaemonUnavailable`] on exhaustion.
999 async fn connect_with_backoff(&self) -> Result<DaemonConnection> {
1000 const MAX_ATTEMPTS: u32 = 5;
1001 const BASE_BACKOFF_MS: u64 = 100;
1002 const MAX_BACKOFF_MS: u64 = 5_000;
1003
1004 let mut last_err: Option<ExecutorError> = None;
1005 for attempt in 0..MAX_ATTEMPTS {
1006 if attempt > 0 {
1007 // Exponential backoff: 100, 200, 400, 800, 1600… capped.
1008 let pow = 1u64 << (attempt - 1);
1009 let backoff_ms = (BASE_BACKOFF_MS.saturating_mul(pow)).min(MAX_BACKOFF_MS);
1010 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
1011 }
1012 match self.spawn_one().await {
1013 Ok(conn) => return Ok(conn),
1014 Err(e) => {
1015 tracing::warn!(
1016 attempt = attempt + 1,
1017 max = MAX_ATTEMPTS,
1018 error = %e,
1019 "hooks: daemon spawn attempt failed"
1020 );
1021 last_err = Some(e);
1022 }
1023 }
1024 }
1025 Err(last_err.unwrap_or(ExecutorError::DaemonUnavailable {
1026 attempts: MAX_ATTEMPTS,
1027 }))
1028 }
1029
1030 async fn spawn_one(&self) -> Result<DaemonConnection> {
1031 // v0.7.0 (issue #691 fold-1) — same ProcessSpawn gate as the
1032 // exec-mode path above. Daemon-mode hooks spawn at most once
1033 // per (process, hook) so this fires at start-up rather than
1034 // per-event; a governance refusal here aborts the daemon
1035 // connection attempt cleanly.
1036 let command_str = self.config.command.display().to_string();
1037 let spawn_action = crate::governance::agent_action::AgentAction::ProcessSpawn {
1038 binary: command_str.clone(),
1039 args: Vec::new(),
1040 };
1041 if let Err(refusal) = crate::governance::wire_check::check(&spawn_action) {
1042 return Err(ExecutorError::GovernanceRefused {
1043 command: command_str,
1044 reason: refusal.reason,
1045 });
1046 }
1047 // Spawn-retry-with-backoff (issue #1207) for the daemon path
1048 // too. The outer `connect_with_backoff` retries on any
1049 // ExecutorError (5 attempts, 100ms → 5s); this inner loop
1050 // catches the narrow transient-errno class first so a brief
1051 // fork-pressure window doesn't even consume a reconnect slot.
1052 let command_path = self.config.command.clone();
1053 let mut child = spawn_with_transient_retry(|| {
1054 Command::new(&command_path)
1055 .stdin(Stdio::piped())
1056 .stdout(Stdio::piped())
1057 .stderr(Stdio::piped())
1058 .spawn()
1059 })
1060 .await
1061 .map_err(|source| ExecutorError::Spawn {
1062 command: self.config.command.display().to_string(),
1063 source,
1064 })?;
1065 let stdin = child.stdin.take().ok_or_else(|| {
1066 ExecutorError::Io(io::Error::new(
1067 io::ErrorKind::BrokenPipe,
1068 "child stdin not piped",
1069 ))
1070 })?;
1071 let stdout = child.stdout.take().ok_or_else(|| {
1072 ExecutorError::Io(io::Error::new(
1073 io::ErrorKind::BrokenPipe,
1074 "child stdout not piped",
1075 ))
1076 })?;
1077 // H9 fix — drain stderr in the background so a verbose hook
1078 // can't fill the OS pipe buffer (typically 64 KiB on Linux,
1079 // ~16 KiB on macOS) and deadlock the child on its next
1080 // `write(2)` to stderr — which would in turn deadlock us
1081 // waiting for the next NDJSON response on stdout.
1082 let stderr = child.stderr.take().ok_or_else(|| {
1083 ExecutorError::Io(io::Error::new(
1084 io::ErrorKind::BrokenPipe,
1085 "child stderr not piped",
1086 ))
1087 })?;
1088 let stderr_buf: StderrRing =
1089 Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_RING_CAPACITY)));
1090 let stderr_task = spawn_stderr_drain(stderr, Arc::clone(&stderr_buf));
1091 Ok(DaemonConnection {
1092 child,
1093 stdin,
1094 stdout: BufReader::new(stdout),
1095 stderr_buf,
1096 stderr_task,
1097 })
1098 }
1099}
1100
1101impl HookExecutor for DaemonExecutor {
1102 fn fire<'a>(
1103 &'a self,
1104 event: HookEvent,
1105 payload: Value,
1106 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<HookDecision>> + Send + 'a>>
1107 {
1108 Box::pin(self.fire_inner(event, payload))
1109 }
1110
1111 fn metrics(&self) -> ExecutorMetrics {
1112 self.metrics.snapshot()
1113 }
1114}
1115
1116impl Drop for DaemonExecutor {
1117 fn drop(&mut self) {
1118 // Best-effort: kill the child so a reload doesn't leak
1119 // long-lived processes. tokio::process::Child has a
1120 // `kill_on_drop` knob but we can't reach it from here
1121 // without the conn lock; this Drop is the belt to that
1122 // suspenders.
1123 if let Ok(mut guard) = self.conn.try_lock() {
1124 if let Some(conn) = guard.as_mut() {
1125 let _ = conn.child.start_kill();
1126 }
1127 }
1128 }
1129}
1130
1131// ---------------------------------------------------------------------------
1132// ExecutorRegistry
1133// ---------------------------------------------------------------------------
1134
/// Maps `HookConfig` → `Arc<dyn HookExecutor>`. Built once per
/// `hooks.toml` load (see `src/hooks/config.rs::spawn_reload_task`)
/// and held behind the same `Arc<RwLock<…>>` that owns the config
/// snapshot. G5's chain runner consumes the registry's outputs.
///
/// The cache is keyed on `HookConfig` (full struct equality) so two
/// different hooks pointing at the same binary still get distinct
/// executors — one daemon child per `[[hook]]` block, never shared.
/// Sharing would let one event's slow fire starve another's
/// connection mutex, which is the opposite of what daemon mode
/// buys us.
///
/// Backed by a `Vec<(HookConfig, …)>` rather than a `HashMap`:
/// `HookConfig` carries a `PathBuf` and a `String` (no `Hash`
/// derivation today), and the cache cardinality is bounded by the
/// number of `[[hook]]` blocks in `hooks.toml` — a linear scan over
/// a few dozen entries is dwarfed by the spawn cost it gates.
pub struct ExecutorRegistry {
    /// `(config, executor)` pairs in insertion order; looked up by
    /// linear scan comparing the config with `==`.
    cache: Vec<(HookConfig, Arc<dyn HookExecutor>)>,
}
1155
1156impl ExecutorRegistry {
1157 #[must_use]
1158 pub fn new() -> Self {
1159 Self { cache: Vec::new() }
1160 }
1161
1162 /// Build a registry pre-populated with one executor per entry
1163 /// in `hooks`. Convenience for the bootstrap path; callers
1164 /// driving the SIGHUP reload should call [`Self::get`] lazily
1165 /// instead so dropping a `[[hook]]` from the config tears down
1166 /// its long-lived child.
1167 #[must_use]
1168 pub fn from_hooks(hooks: &[HookConfig]) -> Self {
1169 let mut me = Self::new();
1170 for h in hooks {
1171 let _ = me.get(h);
1172 }
1173 me
1174 }
1175
1176 /// Return the executor for `hook`, constructing one on first
1177 /// touch. Subsequent calls with an *equal* `HookConfig` return
1178 /// the same `Arc`.
1179 pub fn get(&mut self, hook: &HookConfig) -> Arc<dyn HookExecutor> {
1180 if let Some((_, existing)) = self.cache.iter().find(|(cfg, _)| cfg == hook) {
1181 return Arc::clone(existing);
1182 }
1183 let executor: Arc<dyn HookExecutor> = match hook.mode {
1184 super::config::HookMode::Exec => Arc::new(ExecExecutor::new(hook.clone())),
1185 super::config::HookMode::Daemon => Arc::new(DaemonExecutor::new(hook.clone())),
1186 };
1187 self.cache.push((hook.clone(), Arc::clone(&executor)));
1188 executor
1189 }
1190
1191 /// Iterate `(HookConfig, ExecutorMetrics)` pairs. `ai-memory
1192 /// doctor --tokens --hooks` calls this to render the
1193 /// per-executor backpressure table.
1194 pub fn metrics(&self) -> Vec<(HookConfig, ExecutorMetrics)> {
1195 self.cache
1196 .iter()
1197 .map(|(cfg, ex)| (cfg.clone(), ex.metrics()))
1198 .collect()
1199 }
1200
1201 /// Number of cached executors. Cheap accessor for tests.
1202 #[must_use]
1203 pub fn len(&self) -> usize {
1204 self.cache.len()
1205 }
1206
1207 /// Whether the registry is empty.
1208 #[must_use]
1209 pub fn is_empty(&self) -> bool {
1210 self.cache.is_empty()
1211 }
1212}
1213
1214impl Default for ExecutorRegistry {
1215 fn default() -> Self {
1216 Self::new()
1217 }
1218}
1219
1220// ---------------------------------------------------------------------------
1221// Tests — unit only; the integration tests live in
1222// `tests/hooks_executor_test.rs` so they can spawn real subprocesses.
1223// ---------------------------------------------------------------------------
1224
1225#[cfg(test)]
1226mod tests {
1227 use super::*;
1228 use crate::hooks::config::HookMode;
1229
1230 fn cfg(mode: HookMode) -> HookConfig {
1231 HookConfig {
1232 event: HookEvent::PostStore,
1233 command: std::path::PathBuf::from("/bin/true"),
1234 priority: 0,
1235 timeout_ms: 1_000,
1236 mode,
1237 enabled: true,
1238 namespace: "*".into(),
1239 fail_mode: crate::hooks::config::FailMode::Open,
1240 }
1241 }
1242
1243 // The G3 stub's parse path now lives in `decision.rs`. These
1244 // tests cover the executor-side adapter (`parse_decision_line`)
1245 // which wraps `DecisionParseError` into `ExecutorError::Decode`
1246 // — the failure mode that surfaces on the daemon stdout path.
1247
1248 #[test]
1249 fn parse_decision_line_allow_default_on_empty() {
1250 assert_eq!(parse_decision_line("").unwrap(), HookDecision::Allow);
1251 assert_eq!(parse_decision_line(" ").unwrap(), HookDecision::Allow);
1252 assert_eq!(parse_decision_line("{}").unwrap(), HookDecision::Allow);
1253 }
1254
1255 // P2 (#628 agent-5) — stderr secret-redaction.
1256 #[test]
1257 fn redact_stderr_strips_env_var_assignments() {
1258 let raw = "AWS_SECRET_ACCESS_KEY=AKIA1234567890ABCDEF\nDATABASE_URL=postgres://u:p@h/db\nGITHUB_TOKEN=ghp_abcdef\nuser-message-fine\n";
1259 let red = redact_stderr_tail(raw);
1260 assert!(!red.contains("AKIA1234567890ABCDEF"));
1261 assert!(!red.contains("ghp_abcdef"));
1262 assert!(red.contains("AWS_SECRET_ACCESS_KEY=<redacted>"));
1263 assert!(red.contains("GITHUB_TOKEN=<redacted>"));
1264 // Unrelated lines pass through.
1265 assert!(red.contains("user-message-fine"));
1266 }
1267
1268 #[test]
1269 fn redact_stderr_drops_secret_keyword_lines() {
1270 let raw = "Authorization: Bearer eyJ.fake.jwt\nset-cookie: session=abc\nmsg=normal\n";
1271 let red = redact_stderr_tail(raw);
1272 assert!(!red.contains("Bearer eyJ.fake.jwt"));
1273 assert!(!red.contains("session=abc"));
1274 }
1275
1276 #[test]
1277 fn child_exit_display_excludes_stderr_content() {
1278 let err = ExecutorError::ChildExit {
1279 code: Some(1),
1280 stderr: "AWS_SECRET_ACCESS_KEY=AKIA-secret-content".to_string(),
1281 };
1282 let s = err.to_string();
1283 // Stderr content must NOT flow into the user-visible Display
1284 // (which becomes ChainResult::Deny.reason → JSON-RPC caller).
1285 assert!(!s.contains("AKIA-secret-content"));
1286 assert!(!s.contains("AWS_SECRET_ACCESS_KEY"));
1287 // Exit code IS surfaced — operator log carries the full
1288 // (redacted) stderr separately.
1289 assert!(s.contains("code 1"));
1290 }
1291
1292 #[test]
1293 fn parse_decision_line_allow_explicit() {
1294 let d = parse_decision_line(r#"{"action":"allow"}"#).unwrap();
1295 assert_eq!(d, HookDecision::Allow);
1296 }
1297
1298 #[test]
1299 fn parse_decision_line_deny_with_default_code() {
1300 let d = parse_decision_line(r#"{"action":"deny","reason":"nope"}"#).unwrap();
1301 match d {
1302 HookDecision::Deny { reason, code } => {
1303 assert_eq!(reason, "nope");
1304 assert_eq!(code, 403);
1305 }
1306 other => panic!("expected Deny, got {other:?}"),
1307 }
1308 }
1309
1310 #[test]
1311 fn parse_decision_line_deny_with_explicit_code() {
1312 let d = parse_decision_line(r#"{"action":"deny","reason":"x","code":429}"#).unwrap();
1313 match d {
1314 HookDecision::Deny { code, .. } => assert_eq!(code, 429),
1315 _ => panic!("expected Deny"),
1316 }
1317 }
1318
1319 #[test]
1320 fn parse_decision_line_unknown_action_wraps_to_decode() {
1321 // G4 recognises `modify` so the canonical "unknown action"
1322 // case becomes a deliberately bogus discriminator.
1323 let err = parse_decision_line(r#"{"action":"explode"}"#).unwrap_err();
1324 match err {
1325 ExecutorError::Decode { reason } => {
1326 assert!(
1327 reason.contains("unknown action"),
1328 "decode reason should name the failure: {reason}"
1329 );
1330 }
1331 other => panic!("expected Decode, got {other:?}"),
1332 }
1333 }
1334
1335 #[test]
1336 fn parse_decision_line_modify_now_recognised() {
1337 // G3's stub rejected `modify`; G4 lifts it into the wire
1338 // contract, so the executor must round-trip it cleanly.
1339 let d = parse_decision_line(r#"{"action":"modify","delta":{"priority":7}}"#).unwrap();
1340 match d {
1341 HookDecision::Modify(m) => assert_eq!(m.delta.priority, Some(7)),
1342 other => panic!("expected Modify, got {other:?}"),
1343 }
1344 }
1345
1346 #[test]
1347 fn metrics_counters_track_fired_dropped_and_mean() {
1348 let m = MetricsCounters::default();
1349 m.record_fire(Duration::from_micros(100));
1350 m.record_fire(Duration::from_micros(300));
1351 m.record_drop();
1352 let snap = m.snapshot();
1353 assert_eq!(snap.events_fired, 2);
1354 assert_eq!(snap.events_dropped, 1);
1355 assert_eq!(snap.mean_latency_us, 200);
1356 }
1357
1358 #[test]
1359 fn metrics_counters_zero_when_no_fires() {
1360 let snap = MetricsCounters::default().snapshot();
1361 assert_eq!(snap.events_fired, 0);
1362 assert_eq!(snap.events_dropped, 0);
1363 assert_eq!(snap.mean_latency_us, 0);
1364 }
1365
1366 #[test]
1367 fn registry_caches_per_hook_config() {
1368 let mut reg = ExecutorRegistry::new();
1369 let a = cfg(HookMode::Exec);
1370 let b = cfg(HookMode::Exec);
1371 let e1 = reg.get(&a);
1372 let e2 = reg.get(&b);
1373 assert_eq!(reg.len(), 1, "equal HookConfigs must dedupe");
1374 assert!(Arc::ptr_eq(&e1, &e2), "same Arc on cache hit");
1375 }
1376
1377 #[test]
1378 fn registry_distinct_executors_for_distinct_modes() {
1379 let mut reg = ExecutorRegistry::new();
1380 let exec_cfg = cfg(HookMode::Exec);
1381 let mut daemon_cfg = cfg(HookMode::Daemon);
1382 // Bump priority so the configs are unequal even though
1383 // command path is identical.
1384 daemon_cfg.priority = 99;
1385 reg.get(&exec_cfg);
1386 reg.get(&daemon_cfg);
1387 assert_eq!(reg.len(), 2);
1388 }
1389
1390 #[test]
1391 fn registry_from_hooks_prepopulates() {
1392 let hooks = vec![cfg(HookMode::Exec), {
1393 let mut d = cfg(HookMode::Daemon);
1394 d.priority = 1;
1395 d
1396 }];
1397 let reg = ExecutorRegistry::from_hooks(&hooks);
1398 assert_eq!(reg.len(), 2);
1399 }
1400
1401 #[test]
1402 fn registry_metrics_starts_at_zero() {
1403 let mut reg = ExecutorRegistry::new();
1404 let _ = reg.get(&cfg(HookMode::Exec));
1405 let metrics = reg.metrics();
1406 assert_eq!(metrics.len(), 1);
1407 assert_eq!(metrics[0].1.events_fired, 0);
1408 assert_eq!(metrics[0].1.events_dropped, 0);
1409 }
1410
1411 #[test]
1412 fn executor_error_display_formats_each_variant() {
1413 let cases: Vec<ExecutorError> = vec![
1414 ExecutorError::Spawn {
1415 command: "/bin/x".into(),
1416 source: io::Error::new(io::ErrorKind::NotFound, "no"),
1417 },
1418 ExecutorError::Io(io::Error::new(io::ErrorKind::Other, "boom")),
1419 ExecutorError::ChildExit {
1420 code: Some(42),
1421 stderr: "stderr msg".into(),
1422 },
1423 ExecutorError::Decode {
1424 reason: "bad json".into(),
1425 },
1426 ExecutorError::Timeout { ms: 1234 },
1427 ExecutorError::DaemonUnavailable { attempts: 5 },
1428 ExecutorError::GovernanceRefused {
1429 command: "/usr/bin/cargo".into(),
1430 reason: "R004: cargo forbidden on low-disk".into(),
1431 },
1432 ];
1433 for e in cases {
1434 let s = e.to_string();
1435 assert!(!s.is_empty(), "Display empty for {e:?}");
1436 }
1437 }
1438
1439 #[test]
1440 fn executor_error_governance_refused_display_names_both_fields() {
1441 // v0.7.0 (issue #691 fold-1) — pin the user-visible message
1442 // shape for the PE-1 wire-point refusal so an operator-facing
1443 // log line carries the command path AND the rule's authored
1444 // reason (no truncation, no PII rewrite). The chain runner's
1445 // cascade policy may classify on the prefix "hook spawn refused
1446 // by governance" — change the wording here in lockstep.
1447 let err = ExecutorError::GovernanceRefused {
1448 command: "/opt/homebrew/bin/cargo".into(),
1449 reason: "R004 cargo forbidden".into(),
1450 };
1451 let s = err.to_string();
1452 assert!(
1453 s.contains("/opt/homebrew/bin/cargo"),
1454 "Display must surface the command path: {s}"
1455 );
1456 assert!(
1457 s.contains("R004 cargo forbidden"),
1458 "Display must surface the rule's reason: {s}"
1459 );
1460 assert!(
1461 s.contains("refused by governance"),
1462 "Display must carry the governance-refusal marker: {s}"
1463 );
1464 }
1465
1466 #[test]
1467 fn executor_error_governance_refused_source_is_none() {
1468 // GovernanceRefused has no underlying io error or cause —
1469 // `Error::source` must return `None` so the anyhow chain
1470 // doesn't accidentally show an empty "caused by:" line in
1471 // operator logs. The Spawn / Io variants are the only ones
1472 // that carry sources; this pins the negative case so a future
1473 // refactor that adds a wrapped error here is forced to update
1474 // the test explicitly.
1475 let err = ExecutorError::GovernanceRefused {
1476 command: "/bin/x".into(),
1477 reason: "denied".into(),
1478 };
1479 assert!(std::error::Error::source(&err).is_none());
1480 }
1481
1482 #[test]
1483 fn executor_error_from_io_error_wraps_into_io_variant() {
1484 let io_err = io::Error::new(io::ErrorKind::PermissionDenied, "nope");
1485 let err: ExecutorError = io_err.into();
1486 assert!(matches!(err, ExecutorError::Io(_)));
1487 let s = err.to_string();
1488 assert!(s.contains("hook io error"));
1489 assert!(std::error::Error::source(&err).is_some());
1490 }
1491
1492 #[test]
1493 fn executor_error_spawn_source_chain() {
1494 let io_err = io::Error::new(io::ErrorKind::NotFound, "no such cmd");
1495 let err = ExecutorError::Spawn {
1496 command: "/bin/missing".into(),
1497 source: io_err,
1498 };
1499 assert!(std::error::Error::source(&err).is_some());
1500 let s = err.to_string();
1501 assert!(s.contains("/bin/missing"));
1502 assert!(s.contains("no such cmd"));
1503 }
1504
1505 #[test]
1506 fn executor_error_child_exit_with_signaled_code() {
1507 let err = ExecutorError::ChildExit {
1508 code: None,
1509 stderr: "killed".into(),
1510 };
1511 let s = err.to_string();
1512 assert!(s.contains("<signaled>"));
1513 // P2 (#628 agent-5) / release/v0.7.0 cbe934c: stderr is REDACTED out
1514 // of the Display impl to avoid credential-exfiltration via hostile
1515 // hooks (`printenv >&2; exit 1`). The redacted tail still reaches
1516 // the operator log via `log_redacted_stderr_at_failure`, but it
1517 // must NOT appear in the caller-facing reason string.
1518 assert!(!s.contains("killed"));
1519 assert!(s.contains("see operator log for redacted stderr"));
1520 }
1521
1522 #[test]
1523 fn executor_error_child_exit_stderr_is_truncated_for_display() {
1524 let big = "x".repeat(1024);
1525 let err = ExecutorError::ChildExit {
1526 code: Some(1),
1527 stderr: big,
1528 };
1529 let s = err.to_string();
1530 // Display previews at most 256 chars of stderr.
1531 // The total display is "hook child exited (code 1): " (28) + up to 256 chars.
1532 assert!(s.len() < 1024);
1533 }
1534
1535 #[test]
1536 fn executor_error_decode_display_carries_reason() {
1537 let err = ExecutorError::Decode {
1538 reason: "bad parse".into(),
1539 };
1540 let s = err.to_string();
1541 assert!(s.contains("decode failed"));
1542 assert!(s.contains("bad parse"));
1543 assert!(std::error::Error::source(&err).is_none());
1544 }
1545
1546 #[test]
1547 fn executor_error_timeout_display_carries_ms() {
1548 let err = ExecutorError::Timeout { ms: 5000 };
1549 let s = err.to_string();
1550 assert!(s.contains("5000ms"));
1551 assert!(std::error::Error::source(&err).is_none());
1552 }
1553
1554 #[test]
1555 fn executor_error_daemon_unavailable_carries_attempts() {
1556 let err = ExecutorError::DaemonUnavailable { attempts: 7 };
1557 let s = err.to_string();
1558 assert!(s.contains("7"));
1559 assert!(s.contains("reconnect attempts"));
1560 assert!(std::error::Error::source(&err).is_none());
1561 }
1562
1563 #[test]
1564 fn executor_metrics_serialize_to_json() {
1565 let m = ExecutorMetrics {
1566 events_fired: 42,
1567 events_dropped: 1,
1568 mean_latency_us: 250,
1569 };
1570 let json = serde_json::to_string(&m).unwrap();
1571 assert!(json.contains("\"events_fired\":42"));
1572 assert!(json.contains("\"events_dropped\":1"));
1573 assert!(json.contains("\"mean_latency_us\":250"));
1574 }
1575
1576 #[test]
1577 fn metrics_counters_overflow_safe_latency() {
1578 let m = MetricsCounters::default();
1579 // Use a huge duration; record_fire clamps to u64::MAX via try_from.
1580 m.record_fire(Duration::from_micros(u64::MAX));
1581 m.record_fire(Duration::from_micros(0));
1582 let snap = m.snapshot();
1583 assert_eq!(snap.events_fired, 2);
1584 }
1585
1586 #[test]
1587 fn registry_default_is_empty_and_default_eq_new() {
1588 let reg = ExecutorRegistry::default();
1589 assert!(reg.is_empty());
1590 assert_eq!(reg.len(), 0);
1591 }
1592
1593 #[test]
1594 fn parse_decision_line_modify_invalid_delta_wraps_to_decode() {
1595 let err = parse_decision_line(r#"{"action":"modify","delta":99}"#).unwrap_err();
1596 assert!(matches!(err, ExecutorError::Decode { .. }));
1597 }
1598
1599 #[test]
1600 fn parse_decision_line_array_payload_wraps_to_decode() {
1601 let err = parse_decision_line(r#"[1,2,3]"#).unwrap_err();
1602 match err {
1603 ExecutorError::Decode { reason } => assert!(reason.contains("object")),
1604 other => panic!("expected Decode, got {other:?}"),
1605 }
1606 }
1607
1608 // -----------------------------------------------------------------
1609 // Issue #1207 — spawn-retry-with-backoff pin tests.
1610 // -----------------------------------------------------------------
1611
1612 /// EAGAIN / ENOMEM / EMFILE / ETXTBSY are the four errnos the
1613 /// helper must treat as transient. Other errnos (e.g. ENOENT for
1614 /// a missing binary) must surface immediately.
1615 #[cfg(unix)]
1616 #[test]
1617 fn issue_1207_is_transient_spawn_errno_classification() {
1618 // Positive cases.
1619 for errno in [libc::EAGAIN, libc::ENOMEM, libc::EMFILE, libc::ETXTBSY] {
1620 let err = io::Error::from_raw_os_error(errno);
1621 assert!(
1622 is_transient_spawn_errno(&err),
1623 "errno {errno} should be transient"
1624 );
1625 }
1626 // Negative cases — ENOENT (missing binary), EACCES (perm denied)
1627 // must NOT be retried; those are operator-actionable.
1628 for errno in [libc::ENOENT, libc::EACCES, libc::ENOEXEC] {
1629 let err = io::Error::from_raw_os_error(errno);
1630 assert!(
1631 !is_transient_spawn_errno(&err),
1632 "errno {errno} must NOT be classified as transient"
1633 );
1634 }
1635 // io::Error without an errno (e.g. synthetic Other) is also
1636 // non-transient — we can't reason about kernel state.
1637 assert!(!is_transient_spawn_errno(&io::Error::other("oops")));
1638 }
1639
1640 /// The helper returns the first `Ok(child)` without sleeping when
1641 /// the closure succeeds on the first attempt. Unix-only because
1642 /// the test spawns `/bin/true` (or `/usr/bin/true`); Windows has
1643 /// no equivalent always-present zero-exit binary in a stable path,
1644 /// and the spawn-retry helper itself is a no-op on Windows
1645 /// (`is_transient_spawn_errno` always returns `false` when
1646 /// `cfg(unix)` is off).
1647 #[cfg(unix)]
1648 #[tokio::test(flavor = "current_thread")]
1649 async fn issue_1207_spawn_retry_first_attempt_succeeds() {
1650 let started = Instant::now();
1651 let child = spawn_with_transient_retry(|| {
1652 Command::new(if std::path::Path::new("/bin/true").exists() {
1653 "/bin/true"
1654 } else {
1655 "/usr/bin/true"
1656 })
1657 .stdin(Stdio::null())
1658 .stdout(Stdio::null())
1659 .stderr(Stdio::null())
1660 .kill_on_drop(true)
1661 .spawn()
1662 })
1663 .await
1664 .expect("first-attempt spawn should succeed");
1665 // Reap the child to keep the test hermetic.
1666 drop(child);
1667 // No backoff sleeps fired — well under the 10ms first-step
1668 // ladder entry.
1669 assert!(
1670 started.elapsed() < Duration::from_millis(10),
1671 "first-attempt success must not pay any backoff"
1672 );
1673 }
1674
1675 /// A non-transient errno (ENOENT — missing binary) MUST surface
1676 /// immediately without retries.
1677 #[cfg(unix)]
1678 #[tokio::test(flavor = "current_thread")]
1679 async fn issue_1207_spawn_retry_non_transient_errno_surfaces_immediately() {
1680 let started = Instant::now();
1681 let err = spawn_with_transient_retry(|| {
1682 Command::new("/nonexistent/path/to/binary-xyz-1207")
1683 .stdin(Stdio::null())
1684 .stdout(Stdio::null())
1685 .stderr(Stdio::null())
1686 .kill_on_drop(true)
1687 .spawn()
1688 })
1689 .await
1690 .expect_err("spawn of /nonexistent should fail");
1691 assert_eq!(err.raw_os_error(), Some(libc::ENOENT));
1692 // Should NOT have paid any backoff sleeps — fail-fast on
1693 // non-transient errno.
1694 assert!(
1695 started.elapsed() < Duration::from_millis(10),
1696 "non-transient errno must surface immediately, took {:?}",
1697 started.elapsed()
1698 );
1699 }
1700
1701 /// Fault-injected EAGAIN: the helper sleeps through 4 backoff
1702 /// steps (~10+50+200+1000 = 1260ms) then succeeds on the 5th
1703 /// attempt when the injected counter hits zero.
1704 ///
1705 /// This test sets the `AI_MEMORY_TEST_FORCE_SPAWN_EAGAIN` env var
1706 /// to force the helper through the full backoff ladder; it MUST
1707 /// run in a fresh process (via `cargo test`) because the env-var
1708 /// gate is `Once`-initialized. We use a `Mutex` to serialize
1709 /// against other tests in this module that might race.
1710 #[cfg(unix)]
1711 #[tokio::test(flavor = "current_thread")]
1712 async fn issue_1207_spawn_retry_recovers_from_transient_eagain() {
1713 // This test exercises the retry loop by passing a closure
1714 // that returns a synthesized EAGAIN on its first two calls,
1715 // then a real successful spawn. We don't use the env-var
1716 // injection here because the once-init makes it
1717 // single-shot per process; a per-call counter under our
1718 // direct control is the more honest unit test for the loop.
1719 use std::cell::Cell;
1720 let attempt_count: Cell<u32> = Cell::new(0);
1721 let started = Instant::now();
1722 let child = spawn_with_transient_retry(|| {
1723 let n = attempt_count.get();
1724 attempt_count.set(n + 1);
1725 if n < 2 {
1726 Err(io::Error::from_raw_os_error(libc::EAGAIN))
1727 } else {
1728 Command::new(if std::path::Path::new("/bin/true").exists() {
1729 "/bin/true"
1730 } else {
1731 "/usr/bin/true"
1732 })
1733 .stdin(Stdio::null())
1734 .stdout(Stdio::null())
1735 .stderr(Stdio::null())
1736 .kill_on_drop(true)
1737 .spawn()
1738 }
1739 })
1740 .await
1741 .expect("spawn should succeed after 2 EAGAIN retries");
1742 drop(child);
1743 assert_eq!(
1744 attempt_count.get(),
1745 3,
1746 "closure called 3 times (2 EAGAIN + 1 success)"
1747 );
1748 // Paid 10ms + 50ms = 60ms of backoff. Allow generous slop for
1749 // CI scheduler jitter (especially under cargo's parallel
1750 // test load — the EXACT condition this fix targets).
1751 let elapsed = started.elapsed();
1752 assert!(
1753 elapsed >= Duration::from_millis(55),
1754 "should pay at least 10+50=60ms backoff, got {elapsed:?}"
1755 );
1756 assert!(
1757 elapsed < Duration::from_millis(1_500),
1758 "should not pay the full 1.26s ladder when success comes on attempt 3, got {elapsed:?}"
1759 );
1760 }
1761
1762 /// Exhausting all 5 attempts surfaces the last transient error.
1763 /// Guarantees the helper doesn't loop forever AND that operators
1764 /// still see the precise errno when the kernel never recovers.
1765 #[cfg(unix)]
1766 #[tokio::test(flavor = "current_thread")]
1767 async fn issue_1207_spawn_retry_exhaustion_surfaces_last_error() {
1768 use std::cell::Cell;
1769 let attempt_count: Cell<u32> = Cell::new(0);
1770 let err = spawn_with_transient_retry(|| {
1771 attempt_count.set(attempt_count.get() + 1);
1772 Err::<Child, _>(io::Error::from_raw_os_error(libc::EMFILE))
1773 })
1774 .await
1775 .expect_err("all attempts return EMFILE → helper must surface");
1776 assert_eq!(err.raw_os_error(), Some(libc::EMFILE));
1777 assert_eq!(
1778 attempt_count.get(),
1779 (SPAWN_RETRY_BACKOFF_MS.len() + 1) as u32,
1780 "closure must be called total_attempts times before exhaustion"
1781 );
1782 }
1783}