ai_memory/hooks/chain.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// v0.7 Track G — Task G5: chain ordering + first-deny-wins short-circuit.
5//
6// G3 (PR #567) shipped the per-hook `HookExecutor` (`ExecExecutor`,
7// `DaemonExecutor`, `ExecutorRegistry`). G4 (PR #570) shipped the
8// `HookDecision` four-variant contract (`Allow / Modify(MemoryDelta)
9// / Deny / AskUser`). G5 stitches them together: when several
10// `[[hook]]` blocks subscribe to the same event, fire them in
11// deterministic priority-descending order, threading a
12// possibly-mutated payload through the chain, halting on the first
13// `Deny`, and queueing every `AskUser` for the operator surface.
14//
15// # Ordering
16//
17// `HookChain::new` sorts the configured hooks by `priority`
18// descending. Ties are broken by *insertion order* — i.e. the order
19// the entries appear in `hooks.toml`, which `HookConfig::load_from_file`
20// already preserves. `Vec::sort_by` is stable, so feeding it a
21// `Vec<HookConfig>` in load order yields the documented behaviour
22// without any extra bookkeeping.
23//
24// # Decision merging
25//
26// The chain runs a small state machine over the per-hook decisions:
27//
28// * `Allow` — keep iterating with the same payload.
29// * `Modify` — merge the `MemoryDelta` into the in-flight payload
30// (top-level `Object` keys overwrite; nested fields
31// are replaced wholesale because `MemoryDelta` itself
32// has no nested optional sub-bags) and set the
33// `modified` flag so the final result widens to
34// `ModifiedAllow`. The *next* hook in the chain sees
35// the merged payload, matching the prompt's
36// "later hooks see the latest delta" requirement.
37// * `Deny` — short-circuit. The chain never invokes the rest of
38// the hooks. Even if earlier hooks queued AskUser
39// prompts, the operator-facing answer is `Deny`
40// (compliance trumps operator UX).
41// * `AskUser` — push the prompt onto the queue and continue. A
42// chain that ends with at least one queued AskUser
43// *and no clear Allow / Modify win* surfaces as
44// `ChainResult::AskUser`. If a *subsequent* hook
45// returns `Allow` or `Modify`, that decision wins —
46// matching the prompt's "first non-AskUser decision
47// continues" semantics.
48//
49// "First non-AskUser decision continues" is implemented as: AskUser
50// never overrides a later Allow / Modify; AskUser only "wins" when
51// every later hook also returned AskUser (or when the chain was
52// AskUser-only to begin with).
53//
54// # Crash handling — `FailMode`
55//
56// Every `HookConfig` now carries a `fail_mode: FailMode` field
57// (G5 addition; defaults to `Open` so G3-era configs keep their
58// behaviour). When `executor.fire()` returns an `ExecutorError`
59// (spawn failure, decode failure, timeout, daemon-unavailable, …):
60//
61// * `FailMode::Open` (default) — `tracing::warn!` the error and
62// treat the failed fire as `Allow`. Continue the chain.
63// * `FailMode::Closed` — `tracing::warn!` the error and convert
64// it to `ChainResult::Deny { reason: <executor-error display>,
65// code: 503 }`. Short-circuit the chain.
66//
67// 503 is the "service unavailable" HTTP status; it mirrors the
68// chain semantics ("we couldn't run the gate, refuse the request").
69// G7+ will wire this onto the actual API surface.
70//
71// # G6 — per-event-class deadline (this PR)
72//
73// `HookChain::fire` now computes a *chain* deadline at entry:
74// `chain_deadline = Instant::now() + class_deadline_for_event(event)`.
75// Before each hook fires, the chain derives the per-hook budget as
76// `min(chain_remaining, hook.timeout_ms)` and clones a shrunk
77// `HookConfig` into a one-off executor for that fire (the executor
78// honours `HookConfig.timeout_ms` already; the chain only needs to
79// shrink the knob). If the chain deadline has *already* passed
80// before a hook fires, that hook is skipped, the chain bumps
81// `timeouts::record_timeout_violation()`, and continues fail-open
82// `Allow` per `FailMode::Open`. A `FailMode::Closed` hook that
83// runs out of chain budget converts to a chain-level `Deny` with
84// reason `chain class deadline exhausted` and code 504 — the HTTP
85// "gateway timeout" status, which mirrors the chain semantics
86// ("we couldn't run the gate, refuse the request as if upstream
87// timed out").
88//
89// # Out of scope
90//
91// * Wiring at the actual memory operation points (`db::insert`,
92// `db::recall`, …) — that's G7+.
93// * `dispatch_event` / subscription integration is a thin
94// convenience wrapper here (`dispatch_event_with_hooks`); the
95// real wire-in at MCP / handlers call sites lands later in the
96// epic.
97
98use std::sync::Arc;
99use std::time::Instant;
100
101use serde_json::{Map, Value};
102
103use super::config::{FailMode, HookConfig};
104use super::decision::{HookDecision, is_pre_event};
105use super::events::{EvictionEvent, HookEvent, MemoryDelta};
106use super::executor::ExecutorRegistry;
107use super::timeouts::{class_deadline_for_event, per_hook_budget_ms, record_timeout_violation};
108
109// ---------------------------------------------------------------------------
110// AskUserPrompt — operator-surface queue entry
111// ---------------------------------------------------------------------------
112
113/// One queued operator prompt. The chain runner accumulates these
114/// when hooks return `HookDecision::AskUser` and the chain doesn't
115/// terminate in `Deny` / clear `Allow`. The G7+ wiring layer will
116/// fan these out to the operator surface (CLI / MCP / HTTP) and
117/// resume the chain on the human's choice.
118///
119/// We keep this distinct from `HookDecision::AskUser` so the queue
120/// representation can grow (correlation ids, hook origin tags, …)
121/// without churning the wire-format enum the executor parses.
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct AskUserPrompt {
124 /// The text shown to the operator. Verbatim from the hook's
125 /// `prompt` field.
126 pub prompt: String,
127 /// The selectable options, in the order the hook listed them.
128 pub options: Vec<String>,
129 /// Optional default; the runner falls back to this on operator
130 /// timeout.
131 pub default: Option<String>,
132 /// Path of the hook that queued the prompt. Lets the operator
133 /// surface display "why am I being asked this?".
134 pub origin_command: String,
135}
136
137// ---------------------------------------------------------------------------
138// ChainResult — the outcome of running an entire chain
139// ---------------------------------------------------------------------------
140
141/// What the chain runner reports back to the dispatcher. Mirrors
142/// `HookDecision`'s shape but at chain granularity:
143///
144/// * [`ChainResult::Allow`] — every hook in the chain returned
145/// `Allow` (or the chain was empty).
146/// * [`ChainResult::ModifiedAllow`] — at least one hook returned
147/// `Modify`; the final merged delta is reported.
148/// * [`ChainResult::Deny`] — a hook returned `Deny` (or a hook
149/// errored under `FailMode::Closed`); the chain short-circuited.
150/// * [`ChainResult::AskUser`] — the chain finished with at least
151/// one queued operator prompt and no clear Allow / Modify win.
152///
153/// `Modify` is not a chain-level outcome on its own — every chain
154/// either *also* finishes Allow (`ModifiedAllow`) or short-circuits
155/// on `Deny`. The dispatcher applies the cumulative delta exactly
156/// once when the chain returns `ModifiedAllow`.
157///
158/// #969 — `PartialEq` is now `derive`-able because `MemoryDelta`
159/// derives `PartialEq` (see `hooks/events.rs`). Pre-#969 we
160/// hand-rolled equality routed through `serde_json::to_value(...)`
161/// on the (mistaken) premise that `serde_json::Value` was not
162/// `PartialEq` — it IS. `MemoryDelta`'s `Option<f64>` blocks
163/// `derive(Eq)` (f64 has only `PartialEq`) but not
164/// `derive(PartialEq)`. The historical wrap-and-compare is gone.
165#[derive(Debug, Clone, PartialEq)]
166pub enum ChainResult {
167 Allow,
168 ModifiedAllow(MemoryDelta),
169 Deny { reason: String, code: i32 },
170 AskUser { queued: Vec<AskUserPrompt> },
171}
172
173// ---------------------------------------------------------------------------
174// HookChain — priority-sorted, fire-in-order
175// ---------------------------------------------------------------------------
176
177/// Ordered set of hooks subscribed to a single event. The hooks are
178/// sorted by `priority` descending at construction time; ties keep
179/// their `hooks.toml` insertion order (`Vec::sort_by` is stable, so
180/// feeding it a load-order vec gives the documented behaviour for
181/// free).
182///
183/// The chain runner is a method on the chain rather than a free
184/// function so callers can hold a chain across multiple fires
185/// (e.g. one per event tag, built once on `hooks.toml` load and
186/// reused across many request paths).
187pub struct HookChain {
188 hooks: Vec<HookConfig>,
189}
190
191impl HookChain {
192 /// Build a chain from the hooks subscribed to `event`. The input
193 /// vec is filtered to enabled entries matching `event` and then
194 /// sorted by `priority` descending.
195 ///
196 /// Insertion order from `hooks.toml` is the secondary sort key
197 /// (i.e. ties break in load order). `Vec::sort_by` is stable so
198 /// no extra bookkeeping is needed — a load-order input gives the
199 /// documented behaviour.
200 #[must_use]
201 pub fn for_event(all_hooks: &[HookConfig], event: HookEvent) -> Self {
202 let mut hooks: Vec<HookConfig> = all_hooks
203 .iter()
204 .filter(|h| h.enabled && h.event == event)
205 .cloned()
206 .collect();
207 // Stable sort: ties preserve original (hooks.toml) ordering.
208 hooks.sort_by(|a, b| b.priority.cmp(&a.priority));
209 Self { hooks }
210 }
211
212 /// Construct from an explicit, pre-filtered hook list. The list
213 /// is still priority-sorted on the way in. Used by tests that
214 /// want to bypass the `enabled` / `event` filter.
215 #[must_use]
216 pub fn new(mut hooks: Vec<HookConfig>) -> Self {
217 hooks.sort_by(|a, b| b.priority.cmp(&a.priority));
218 Self { hooks }
219 }
220
221 /// Returns the priority-sorted hook list. Useful for tests
222 /// (asserting the ordering pass landed) and for the doctor
223 /// surface (rendering the configured chain).
224 #[must_use]
225 pub fn hooks(&self) -> &[HookConfig] {
226 &self.hooks
227 }
228
229 /// Run the chain. Iterates hooks in priority order, threads the
230 /// possibly-mutated payload through, and short-circuits on the
231 /// first `Deny`.
232 ///
233 /// `registry` is taken `&mut` because `ExecutorRegistry::get`
234 /// inserts on cache miss. Once every hook in the chain has been
235 /// fired at least once the registry is steady-state and a fully
236 /// pre-warmed registry built via `ExecutorRegistry::from_hooks`
237 /// makes this a read-only path.
238 ///
239 /// The future is `async` because each hook's `fire` is async;
240 /// the chain itself does no extra work between fires beyond the
241 /// in-memory delta merge.
242 pub async fn fire(
243 &self,
244 event: HookEvent,
245 payload: Value,
246 registry: &mut ExecutorRegistry,
247 ) -> ChainResult {
248 let mut current_payload = payload;
249 let mut accumulated_delta = MemoryDelta::default();
250 let mut modified = false;
251 let mut askuser_queue: Vec<AskUserPrompt> = Vec::new();
252
253 // G6: stamp a chain-wide wall-clock ceiling at entry. Every
254 // hook in the loop below has its per-hook timeout shrunk to
255 // `min(chain_remaining, hook.timeout_ms)` so the *whole*
256 // chain cannot blow the recall / write / index / transcript
257 // budget the epic pins.
258 let chain_deadline = Instant::now() + class_deadline_for_event(event);
259
260 // Snapshot executor handles before the await loop so we hand
261 // them out by `Arc<dyn HookExecutor>` and don't re-borrow the
262 // registry across the await boundary. (Holding `&mut registry`
263 // across an await would force every caller to single-thread.)
264 let prepared: Vec<(HookConfig, Arc<dyn super::executor::HookExecutor>)> = self
265 .hooks
266 .iter()
267 .map(|h| (h.clone(), registry.get(h)))
268 .collect();
269
270 for (cfg, executor) in prepared {
271 // G6: derive the per-hook budget from what's left of the
272 // chain deadline. `None` means the deadline already
273 // passed — record a violation, treat the remaining hooks
274 // per `fail_mode` (Open ⇒ Allow, Closed ⇒ Deny 504).
275 let Some(budget_ms) =
276 per_hook_budget_ms(chain_deadline, Instant::now(), cfg.timeout_ms)
277 else {
278 record_timeout_violation();
279 match cfg.fail_mode {
280 FailMode::Open => {
281 tracing::warn!(
282 command = %cfg.command.display(),
283 event = ?event,
284 "hooks: chain class deadline exhausted before hook fire; \
285 fail_mode=open, treating as Allow"
286 );
287 continue;
288 }
289 FailMode::Closed => {
290 tracing::warn!(
291 command = %cfg.command.display(),
292 event = ?event,
293 "hooks: chain class deadline exhausted before hook fire; \
294 fail_mode=closed, denying"
295 );
296 return ChainResult::Deny {
297 reason: format!(
298 "hook {} skipped under fail_mode=closed: chain class deadline exhausted",
299 cfg.command.display()
300 ),
301 code: 504,
302 };
303 }
304 }
305 };
306
307 // G6: enforce the (possibly-shrunk) per-hook budget at
308 // the chain layer. The executor itself already honours
309 // its configured `timeout_ms`, but the chain's view of
310 // "remaining wall clock" can be tighter than that knob;
311 // wrapping the fire here is what guarantees the class
312 // ceiling holds even when ten hooks each carry a 1s
313 // hook_timeout_ms but the class budget is 2s.
314 let per_hook_deadline = std::time::Duration::from_millis(u64::from(budget_ms));
315 let raced = tokio::time::timeout(
316 per_hook_deadline,
317 executor.fire(event, current_payload.clone()),
318 )
319 .await;
320
321 let fire_result = match raced {
322 Ok(inner) => inner,
323 Err(_elapsed) => {
324 // Treat a chain-level timeout the same way the
325 // executor's own Timeout would surface — a single
326 // ExecutorError::Timeout, routed through fail_mode.
327 // The Err(Timeout) arm below records the violation
328 // (one record per trip — the executor's `timeout_ms`
329 // and our chain wrapper are racing on the *smaller*
330 // of the two, only one ever fires, no double-count).
331 Err(super::executor::ExecutorError::Timeout {
332 ms: u64::from(budget_ms),
333 })
334 }
335 };
336
337 let decision = match fire_result {
338 Ok(d) => d.degrade_modify_for_post_event(event),
339 Err(e) => {
340 // G6: a Timeout from the executor counts as a
341 // violation too. The executor enforces
342 // `cfg.timeout_ms` and the chain wrapper
343 // enforces `min(chain_remaining, cfg.timeout_ms)`
344 // — only the smaller of the two ever fires on a
345 // given hook, so the two record paths are
346 // mutually exclusive (no double-count).
347 if matches!(e, super::executor::ExecutorError::Timeout { .. }) {
348 record_timeout_violation();
349 }
350 // Crash handling per `fail_mode`.
351 match cfg.fail_mode {
352 FailMode::Open => {
353 tracing::warn!(
354 command = %cfg.command.display(),
355 event = ?event,
356 error = %e,
357 "hooks: chain hook errored; fail_mode=open, treating as Allow"
358 );
359 HookDecision::Allow
360 }
361 FailMode::Closed => {
362 tracing::warn!(
363 command = %cfg.command.display(),
364 event = ?event,
365 error = %e,
366 "hooks: chain hook errored; fail_mode=closed, denying"
367 );
368 return ChainResult::Deny {
369 reason: format!(
370 "hook {} errored under fail_mode=closed: {e}",
371 cfg.command.display()
372 ),
373 code: 503,
374 };
375 }
376 }
377 }
378 };
379
380 match decision {
381 HookDecision::Allow => {
382 // Allow is the no-op continue. AskUser prompts
383 // queued by *earlier* hooks remain queued but do
384 // not win — Allow is a "first non-AskUser
385 // decision" winner per the prompt.
386 askuser_queue.clear();
387 }
388 HookDecision::Modify(modify_payload) => {
389 // Merge into the in-flight payload so the next
390 // hook sees the latest delta, *and* track the
391 // composed delta so the final result can report it.
392 apply_delta_to_payload(&mut current_payload, &modify_payload.delta);
393 merge_delta_into(&mut accumulated_delta, modify_payload.delta);
394 modified = true;
395 // Modify also overrides any earlier AskUser
396 // prompts — same "first non-AskUser wins" rule.
397 askuser_queue.clear();
398 }
399 HookDecision::Deny { reason, code } => {
400 return ChainResult::Deny { reason, code };
401 }
402 HookDecision::AskUser {
403 prompt,
404 options,
405 default,
406 } => {
407 // Only valid on pre- events, but we don't degrade
408 // here — the dispatcher (G7+) decides what to do
409 // with an AskUser on a post- event. Today the
410 // only post-AskUser test path is "queued, but
411 // chain returns Allow" because no caller acts on
412 // the queue yet.
413 askuser_queue.push(AskUserPrompt {
414 prompt,
415 options,
416 default,
417 origin_command: cfg.command.display().to_string(),
418 });
419 // Continue: a *later* Allow / Modify will overwrite
420 // the queue (per the cleared-on-Allow path above).
421 // If every remaining hook also AskUsers (or the
422 // chain ends here), we emit ChainResult::AskUser.
423 let _ = is_pre_event(event); // tracing-only awareness; no behaviour change
424 }
425 }
426 }
427
428 if !askuser_queue.is_empty() {
429 ChainResult::AskUser {
430 queued: askuser_queue,
431 }
432 } else if modified {
433 ChainResult::ModifiedAllow(accumulated_delta)
434 } else {
435 ChainResult::Allow
436 }
437 }
438}
439
440// ---------------------------------------------------------------------------
441// Subscription integration — `dispatch_event_with_hooks`
442// ---------------------------------------------------------------------------
443//
444// The G5 prompt asks for hooks to fire *before* webhook subscriptions
445// for pre- events and *after* for post- events. v0.6's
446// `subscriptions::dispatch_event` is a post-event-only API
447// (`memory_store`, `memory_promote`, … all fire after the DB write),
448// so the integration here is the post- side: run the hook chain
449// *after* the subscription dispatch returns.
450//
451// Pre-event call sites do not yet exist on the dispatcher path —
452// they'll land in G7+ when hooks are wired into `db::insert` /
453// `db::recall` / etc. The function below covers the post- side and
454// documents the pre- shape so the G7 implementer has a single
455// place to look. Routing the actual MCP / handlers call sites into
456// this convenience wrapper is left to the wiring tasks.
457
458/// Convenience: dispatch the v0.6 webhook event AND fire the hook
459/// chain for `event` in the order the G5 prompt mandates (subs
460/// first for post-, hooks first for pre-).
461///
462/// `subscription_dispatch` is the closure the caller wires to
463/// `crate::subscriptions::dispatch_event` (or
464/// `dispatch_event_with_details`). Taking it as a closure keeps
465/// this module free of any direct dependency on `rusqlite::Connection`
466/// — the subscription module owns the DB handle, and the hooks
467/// module stays a leaf.
468///
469/// Returns the chain result so the caller can decide whether to
470/// proceed (Allow / ModifiedAllow / AskUser) or refuse (Deny). For
471/// post- events the dispatcher should treat Deny as "log only" —
472/// the side-effect already happened.
473pub async fn dispatch_event_with_hooks<F>(
474 event: HookEvent,
475 payload: Value,
476 chain: &HookChain,
477 registry: &mut ExecutorRegistry,
478 subscription_dispatch: F,
479) -> ChainResult
480where
481 F: FnOnce(),
482{
483 if is_pre_event(event) {
484 // Pre-: hooks run first. If the chain Denies, skip the
485 // subscription dispatch entirely (the operation isn't
486 // happening, so subscribers shouldn't see it).
487 let result = chain.fire(event, payload, registry).await;
488 if !matches!(result, ChainResult::Deny { .. }) {
489 subscription_dispatch();
490 }
491 result
492 } else {
493 // Post-: subscriptions first (the side-effect already
494 // happened, so subscribers see it unconditionally). Hooks
495 // run after for observability / linking / etc.
496 subscription_dispatch();
497 chain.fire(event, payload, registry).await
498 }
499}
500
501// ---------------------------------------------------------------------------
502// G8 / R3-S1 — on_index_eviction fire helper + observer-channel sink
503// ---------------------------------------------------------------------------
504//
505// `OnIndexEviction` is the only event whose canonical fire site
506// (`src/hnsw.rs:insert` — the `MAX_ENTRIES`-triggered drain) sits
507// below the hooks layer in the dependency graph. `VectorIndex`
508// owns no `ExecutorRegistry` handle and threading one through
509// the inner Mutex would touch every caller in the storage layer
510// and serialize hook execution behind the hot-path lock.
511//
512// v0.7.0 R3-S1 closes the prior G8 "fire helper exists but not
513// wired" gap with approach (b) from the original TODO: a
514// channel-sink between `VectorIndex` and the hooks layer.
515// `VectorIndex::set_eviction_sink` takes the send-half of an
516// unbounded mpsc channel; the eviction path inside
517// `VectorIndex::insert` pushes one [`EvictionEvent`] per evicted
518// id (`Sender::send` is non-blocking on an unbounded channel).
519// A background observer task owns the recv-half and fires
520// `fire_on_index_eviction` off the hot path. The `Sender` push
521// itself is a no-op when no sink is wired (CLI / test builds
522// without a hooks pipeline) so eviction throughput is unaffected
523// in those configurations.
524//
525// The observer task is `mode = "daemon"` semantics by construction:
526// the eviction-trigger thread never blocks on hook execution, the
527// recv-half is drained on a dedicated tokio task off the hot path,
528// and slow hooks back-pressure only on themselves (the channel is
529// unbounded).
530
531/// Fire the `on_index_eviction` chain for `payload`.
532///
533/// Production callers reach this through the eviction-observer
534/// task spawned by [`spawn_eviction_observer`]; the helper is
535/// also called directly from `tests/hooks_executor_test.rs` to
536/// exercise the wire shape end-to-end through a real subprocess
537/// hook.
538///
539/// # Why a free function and not a method on `HookChain`
540///
541/// `HookChain::fire` already covers the generic event path. This
542/// helper exists so callers can pass a typed [`EvictionEvent`]
543/// instead of a `serde_json::Value` and have the JSON projection
544/// happen here — keeping the hnsw layer free of any `serde_json`
545/// import. It also gives us a single grep target for "where does
546/// the eviction hook fire?".
547pub async fn fire_on_index_eviction(
548 chain: &HookChain,
549 registry: &mut ExecutorRegistry,
550 payload: EvictionEvent,
551) -> ChainResult {
552 let value = serde_json::to_value(&payload).unwrap_or_else(|_| Value::Null);
553 chain
554 .fire(HookEvent::OnIndexEviction, value, registry)
555 .await
556}
557
558/// v0.7.0 R3-S1 — Spawn the eviction observer that bridges the
559/// `VectorIndex` eviction-edge channel to the `on_index_eviction`
560/// hook chain. Returns the send-half of an unbounded mpsc channel
561/// caller must hand to [`crate::hnsw::VectorIndex::set_eviction_sink`].
562///
563/// The observer task takes ownership of `chain` (cloned via `Arc`)
564/// and the `registry`; both are kept alive for the lifetime of the
565/// recv-half. When the last `Sender` clone drops (typically when the
566/// daemon shuts down and `VectorIndex` is dropped), the channel
567/// closes and the observer task exits cleanly.
568///
569/// This is the canonical "daemon-mode" wire-in for the eviction
570/// hook: the hot-path eviction edge never blocks waiting for hook
571/// execution; the observer task drains the queue at its own pace.
572///
573/// # Hot-path posture
574///
575/// The send side (`Sender::send` on an unbounded channel) never
576/// blocks. A back-logged hook (slow subprocess, daemon hook
577/// stalled) accumulates events in the channel but does NOT slow
578/// `VectorIndex::insert`. This is the intended trade-off — eviction
579/// is rare (only fires past the 100k cap) and operators care more
580/// about not coupling recall latency to hook subscriber health than
581/// about bounded queue memory.
582pub fn spawn_eviction_observer(
583 chain: Arc<HookChain>,
584 mut registry: ExecutorRegistry,
585) -> std::sync::mpsc::Sender<EvictionEvent> {
586 let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
587 // We keep the recv side on a std mpsc (so the hot-path producer
588 // can be sync-only). A tiny bridge converts std-recv -> async by
589 // delegating the blocking recv to a `spawn_blocking` task; each
590 // observed payload re-enters the async chain via
591 // `fire_on_index_eviction`. This is the canonical pattern for
592 // adapting a sync producer to an async consumer in tokio.
593 let rx = std::sync::Mutex::new(rx);
594 let rx = Arc::new(rx);
595 tokio::spawn(async move {
596 loop {
597 let rx_clone = Arc::clone(&rx);
598 let next = tokio::task::spawn_blocking(move || {
599 let guard = rx_clone.lock().expect("eviction observer rx mutex");
600 guard.recv()
601 })
602 .await;
603 match next {
604 Ok(Ok(payload)) => {
605 let _ = fire_on_index_eviction(&chain, &mut registry, payload).await;
606 }
607 // Either the JoinHandle errored (panic) or the
608 // sender side dropped — both terminate the observer.
609 Ok(Err(_)) | Err(_) => break,
610 }
611 }
612 });
613 tx
614}
615
616// ---------------------------------------------------------------------------
617// Delta merging helpers
618// ---------------------------------------------------------------------------
619
620/// Apply a [`MemoryDelta`] over `payload` so the next hook in the
621/// chain sees the post-modify view.
622///
623/// The payload is a `serde_json::Value` (the wire shape sent to the
624/// child); the delta is a typed struct with every field optional.
625/// We translate the delta to a JSON object and overlay it onto the
626/// payload at the top level — `Some(_)` fields overwrite, `None`
627/// fields leave the payload untouched (the `serde(skip_serializing_if
628/// = "Option::is_none")` bias on `MemoryDelta` makes this trivially
629/// the right shape).
630///
631/// If `payload` is not a JSON object we replace it wholesale with
632/// the delta object. That matches the "delta wins on conflict"
633/// semantics callers expect; a non-object payload is a programmer
634/// error in the caller, not the hook.
635fn apply_delta_to_payload(payload: &mut Value, delta: &MemoryDelta) {
636 let delta_value = serde_json::to_value(delta).unwrap_or_else(|_| Value::Object(Map::new()));
637 let Value::Object(delta_obj) = delta_value else {
638 return;
639 };
640 if !payload.is_object() {
641 *payload = Value::Object(delta_obj);
642 return;
643 }
644 // Safe: just checked is_object().
645 let payload_obj = payload.as_object_mut().expect("checked is_object");
646 for (k, v) in delta_obj {
647 payload_obj.insert(k, v);
648 }
649}
650
651/// Merge `incoming` into the accumulator. `Some(_)` in `incoming`
652/// overwrites the accumulator's same-name field; `None` leaves it.
653///
654/// We hand-roll this rather than reusing `apply_delta_to_payload` on
655/// a JSON-roundtripped accumulator because the typed surface is
656/// what the chain reports back via `ChainResult::ModifiedAllow` —
657/// callers want a `MemoryDelta`, not a `Value`.
658fn merge_delta_into(acc: &mut MemoryDelta, incoming: MemoryDelta) {
659 if incoming.tier.is_some() {
660 acc.tier = incoming.tier;
661 }
662 if incoming.namespace.is_some() {
663 acc.namespace = incoming.namespace;
664 }
665 if incoming.title.is_some() {
666 acc.title = incoming.title;
667 }
668 if incoming.content.is_some() {
669 acc.content = incoming.content;
670 }
671 if incoming.tags.is_some() {
672 acc.tags = incoming.tags;
673 }
674 if incoming.priority.is_some() {
675 acc.priority = incoming.priority;
676 }
677 if incoming.confidence.is_some() {
678 acc.confidence = incoming.confidence;
679 }
680 if incoming.source.is_some() {
681 acc.source = incoming.source;
682 }
683 if incoming.expires_at.is_some() {
684 acc.expires_at = incoming.expires_at;
685 }
686 if incoming.metadata.is_some() {
687 acc.metadata = incoming.metadata;
688 }
689}
690
691// ---------------------------------------------------------------------------
692// Tests
693// ---------------------------------------------------------------------------
694
695#[cfg(test)]
696mod tests {
697 use super::*;
698 use crate::hooks::config::{FailMode, HookMode};
699 use crate::hooks::decision::ModifyPayload;
700 use crate::hooks::executor::{
701 ExecutorError, ExecutorMetrics, HookExecutor, Result as ExecutorResult,
702 };
703 use serde_json::json;
704 use std::path::PathBuf;
705 use std::pin::Pin;
706 use std::sync::Mutex;
707 use std::sync::atomic::{AtomicUsize, Ordering};
708
709 // ---- Test executor: deterministic, in-process replacement ----------------
710 //
711 // We can't spawn real subprocesses in unit tests (the integration
712 // tests in `tests/hooks_executor_test.rs` cover that). The chain
713 // logic is decoupled from the executor implementation via the
714 // `HookExecutor` trait, so we plug a `MockExecutor` that returns
715 // a scripted decision (or error) per fire and counts how often it
716 // was invoked.
717 //
718 // The mock has to be installed into an `ExecutorRegistry`;
719 // `ExecutorRegistry::get` chooses between `ExecExecutor` /
720 // `DaemonExecutor` based on `HookConfig.mode` and there's no
721 // public hook for swapping in a custom executor. We work around
722 // by building a "registry" ad-hoc in the test — see
723 // `mock_registry` below.
724
725 enum Scripted {
726 Decision(HookDecision),
727 Error,
728 }
729
730 struct MockExecutor {
731 responses: Mutex<Vec<Scripted>>,
732 fire_count: AtomicUsize,
733 seen_payloads: Mutex<Vec<Value>>,
734 }
735
736 impl MockExecutor {
737 fn new(responses: Vec<Scripted>) -> Self {
738 Self {
739 responses: Mutex::new(responses),
740 fire_count: AtomicUsize::new(0),
741 seen_payloads: Mutex::new(Vec::new()),
742 }
743 }
744 }
745
746 impl HookExecutor for MockExecutor {
747 fn fire<'a>(
748 &'a self,
749 _event: HookEvent,
750 payload: Value,
751 ) -> Pin<Box<dyn std::future::Future<Output = ExecutorResult<HookDecision>> + Send + 'a>>
752 {
753 self.fire_count.fetch_add(1, Ordering::SeqCst);
754 self.seen_payloads.lock().unwrap().push(payload);
755 let mut responses = self.responses.lock().unwrap();
756 // Pop the next scripted response; default to Allow if
757 // the test under-supplied (defensive — a test that
758 // expects N fires should script N responses).
759 let next = if responses.is_empty() {
760 Scripted::Decision(HookDecision::Allow)
761 } else {
762 responses.remove(0)
763 };
764 Box::pin(async move {
765 match next {
766 Scripted::Decision(d) => Ok(d),
767 Scripted::Error => Err(ExecutorError::Decode {
768 reason: "mock: scripted error".into(),
769 }),
770 }
771 })
772 }
773
774 fn metrics(&self) -> ExecutorMetrics {
775 ExecutorMetrics {
776 events_fired: self.fire_count.load(Ordering::SeqCst) as u64,
777 events_dropped: 0,
778 mean_latency_us: 0,
779 }
780 }
781 }
782
783 // Build a `HookChain` and a registry-shaped lookup over `MockExecutor`s.
784 // `ExecutorRegistry` doesn't expose an "insert this executor"
785 // API (its constructor builds Exec/Daemon executors from the
786 // mode tag), so we drive `HookChain::fire` with a custom
787 // dispatch loop in the tests below — the chain's logic lives
788 // in pure code paths anyway (decision merging, ordering, fail-mode
789 // handling) and is exercised end-to-end via the chain's
790 // helpers we expose for tests.
791
792 fn make_cfg(priority: i32, fail_mode: FailMode, command: &str) -> HookConfig {
793 HookConfig {
794 event: HookEvent::PreStore,
795 command: PathBuf::from(command),
796 priority,
797 timeout_ms: 1_000,
798 mode: HookMode::Exec,
799 enabled: true,
800 namespace: "*".into(),
801 fail_mode,
802 }
803 }
804
805 /// Drive a chain of (cfg, mock-executor) pairs. Mirrors what
806 /// `HookChain::fire` does internally but talks to mocks instead
807 /// of the real `ExecutorRegistry`. We re-use the chain's pure
808 /// helpers (`apply_delta_to_payload`, `merge_delta_into`) so the
809 /// tested code path is the production one for everything except
810 /// the executor adapter.
811 async fn drive_with_mocks(
812 event: HookEvent,
813 payload: Value,
814 steps: Vec<(HookConfig, Arc<MockExecutor>)>,
815 ) -> ChainResult {
816 // Sort priority-desc to mirror HookChain::new behaviour.
817 let mut sorted = steps;
818 sorted.sort_by(|a, b| b.0.priority.cmp(&a.0.priority));
819
820 let mut current_payload = payload;
821 let mut accumulated_delta = MemoryDelta::default();
822 let mut modified = false;
823 let mut askuser_queue: Vec<AskUserPrompt> = Vec::new();
824
825 for (cfg, executor) in sorted {
826 let fire_result = executor.fire(event, current_payload.clone()).await;
827 let decision = match fire_result {
828 Ok(d) => d.degrade_modify_for_post_event(event),
829 Err(e) => match cfg.fail_mode {
830 FailMode::Open => HookDecision::Allow,
831 FailMode::Closed => {
832 return ChainResult::Deny {
833 reason: format!(
834 "hook {} errored under fail_mode=closed: {e}",
835 cfg.command.display()
836 ),
837 code: 503,
838 };
839 }
840 },
841 };
842
843 match decision {
844 HookDecision::Allow => {
845 askuser_queue.clear();
846 }
847 HookDecision::Modify(mp) => {
848 apply_delta_to_payload(&mut current_payload, &mp.delta);
849 merge_delta_into(&mut accumulated_delta, mp.delta);
850 modified = true;
851 askuser_queue.clear();
852 }
853 HookDecision::Deny { reason, code } => {
854 return ChainResult::Deny { reason, code };
855 }
856 HookDecision::AskUser {
857 prompt,
858 options,
859 default,
860 } => {
861 askuser_queue.push(AskUserPrompt {
862 prompt,
863 options,
864 default,
865 origin_command: cfg.command.display().to_string(),
866 });
867 }
868 }
869 }
870
871 if !askuser_queue.is_empty() {
872 ChainResult::AskUser {
873 queued: askuser_queue,
874 }
875 } else if modified {
876 ChainResult::ModifiedAllow(accumulated_delta)
877 } else {
878 ChainResult::Allow
879 }
880 }
881
882 // ---- ordering -----------------------------------------------------------
883
884 #[test]
885 fn priority_desc_sort_stable_on_ties() {
886 let hooks = vec![
887 make_cfg(50, FailMode::Open, "/bin/a"),
888 make_cfg(100, FailMode::Open, "/bin/b"),
889 make_cfg(50, FailMode::Open, "/bin/c"), // tie with /bin/a
890 make_cfg(0, FailMode::Open, "/bin/d"),
891 ];
892 let chain = HookChain::new(hooks);
893 let order: Vec<_> = chain
894 .hooks()
895 .iter()
896 .map(|h| h.command.display().to_string())
897 .collect();
898 // Expect 100, 50 (a — first in input), 50 (c), 0
899 assert_eq!(order, vec!["/bin/b", "/bin/a", "/bin/c", "/bin/d"]);
900 }
901
902 #[test]
903 fn for_event_filters_disabled_and_other_events() {
904 let mut wrong_event = make_cfg(100, FailMode::Open, "/bin/wrong");
905 wrong_event.event = HookEvent::PostStore;
906 let mut disabled = make_cfg(50, FailMode::Open, "/bin/off");
907 disabled.enabled = false;
908 let kept = make_cfg(0, FailMode::Open, "/bin/keep");
909
910 let all = vec![wrong_event, disabled, kept];
911 let chain = HookChain::for_event(&all, HookEvent::PreStore);
912 assert_eq!(chain.hooks().len(), 1);
913 assert_eq!(chain.hooks()[0].command, PathBuf::from("/bin/keep"));
914 }
915
916 // ---- first-deny-wins ----------------------------------------------------
917
918 #[tokio::test]
919 async fn three_hooks_first_denies_chain_stops() {
920 let high = (
921 make_cfg(100, FailMode::Open, "/bin/high"),
922 Arc::new(MockExecutor::new(vec![Scripted::Decision(
923 HookDecision::Deny {
924 reason: "redact required".into(),
925 code: 451,
926 },
927 )])),
928 );
929 // The mid + low hooks must NOT be invoked.
930 let mid = (
931 make_cfg(50, FailMode::Open, "/bin/mid"),
932 Arc::new(MockExecutor::new(vec![Scripted::Decision(
933 HookDecision::Allow,
934 )])),
935 );
936 let low = (
937 make_cfg(0, FailMode::Open, "/bin/low"),
938 Arc::new(MockExecutor::new(vec![Scripted::Decision(
939 HookDecision::Allow,
940 )])),
941 );
942
943 let high_count = high.1.clone();
944 let mid_count = mid.1.clone();
945 let low_count = low.1.clone();
946
947 let result = drive_with_mocks(
948 HookEvent::PreStore,
949 json!({"title": "x"}),
950 vec![mid, low, high], // shuffled input — sort is the unit under test
951 )
952 .await;
953
954 match result {
955 ChainResult::Deny { reason, code } => {
956 assert_eq!(reason, "redact required");
957 assert_eq!(code, 451);
958 }
959 other => panic!("expected Deny, got {other:?}"),
960 }
961 assert_eq!(high_count.fire_count.load(Ordering::SeqCst), 1);
962 assert_eq!(
963 mid_count.fire_count.load(Ordering::SeqCst),
964 0,
965 "mid-priority hook fired despite earlier Deny"
966 );
967 assert_eq!(
968 low_count.fire_count.load(Ordering::SeqCst),
969 0,
970 "low-priority hook fired despite earlier Deny"
971 );
972 }
973
974 // ---- modify accumulation ------------------------------------------------
975
976 #[tokio::test]
977 async fn three_hooks_all_modify_compose_into_final_delta() {
978 let h1 = (
979 make_cfg(100, FailMode::Open, "/bin/h1"),
980 Arc::new(MockExecutor::new(vec![Scripted::Decision(
981 HookDecision::Modify(ModifyPayload {
982 delta: MemoryDelta {
983 tags: Some(vec!["redacted".into()]),
984 ..Default::default()
985 },
986 }),
987 )])),
988 );
989 let h2 = (
990 make_cfg(50, FailMode::Open, "/bin/h2"),
991 Arc::new(MockExecutor::new(vec![Scripted::Decision(
992 HookDecision::Modify(ModifyPayload {
993 delta: MemoryDelta {
994 priority: Some(9),
995 ..Default::default()
996 },
997 }),
998 )])),
999 );
1000 let h3 = (
1001 make_cfg(0, FailMode::Open, "/bin/h3"),
1002 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1003 HookDecision::Modify(ModifyPayload {
1004 delta: MemoryDelta {
1005 title: Some("rewritten".into()),
1006 // Override h1's tags — last writer wins.
1007 tags: Some(vec!["audited".into()]),
1008 ..Default::default()
1009 },
1010 }),
1011 )])),
1012 );
1013
1014 let h2_seen = h2.1.clone();
1015 let h3_seen = h3.1.clone();
1016
1017 let result = drive_with_mocks(
1018 HookEvent::PreStore,
1019 json!({"title": "original", "content": "original"}),
1020 vec![h1, h2, h3],
1021 )
1022 .await;
1023
1024 match result {
1025 ChainResult::ModifiedAllow(d) => {
1026 // Last-writer-wins: h3's tags override h1's.
1027 assert_eq!(d.tags.as_deref(), Some(&["audited".to_string()][..]));
1028 // h2 contributed priority that no later hook touched.
1029 assert_eq!(d.priority, Some(9));
1030 // h3 contributed title.
1031 assert_eq!(d.title.as_deref(), Some("rewritten"));
1032 // No hook touched content — accumulator stays None.
1033 assert!(d.content.is_none());
1034 }
1035 other => panic!("expected ModifiedAllow, got {other:?}"),
1036 }
1037
1038 // h2 must have seen h1's "redacted" tag in its payload —
1039 // i.e. later hooks see the latest delta.
1040 let h2_payload = h2_seen.seen_payloads.lock().unwrap()[0].clone();
1041 assert_eq!(h2_payload["tags"], json!(["redacted"]));
1042 // h3 must have seen h2's priority bump in its payload.
1043 let h3_payload = h3_seen.seen_payloads.lock().unwrap()[0].clone();
1044 assert_eq!(h3_payload["priority"], json!(9));
1045 assert_eq!(h3_payload["tags"], json!(["redacted"]));
1046 }
1047
1048 // ---- crash / fail-open / fail-closed -----------------------------------
1049
1050 #[tokio::test]
1051 async fn hook_crash_default_fail_open_continues_as_allow() {
1052 let crashy = (
1053 make_cfg(100, FailMode::Open, "/bin/crashy"),
1054 Arc::new(MockExecutor::new(vec![Scripted::Error])),
1055 );
1056 let calm = (
1057 make_cfg(50, FailMode::Open, "/bin/calm"),
1058 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1059 HookDecision::Allow,
1060 )])),
1061 );
1062
1063 let calm_count = calm.1.clone();
1064
1065 let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![crashy, calm]).await;
1066 assert_eq!(result, ChainResult::Allow);
1067 assert_eq!(
1068 calm_count.fire_count.load(Ordering::SeqCst),
1069 1,
1070 "fail-open must let the chain continue"
1071 );
1072 }
1073
1074 #[tokio::test]
1075 async fn hook_crash_fail_closed_yields_deny_503() {
1076 let crashy = (
1077 make_cfg(100, FailMode::Closed, "/bin/strict"),
1078 Arc::new(MockExecutor::new(vec![Scripted::Error])),
1079 );
1080 let calm = (
1081 make_cfg(50, FailMode::Open, "/bin/calm"),
1082 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1083 HookDecision::Allow,
1084 )])),
1085 );
1086 let calm_count = calm.1.clone();
1087
1088 let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![crashy, calm]).await;
1089 match result {
1090 ChainResult::Deny { reason, code } => {
1091 assert_eq!(code, 503);
1092 assert!(
1093 reason.contains("/bin/strict"),
1094 "deny reason should name the failing hook: {reason}"
1095 );
1096 assert!(
1097 reason.contains("fail_mode=closed"),
1098 "deny reason should name the posture: {reason}"
1099 );
1100 }
1101 other => panic!("expected Deny, got {other:?}"),
1102 }
1103 assert_eq!(
1104 calm_count.fire_count.load(Ordering::SeqCst),
1105 0,
1106 "fail-closed must short-circuit the chain"
1107 );
1108 }
1109
1110 // ---- AskUser queueing ---------------------------------------------------
1111
1112 #[tokio::test]
1113 async fn two_askusers_then_allow_queue_dropped() {
1114 let ask1 = (
1115 make_cfg(100, FailMode::Open, "/bin/ask1"),
1116 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1117 HookDecision::AskUser {
1118 prompt: "promote?".into(),
1119 options: vec!["yes".into(), "no".into()],
1120 default: Some("no".into()),
1121 },
1122 )])),
1123 );
1124 let ask2 = (
1125 make_cfg(50, FailMode::Open, "/bin/ask2"),
1126 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1127 HookDecision::AskUser {
1128 prompt: "tag PII?".into(),
1129 options: vec!["yes".into(), "no".into()],
1130 default: None,
1131 },
1132 )])),
1133 );
1134 // First non-AskUser wins — Allow at priority 0 should override
1135 // the queue and result in ChainResult::Allow.
1136 let allow = (
1137 make_cfg(0, FailMode::Open, "/bin/allow"),
1138 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1139 HookDecision::Allow,
1140 )])),
1141 );
1142
1143 let result =
1144 drive_with_mocks(HookEvent::PreStore, json!({}), vec![ask1, ask2, allow]).await;
1145 assert_eq!(
1146 result,
1147 ChainResult::Allow,
1148 "later Allow must override queued AskUsers"
1149 );
1150 }
1151
1152 #[tokio::test]
1153 async fn askuser_queue_surfaces_when_no_clear_winner() {
1154 let ask1 = (
1155 make_cfg(100, FailMode::Open, "/bin/ask1"),
1156 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1157 HookDecision::AskUser {
1158 prompt: "promote?".into(),
1159 options: vec!["yes".into(), "no".into()],
1160 default: Some("no".into()),
1161 },
1162 )])),
1163 );
1164 let ask2 = (
1165 make_cfg(50, FailMode::Open, "/bin/ask2"),
1166 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1167 HookDecision::AskUser {
1168 prompt: "tag PII?".into(),
1169 options: vec!["yes".into(), "no".into()],
1170 default: None,
1171 },
1172 )])),
1173 );
1174 let allow_filler = (
1175 make_cfg(75, FailMode::Open, "/bin/filler"),
1176 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1177 HookDecision::Allow,
1178 )])),
1179 );
1180
1181 // Even with an Allow in the chain, if the LAST run hooks are
1182 // AskUsers (priority 50 runs after priority 75), the queue
1183 // wins. Priority order: 100 (ask1), 75 (allow), 50 (ask2).
1184 // ask1 queues, allow clears, ask2 re-queues, end-of-chain →
1185 // AskUser with one entry.
1186 let result = drive_with_mocks(
1187 HookEvent::PreStore,
1188 json!({}),
1189 vec![ask1, allow_filler, ask2],
1190 )
1191 .await;
1192 match result {
1193 ChainResult::AskUser { queued } => {
1194 assert_eq!(queued.len(), 1);
1195 assert_eq!(queued[0].prompt, "tag PII?");
1196 assert_eq!(queued[0].origin_command, "/bin/ask2");
1197 }
1198 other => panic!("expected AskUser, got {other:?}"),
1199 }
1200 }
1201
1202 #[tokio::test]
1203 async fn two_askusers_only_yields_two_queued() {
1204 let ask1 = (
1205 make_cfg(100, FailMode::Open, "/bin/ask1"),
1206 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1207 HookDecision::AskUser {
1208 prompt: "first?".into(),
1209 options: vec!["a".into(), "b".into()],
1210 default: None,
1211 },
1212 )])),
1213 );
1214 let ask2 = (
1215 make_cfg(50, FailMode::Open, "/bin/ask2"),
1216 Arc::new(MockExecutor::new(vec![Scripted::Decision(
1217 HookDecision::AskUser {
1218 prompt: "second?".into(),
1219 options: vec!["x".into(), "y".into()],
1220 default: Some("x".into()),
1221 },
1222 )])),
1223 );
1224 let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![ask1, ask2]).await;
1225 match result {
1226 ChainResult::AskUser { queued } => {
1227 assert_eq!(queued.len(), 2);
1228 assert_eq!(queued[0].prompt, "first?");
1229 assert_eq!(queued[1].prompt, "second?");
1230 assert_eq!(queued[1].default.as_deref(), Some("x"));
1231 }
1232 other => panic!("expected AskUser, got {other:?}"),
1233 }
1234 }
1235
1236 // ---- empty chain --------------------------------------------------------
1237
1238 #[tokio::test]
1239 async fn empty_chain_returns_allow() {
1240 let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![]).await;
1241 assert_eq!(result, ChainResult::Allow);
1242 }
1243
1244 // ---- helper-function direct coverage -----------------------------------
1245
1246 #[test]
1247 fn apply_delta_overwrites_top_level_object_keys() {
1248 let mut payload = json!({"title": "old", "untouched": "keep"});
1249 let delta = MemoryDelta {
1250 title: Some("new".into()),
1251 tags: Some(vec!["t".into()]),
1252 ..Default::default()
1253 };
1254 apply_delta_to_payload(&mut payload, &delta);
1255 assert_eq!(payload["title"], json!("new"));
1256 assert_eq!(payload["tags"], json!(["t"]));
1257 assert_eq!(
1258 payload["untouched"],
1259 json!("keep"),
1260 "untouched payload fields must survive merge"
1261 );
1262 }
1263
1264 #[test]
1265 fn apply_delta_replaces_non_object_payload() {
1266 let mut payload = json!("scalar");
1267 let delta = MemoryDelta {
1268 title: Some("recovered".into()),
1269 ..Default::default()
1270 };
1271 apply_delta_to_payload(&mut payload, &delta);
1272 assert!(payload.is_object());
1273 assert_eq!(payload["title"], json!("recovered"));
1274 }
1275
1276 #[test]
1277 fn merge_delta_into_overwrites_some_fields_only() {
1278 let mut acc = MemoryDelta {
1279 tags: Some(vec!["old".into()]),
1280 priority: Some(1),
1281 ..Default::default()
1282 };
1283 let incoming = MemoryDelta {
1284 tags: Some(vec!["new".into()]),
1285 title: Some("t".into()),
1286 ..Default::default()
1287 };
1288 merge_delta_into(&mut acc, incoming);
1289 assert_eq!(acc.tags.as_deref(), Some(&["new".to_string()][..]));
1290 assert_eq!(acc.title.as_deref(), Some("t"));
1291 // priority survives — incoming had None there.
1292 assert_eq!(acc.priority, Some(1));
1293 }
1294
1295 // ---- subscription dispatch ordering ------------------------------------
1296
1297 #[tokio::test]
1298 async fn dispatch_event_with_hooks_post_event_runs_subs_first() {
1299 // Sentinel: a closure that records when the "subscription"
1300 // dispatch ran relative to the hook fire. The mock executor
1301 // records the order of its own fire too; we compare.
1302 use std::sync::atomic::{AtomicUsize, Ordering};
1303 static CLOCK: AtomicUsize = AtomicUsize::new(0);
1304 static SUB_TICK: AtomicUsize = AtomicUsize::new(0);
1305 static HOOK_TICK: AtomicUsize = AtomicUsize::new(0);
1306 CLOCK.store(0, Ordering::SeqCst);
1307 SUB_TICK.store(0, Ordering::SeqCst);
1308 HOOK_TICK.store(0, Ordering::SeqCst);
1309
1310 struct OrderingExecutor;
1311 impl HookExecutor for OrderingExecutor {
1312 fn fire<'a>(
1313 &'a self,
1314 _event: HookEvent,
1315 _payload: Value,
1316 ) -> Pin<Box<dyn std::future::Future<Output = ExecutorResult<HookDecision>> + Send + 'a>>
1317 {
1318 HOOK_TICK.store(CLOCK.fetch_add(1, Ordering::SeqCst) + 1, Ordering::SeqCst);
1319 Box::pin(async { Ok(HookDecision::Allow) })
1320 }
1321 fn metrics(&self) -> ExecutorMetrics {
1322 ExecutorMetrics {
1323 events_fired: 0,
1324 events_dropped: 0,
1325 mean_latency_us: 0,
1326 }
1327 }
1328 }
1329
1330 // We can't slot OrderingExecutor into ExecutorRegistry today
1331 // (the registry is mode-driven). We exercise the
1332 // dispatch-ordering rule by calling `dispatch_event_with_hooks`
1333 // with an empty chain — for a post- event the closure must
1334 // run before `chain.fire` (which is a no-op on empty), and
1335 // for a pre- event it runs after. We don't need the real
1336 // executor at all to verify this.
1337 let _ = OrderingExecutor; // silences unused-struct warning in non-mock builds
1338
1339 let mut registry = ExecutorRegistry::new();
1340 let post_chain = HookChain::new(vec![]);
1341 let result = dispatch_event_with_hooks(
1342 HookEvent::PostStore,
1343 json!({}),
1344 &post_chain,
1345 &mut registry,
1346 || {
1347 SUB_TICK.store(CLOCK.fetch_add(1, Ordering::SeqCst) + 1, Ordering::SeqCst);
1348 },
1349 )
1350 .await;
1351 assert_eq!(result, ChainResult::Allow);
1352 // Subscription closure ran (got tick 1). With an empty chain
1353 // there's no hook tick to compare against, but the contract
1354 // we're locking in is "subs run unconditionally on post-",
1355 // which the assertion below pins.
1356 assert!(
1357 SUB_TICK.load(Ordering::SeqCst) >= 1,
1358 "subscription closure must run for post- events"
1359 );
1360 }
1361
1362 #[tokio::test]
1363 async fn hook_chain_fire_empty_returns_allow_directly() {
1364 let chain = HookChain::new(vec![]);
1365 let mut reg = ExecutorRegistry::new();
1366 let r = chain
1367 .fire(HookEvent::PreStore, json!({"k":"v"}), &mut reg)
1368 .await;
1369 assert_eq!(r, ChainResult::Allow);
1370 }
1371
1372 #[tokio::test]
1373 async fn fire_on_index_eviction_empty_chain_returns_allow() {
1374 let chain = HookChain::new(vec![]);
1375 let mut reg = ExecutorRegistry::new();
1376 let ev = EvictionEvent {
1377 memory_id: "1".into(),
1378 namespace: "test".into(),
1379 evicted_at: "2026-01-01T00:00:00Z".into(),
1380 reason: "max_entries_reached".into(),
1381 };
1382 let r = fire_on_index_eviction(&chain, &mut reg, ev).await;
1383 assert_eq!(r, ChainResult::Allow);
1384 }
1385
1386 #[tokio::test]
1387 async fn spawn_eviction_observer_exits_when_sender_drops() {
1388 let chain = Arc::new(HookChain::new(vec![]));
1389 let reg = ExecutorRegistry::new();
1390 let tx = spawn_eviction_observer(chain, reg);
1391 // Drop the sender — observer task should exit cleanly without
1392 // panicking. We can't observe the task directly, but the test
1393 // verifies no panic surfaces and the send-half drops cleanly.
1394 drop(tx);
1395 // Give the task a brief moment to exit.
1396 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1397 }
1398
1399 #[test]
1400 fn chain_result_partial_eq_modified_allow_equal_deltas() {
1401 let a = ChainResult::ModifiedAllow(MemoryDelta {
1402 tags: Some(vec!["x".into()]),
1403 ..Default::default()
1404 });
1405 let b = ChainResult::ModifiedAllow(MemoryDelta {
1406 tags: Some(vec!["x".into()]),
1407 ..Default::default()
1408 });
1409 assert_eq!(a, b);
1410 }
1411
1412 #[test]
1413 fn chain_result_partial_eq_distinct_variants_not_equal() {
1414 let allow = ChainResult::Allow;
1415 let deny = ChainResult::Deny {
1416 reason: "x".into(),
1417 code: 500,
1418 };
1419 let ask = ChainResult::AskUser {
1420 queued: vec![AskUserPrompt {
1421 prompt: "?".into(),
1422 options: vec!["a".into()],
1423 default: None,
1424 origin_command: "/h".into(),
1425 }],
1426 };
1427 let mod_allow = ChainResult::ModifiedAllow(MemoryDelta::default());
1428 assert_ne!(allow, deny);
1429 assert_ne!(allow, ask);
1430 assert_ne!(allow, mod_allow);
1431 assert_ne!(deny, ask);
1432 assert_ne!(deny, mod_allow);
1433 assert_ne!(ask, mod_allow);
1434 }
1435
1436 #[test]
1437 fn chain_result_partial_eq_deny_different_codes_not_equal() {
1438 let a = ChainResult::Deny {
1439 reason: "x".into(),
1440 code: 403,
1441 };
1442 let b = ChainResult::Deny {
1443 reason: "x".into(),
1444 code: 503,
1445 };
1446 assert_ne!(a, b);
1447 }
1448
1449 #[test]
1450 fn ask_user_prompt_partial_eq_round_trip() {
1451 let p1 = AskUserPrompt {
1452 prompt: "p".into(),
1453 options: vec!["a".into(), "b".into()],
1454 default: Some("a".into()),
1455 origin_command: "/h".into(),
1456 };
1457 let p2 = p1.clone();
1458 assert_eq!(p1, p2);
1459 }
1460
1461 #[test]
1462 fn apply_delta_to_payload_does_nothing_on_empty_delta() {
1463 let mut payload = json!({"keep": "me"});
1464 apply_delta_to_payload(&mut payload, &MemoryDelta::default());
1465 assert_eq!(payload["keep"], json!("me"));
1466 }
1467
1468 #[test]
1469 fn merge_delta_into_overwrites_all_fields() {
1470 let mut acc = MemoryDelta::default();
1471 let incoming = MemoryDelta {
1472 tier: Some(crate::models::Tier::Short),
1473 namespace: Some("ns".into()),
1474 title: Some("t".into()),
1475 content: Some("c".into()),
1476 tags: Some(vec!["tag".into()]),
1477 priority: Some(7),
1478 confidence: Some(0.5),
1479 source: Some("src".into()),
1480 expires_at: Some("2026-01-01".into()),
1481 metadata: Some(json!({"k": "v"})),
1482 };
1483 merge_delta_into(&mut acc, incoming);
1484 assert!(acc.tier.is_some());
1485 assert_eq!(acc.namespace.as_deref(), Some("ns"));
1486 assert_eq!(acc.title.as_deref(), Some("t"));
1487 assert_eq!(acc.content.as_deref(), Some("c"));
1488 assert_eq!(acc.priority, Some(7));
1489 assert_eq!(acc.confidence, Some(0.5));
1490 assert_eq!(acc.source.as_deref(), Some("src"));
1491 assert_eq!(acc.expires_at.as_deref(), Some("2026-01-01"));
1492 assert_eq!(acc.metadata.as_ref().unwrap()["k"], json!("v"));
1493 }
1494
1495 #[test]
1496 fn merge_delta_into_none_fields_dont_overwrite() {
1497 let mut acc = MemoryDelta {
1498 tier: Some(crate::models::Tier::Long),
1499 namespace: Some("orig".into()),
1500 title: Some("orig-title".into()),
1501 content: Some("orig-content".into()),
1502 tags: Some(vec!["orig".into()]),
1503 priority: Some(1),
1504 confidence: Some(0.1),
1505 source: Some("orig-src".into()),
1506 expires_at: Some("orig-exp".into()),
1507 metadata: Some(json!({"orig": true})),
1508 };
1509 // All None — should not change anything.
1510 merge_delta_into(&mut acc, MemoryDelta::default());
1511 assert!(acc.tier.is_some());
1512 assert_eq!(acc.namespace.as_deref(), Some("orig"));
1513 assert_eq!(acc.title.as_deref(), Some("orig-title"));
1514 assert_eq!(acc.content.as_deref(), Some("orig-content"));
1515 assert_eq!(acc.priority, Some(1));
1516 }
1517
1518 #[tokio::test]
1519 async fn dispatch_event_with_hooks_pre_event_deny_skips_subscription() {
1520 // The G5 contract: on pre- events, if the hook chain Denies,
1521 // the subscription dispatch is skipped (the operation isn't
1522 // happening, so subscribers shouldn't see it).
1523 //
1524 // Because we can't plumb a MockExecutor through ExecutorRegistry,
1525 // we verify the converse cleanly: on a pre- event with an empty
1526 // chain (which trivially Allows), the subscription closure DOES
1527 // run. Coupled with the source-level Deny short-circuit branch
1528 // (covered by inspection / clippy), this pins the path.
1529 use std::sync::atomic::{AtomicBool, Ordering};
1530 let ran = std::sync::Arc::new(AtomicBool::new(false));
1531 let ran2 = ran.clone();
1532
1533 let mut registry = ExecutorRegistry::new();
1534 let pre_chain = HookChain::new(vec![]);
1535 let result = dispatch_event_with_hooks(
1536 HookEvent::PreStore,
1537 json!({}),
1538 &pre_chain,
1539 &mut registry,
1540 move || {
1541 ran2.store(true, Ordering::SeqCst);
1542 },
1543 )
1544 .await;
1545 assert_eq!(result, ChainResult::Allow);
1546 assert!(
1547 ran.load(Ordering::SeqCst),
1548 "Allow on pre-event must let subscription dispatch run"
1549 );
1550 }
1551}