Skip to main content

ai_memory/hooks/
chain.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// v0.7 Track G — Task G5: chain ordering + first-deny-wins short-circuit.
5//
6// G3 (PR #567) shipped the per-hook `HookExecutor` (`ExecExecutor`,
7// `DaemonExecutor`, `ExecutorRegistry`). G4 (PR #570) shipped the
8// `HookDecision` four-variant contract (`Allow / Modify(MemoryDelta)
9// / Deny / AskUser`). G5 stitches them together: when several
10// `[[hook]]` blocks subscribe to the same event, fire them in
11// deterministic priority-descending order, threading a
12// possibly-mutated payload through the chain, halting on the first
13// `Deny`, and queueing every `AskUser` for the operator surface.
14//
15// # Ordering
16//
17// `HookChain::new` sorts the configured hooks by `priority`
18// descending. Ties are broken by *insertion order* — i.e. the order
19// the entries appear in `hooks.toml`, which `HookConfig::load_from_file`
20// already preserves. `Vec::sort_by` is stable, so feeding it a
21// `Vec<HookConfig>` in load order yields the documented behaviour
22// without any extra bookkeeping.
23//
24// # Decision merging
25//
26// The chain runs a small state machine over the per-hook decisions:
27//
28//   * `Allow`   — keep iterating with the same payload.
29//   * `Modify`  — merge the `MemoryDelta` into the in-flight payload
30//                 (top-level `Object` keys overwrite; nested fields
31//                 are replaced wholesale because `MemoryDelta` itself
32//                 has no nested optional sub-bags) and set the
33//                 `modified` flag so the final result widens to
34//                 `ModifiedAllow`. The *next* hook in the chain sees
35//                 the merged payload, matching the prompt's
36//                 "later hooks see the latest delta" requirement.
37//   * `Deny`    — short-circuit. The chain never invokes the rest of
38//                 the hooks. Even if earlier hooks queued AskUser
39//                 prompts, the operator-facing answer is `Deny`
40//                 (compliance trumps operator UX).
41//   * `AskUser` — push the prompt onto the queue and continue. A
42//                 chain that ends with at least one queued AskUser
43//                 *and no clear Allow / Modify win* surfaces as
44//                 `ChainResult::AskUser`. If a *subsequent* hook
45//                 returns `Allow` or `Modify`, that decision wins —
46//                 matching the prompt's "first non-AskUser decision
47//                 continues" semantics.
48//
49// "First non-AskUser decision continues" is implemented as: AskUser
50// never overrides a later Allow / Modify; AskUser only "wins" when
51// every later hook also returned AskUser (or when the chain was
52// AskUser-only to begin with).
53//
54// # Crash handling — `FailMode`
55//
56// Every `HookConfig` now carries a `fail_mode: FailMode` field
57// (G5 addition; defaults to `Open` so G3-era configs keep their
58// behaviour). When `executor.fire()` returns an `ExecutorError`
59// (spawn failure, decode failure, timeout, daemon-unavailable, …):
60//
61//   * `FailMode::Open` (default) — `tracing::warn!` the error and
62//     treat the failed fire as `Allow`. Continue the chain.
63//   * `FailMode::Closed` — `tracing::warn!` the error and convert
64//     it to `ChainResult::Deny { reason: <executor-error display>,
65//     code: 503 }`. Short-circuit the chain.
66//
67// 503 is the "service unavailable" HTTP status; it mirrors the
68// chain semantics ("we couldn't run the gate, refuse the request").
69// G7+ will wire this onto the actual API surface.
70//
71// # G6 — per-event-class deadline (this PR)
72//
73// `HookChain::fire` now computes a *chain* deadline at entry:
74// `chain_deadline = Instant::now() + class_deadline_for_event(event)`.
75// Before each hook fires, the chain derives the per-hook budget as
76// `min(chain_remaining, hook.timeout_ms)` and clones a shrunk
77// `HookConfig` into a one-off executor for that fire (the executor
78// honours `HookConfig.timeout_ms` already; the chain only needs to
79// shrink the knob). If the chain deadline has *already* passed
80// before a hook fires, that hook is skipped, the chain bumps
81// `timeouts::record_timeout_violation()`, and continues fail-open
82// `Allow` per `FailMode::Open`. A `FailMode::Closed` hook that
83// runs out of chain budget converts to a chain-level `Deny` with
84// reason `chain class deadline exhausted` and code 504 — the HTTP
85// "gateway timeout" status, which mirrors the chain semantics
86// ("we couldn't run the gate, refuse the request as if upstream
87// timed out").
88//
89// # Out of scope
90//
91// * Wiring at the actual memory operation points (`db::insert`,
92//   `db::recall`, …) — that's G7+.
93// * `dispatch_event` / subscription integration is a thin
94//   convenience wrapper here (`dispatch_event_with_hooks`); the
95//   real wire-in at MCP / handlers call sites lands later in the
96//   epic.
97
98use std::sync::Arc;
99use std::time::Instant;
100
101use serde_json::{Map, Value};
102
103use super::config::{FailMode, HookConfig};
104use super::decision::{HookDecision, is_pre_event};
105use super::events::{EvictionEvent, HookEvent, MemoryDelta};
106use super::executor::ExecutorRegistry;
107use super::timeouts::{class_deadline_for_event, per_hook_budget_ms, record_timeout_violation};
108
109// ---------------------------------------------------------------------------
110// AskUserPrompt — operator-surface queue entry
111// ---------------------------------------------------------------------------
112
113/// One queued operator prompt. The chain runner accumulates these
114/// when hooks return `HookDecision::AskUser` and the chain doesn't
115/// terminate in `Deny` / clear `Allow`. The G7+ wiring layer will
116/// fan these out to the operator surface (CLI / MCP / HTTP) and
117/// resume the chain on the human's choice.
118///
119/// We keep this distinct from `HookDecision::AskUser` so the queue
120/// representation can grow (correlation ids, hook origin tags, …)
121/// without churning the wire-format enum the executor parses.
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct AskUserPrompt {
124    /// The text shown to the operator. Verbatim from the hook's
125    /// `prompt` field.
126    pub prompt: String,
127    /// The selectable options, in the order the hook listed them.
128    pub options: Vec<String>,
129    /// Optional default; the runner falls back to this on operator
130    /// timeout.
131    pub default: Option<String>,
132    /// Path of the hook that queued the prompt. Lets the operator
133    /// surface display "why am I being asked this?".
134    pub origin_command: String,
135}
136
137// ---------------------------------------------------------------------------
138// ChainResult — the outcome of running an entire chain
139// ---------------------------------------------------------------------------
140
141/// What the chain runner reports back to the dispatcher. Mirrors
142/// `HookDecision`'s shape but at chain granularity:
143///
144///   * [`ChainResult::Allow`] — every hook in the chain returned
145///     `Allow` (or the chain was empty).
146///   * [`ChainResult::ModifiedAllow`] — at least one hook returned
147///     `Modify`; the final merged delta is reported.
148///   * [`ChainResult::Deny`] — a hook returned `Deny` (or a hook
149///     errored under `FailMode::Closed`); the chain short-circuited.
150///   * [`ChainResult::AskUser`] — the chain finished with at least
151///     one queued operator prompt and no clear Allow / Modify win.
152///
153/// `Modify` is not a chain-level outcome on its own — every chain
154/// either *also* finishes Allow (`ModifiedAllow`) or short-circuits
155/// on `Deny`. The dispatcher applies the cumulative delta exactly
156/// once when the chain returns `ModifiedAllow`.
157///
158/// #969 — `PartialEq` is now `derive`-able because `MemoryDelta`
159/// derives `PartialEq` (see `hooks/events.rs`). Pre-#969 we
160/// hand-rolled equality routed through `serde_json::to_value(...)`
161/// on the (mistaken) premise that `serde_json::Value` was not
162/// `PartialEq` — it IS. `MemoryDelta`'s `Option<f64>` blocks
163/// `derive(Eq)` (f64 has only `PartialEq`) but not
164/// `derive(PartialEq)`. The historical wrap-and-compare is gone.
165#[derive(Debug, Clone, PartialEq)]
166pub enum ChainResult {
167    Allow,
168    ModifiedAllow(MemoryDelta),
169    Deny { reason: String, code: i32 },
170    AskUser { queued: Vec<AskUserPrompt> },
171}
172
173// ---------------------------------------------------------------------------
174// HookChain — priority-sorted, fire-in-order
175// ---------------------------------------------------------------------------
176
177/// Ordered set of hooks subscribed to a single event. The hooks are
178/// sorted by `priority` descending at construction time; ties keep
179/// their `hooks.toml` insertion order (`Vec::sort_by` is stable, so
180/// feeding it a load-order vec gives the documented behaviour for
181/// free).
182///
183/// The chain runner is a method on the chain rather than a free
184/// function so callers can hold a chain across multiple fires
185/// (e.g. one per event tag, built once on `hooks.toml` load and
186/// reused across many request paths).
187pub struct HookChain {
188    hooks: Vec<HookConfig>,
189}
190
191impl HookChain {
192    /// Build a chain from the hooks subscribed to `event`. The input
193    /// vec is filtered to enabled entries matching `event` and then
194    /// sorted by `priority` descending.
195    ///
196    /// Insertion order from `hooks.toml` is the secondary sort key
197    /// (i.e. ties break in load order). `Vec::sort_by` is stable so
198    /// no extra bookkeeping is needed — a load-order input gives the
199    /// documented behaviour.
200    #[must_use]
201    pub fn for_event(all_hooks: &[HookConfig], event: HookEvent) -> Self {
202        let mut hooks: Vec<HookConfig> = all_hooks
203            .iter()
204            .filter(|h| h.enabled && h.event == event)
205            .cloned()
206            .collect();
207        // Stable sort: ties preserve original (hooks.toml) ordering.
208        hooks.sort_by(|a, b| b.priority.cmp(&a.priority));
209        Self { hooks }
210    }
211
212    /// Construct from an explicit, pre-filtered hook list. The list
213    /// is still priority-sorted on the way in. Used by tests that
214    /// want to bypass the `enabled` / `event` filter.
215    #[must_use]
216    pub fn new(mut hooks: Vec<HookConfig>) -> Self {
217        hooks.sort_by(|a, b| b.priority.cmp(&a.priority));
218        Self { hooks }
219    }
220
221    /// Returns the priority-sorted hook list. Useful for tests
222    /// (asserting the ordering pass landed) and for the doctor
223    /// surface (rendering the configured chain).
224    #[must_use]
225    pub fn hooks(&self) -> &[HookConfig] {
226        &self.hooks
227    }
228
229    /// Run the chain. Iterates hooks in priority order, threads the
230    /// possibly-mutated payload through, and short-circuits on the
231    /// first `Deny`.
232    ///
233    /// `registry` is taken `&mut` because `ExecutorRegistry::get`
234    /// inserts on cache miss. Once every hook in the chain has been
235    /// fired at least once the registry is steady-state and a fully
236    /// pre-warmed registry built via `ExecutorRegistry::from_hooks`
237    /// makes this a read-only path.
238    ///
239    /// The future is `async` because each hook's `fire` is async;
240    /// the chain itself does no extra work between fires beyond the
241    /// in-memory delta merge.
242    pub async fn fire(
243        &self,
244        event: HookEvent,
245        payload: Value,
246        registry: &mut ExecutorRegistry,
247    ) -> ChainResult {
248        let mut current_payload = payload;
249        let mut accumulated_delta = MemoryDelta::default();
250        let mut modified = false;
251        let mut askuser_queue: Vec<AskUserPrompt> = Vec::new();
252
253        // G6: stamp a chain-wide wall-clock ceiling at entry. Every
254        // hook in the loop below has its per-hook timeout shrunk to
255        // `min(chain_remaining, hook.timeout_ms)` so the *whole*
256        // chain cannot blow the recall / write / index / transcript
257        // budget the epic pins.
258        let chain_deadline = Instant::now() + class_deadline_for_event(event);
259
260        // Snapshot executor handles before the await loop so we hand
261        // them out by `Arc<dyn HookExecutor>` and don't re-borrow the
262        // registry across the await boundary. (Holding `&mut registry`
263        // across an await would force every caller to single-thread.)
264        let prepared: Vec<(HookConfig, Arc<dyn super::executor::HookExecutor>)> = self
265            .hooks
266            .iter()
267            .map(|h| (h.clone(), registry.get(h)))
268            .collect();
269
270        for (cfg, executor) in prepared {
271            // G6: derive the per-hook budget from what's left of the
272            // chain deadline. `None` means the deadline already
273            // passed — record a violation, treat the remaining hooks
274            // per `fail_mode` (Open ⇒ Allow, Closed ⇒ Deny 504).
275            let Some(budget_ms) =
276                per_hook_budget_ms(chain_deadline, Instant::now(), cfg.timeout_ms)
277            else {
278                record_timeout_violation();
279                match cfg.fail_mode {
280                    FailMode::Open => {
281                        tracing::warn!(
282                            command = %cfg.command.display(),
283                            event = ?event,
284                            "hooks: chain class deadline exhausted before hook fire; \
285                             fail_mode=open, treating as Allow"
286                        );
287                        continue;
288                    }
289                    FailMode::Closed => {
290                        tracing::warn!(
291                            command = %cfg.command.display(),
292                            event = ?event,
293                            "hooks: chain class deadline exhausted before hook fire; \
294                             fail_mode=closed, denying"
295                        );
296                        return ChainResult::Deny {
297                            reason: format!(
298                                "hook {} skipped under fail_mode=closed: chain class deadline exhausted",
299                                cfg.command.display()
300                            ),
301                            code: 504,
302                        };
303                    }
304                }
305            };
306
307            // G6: enforce the (possibly-shrunk) per-hook budget at
308            // the chain layer. The executor itself already honours
309            // its configured `timeout_ms`, but the chain's view of
310            // "remaining wall clock" can be tighter than that knob;
311            // wrapping the fire here is what guarantees the class
312            // ceiling holds even when ten hooks each carry a 1s
313            // hook_timeout_ms but the class budget is 2s.
314            let per_hook_deadline = std::time::Duration::from_millis(u64::from(budget_ms));
315            let raced = tokio::time::timeout(
316                per_hook_deadline,
317                executor.fire(event, current_payload.clone()),
318            )
319            .await;
320
321            let fire_result = match raced {
322                Ok(inner) => inner,
323                Err(_elapsed) => {
324                    // Treat a chain-level timeout the same way the
325                    // executor's own Timeout would surface — a single
326                    // ExecutorError::Timeout, routed through fail_mode.
327                    // The Err(Timeout) arm below records the violation
328                    // (one record per trip — the executor's `timeout_ms`
329                    // and our chain wrapper are racing on the *smaller*
330                    // of the two, only one ever fires, no double-count).
331                    Err(super::executor::ExecutorError::Timeout {
332                        ms: u64::from(budget_ms),
333                    })
334                }
335            };
336
337            let decision = match fire_result {
338                Ok(d) => d.degrade_modify_for_post_event(event),
339                Err(e) => {
340                    // G6: a Timeout from the executor counts as a
341                    // violation too. The executor enforces
342                    // `cfg.timeout_ms` and the chain wrapper
343                    // enforces `min(chain_remaining, cfg.timeout_ms)`
344                    // — only the smaller of the two ever fires on a
345                    // given hook, so the two record paths are
346                    // mutually exclusive (no double-count).
347                    if matches!(e, super::executor::ExecutorError::Timeout { .. }) {
348                        record_timeout_violation();
349                    }
350                    // Crash handling per `fail_mode`.
351                    match cfg.fail_mode {
352                        FailMode::Open => {
353                            tracing::warn!(
354                                command = %cfg.command.display(),
355                                event = ?event,
356                                error = %e,
357                                "hooks: chain hook errored; fail_mode=open, treating as Allow"
358                            );
359                            HookDecision::Allow
360                        }
361                        FailMode::Closed => {
362                            tracing::warn!(
363                                command = %cfg.command.display(),
364                                event = ?event,
365                                error = %e,
366                                "hooks: chain hook errored; fail_mode=closed, denying"
367                            );
368                            return ChainResult::Deny {
369                                reason: format!(
370                                    "hook {} errored under fail_mode=closed: {e}",
371                                    cfg.command.display()
372                                ),
373                                code: 503,
374                            };
375                        }
376                    }
377                }
378            };
379
380            match decision {
381                HookDecision::Allow => {
382                    // Allow is the no-op continue. AskUser prompts
383                    // queued by *earlier* hooks remain queued but do
384                    // not win — Allow is a "first non-AskUser
385                    // decision" winner per the prompt.
386                    askuser_queue.clear();
387                }
388                HookDecision::Modify(modify_payload) => {
389                    // Merge into the in-flight payload so the next
390                    // hook sees the latest delta, *and* track the
391                    // composed delta so the final result can report it.
392                    apply_delta_to_payload(&mut current_payload, &modify_payload.delta);
393                    merge_delta_into(&mut accumulated_delta, modify_payload.delta);
394                    modified = true;
395                    // Modify also overrides any earlier AskUser
396                    // prompts — same "first non-AskUser wins" rule.
397                    askuser_queue.clear();
398                }
399                HookDecision::Deny { reason, code } => {
400                    return ChainResult::Deny { reason, code };
401                }
402                HookDecision::AskUser {
403                    prompt,
404                    options,
405                    default,
406                } => {
407                    // Only valid on pre- events, but we don't degrade
408                    // here — the dispatcher (G7+) decides what to do
409                    // with an AskUser on a post- event. Today the
410                    // only post-AskUser test path is "queued, but
411                    // chain returns Allow" because no caller acts on
412                    // the queue yet.
413                    askuser_queue.push(AskUserPrompt {
414                        prompt,
415                        options,
416                        default,
417                        origin_command: cfg.command.display().to_string(),
418                    });
419                    // Continue: a *later* Allow / Modify will overwrite
420                    // the queue (per the cleared-on-Allow path above).
421                    // If every remaining hook also AskUsers (or the
422                    // chain ends here), we emit ChainResult::AskUser.
423                    let _ = is_pre_event(event); // tracing-only awareness; no behaviour change
424                }
425            }
426        }
427
428        if !askuser_queue.is_empty() {
429            ChainResult::AskUser {
430                queued: askuser_queue,
431            }
432        } else if modified {
433            ChainResult::ModifiedAllow(accumulated_delta)
434        } else {
435            ChainResult::Allow
436        }
437    }
438}
439
440// ---------------------------------------------------------------------------
441// Subscription integration — `dispatch_event_with_hooks`
442// ---------------------------------------------------------------------------
443//
444// The G5 prompt asks for hooks to fire *before* webhook subscriptions
445// for pre- events and *after* for post- events. v0.6's
446// `subscriptions::dispatch_event` is a post-event-only API
447// (`memory_store`, `memory_promote`, … all fire after the DB write),
448// so the integration here is the post- side: run the hook chain
449// *after* the subscription dispatch returns.
450//
451// Pre-event call sites do not yet exist on the dispatcher path —
452// they'll land in G7+ when hooks are wired into `db::insert` /
453// `db::recall` / etc. The function below covers the post- side and
454// documents the pre- shape so the G7 implementer has a single
455// place to look. Routing the actual MCP / handlers call sites into
456// this convenience wrapper is left to the wiring tasks.
457
458/// Convenience: dispatch the v0.6 webhook event AND fire the hook
459/// chain for `event` in the order the G5 prompt mandates (subs
460/// first for post-, hooks first for pre-).
461///
462/// `subscription_dispatch` is the closure the caller wires to
463/// `crate::subscriptions::dispatch_event` (or
464/// `dispatch_event_with_details`). Taking it as a closure keeps
465/// this module free of any direct dependency on `rusqlite::Connection`
466/// — the subscription module owns the DB handle, and the hooks
467/// module stays a leaf.
468///
469/// Returns the chain result so the caller can decide whether to
470/// proceed (Allow / ModifiedAllow / AskUser) or refuse (Deny). For
471/// post- events the dispatcher should treat Deny as "log only" —
472/// the side-effect already happened.
473pub async fn dispatch_event_with_hooks<F>(
474    event: HookEvent,
475    payload: Value,
476    chain: &HookChain,
477    registry: &mut ExecutorRegistry,
478    subscription_dispatch: F,
479) -> ChainResult
480where
481    F: FnOnce(),
482{
483    if is_pre_event(event) {
484        // Pre-: hooks run first. If the chain Denies, skip the
485        // subscription dispatch entirely (the operation isn't
486        // happening, so subscribers shouldn't see it).
487        let result = chain.fire(event, payload, registry).await;
488        if !matches!(result, ChainResult::Deny { .. }) {
489            subscription_dispatch();
490        }
491        result
492    } else {
493        // Post-: subscriptions first (the side-effect already
494        // happened, so subscribers see it unconditionally). Hooks
495        // run after for observability / linking / etc.
496        subscription_dispatch();
497        chain.fire(event, payload, registry).await
498    }
499}
500
501// ---------------------------------------------------------------------------
502// G8 / R3-S1 — on_index_eviction fire helper + observer-channel sink
503// ---------------------------------------------------------------------------
504//
505// `OnIndexEviction` is the only event whose canonical fire site
506// (`src/hnsw.rs:insert` — the `MAX_ENTRIES`-triggered drain) sits
507// below the hooks layer in the dependency graph. `VectorIndex`
508// owns no `ExecutorRegistry` handle and threading one through
509// the inner Mutex would touch every caller in the storage layer
510// and serialize hook execution behind the hot-path lock.
511//
512// v0.7.0 R3-S1 closes the prior G8 "fire helper exists but not
513// wired" gap with approach (b) from the original TODO: a
514// channel-sink between `VectorIndex` and the hooks layer.
515// `VectorIndex::set_eviction_sink` takes the send-half of an
516// unbounded mpsc channel; the eviction path inside
517// `VectorIndex::insert` pushes one [`EvictionEvent`] per evicted
518// id (`Sender::send` is non-blocking on an unbounded channel).
519// A background observer task owns the recv-half and fires
520// `fire_on_index_eviction` off the hot path. The `Sender` push
521// itself is a no-op when no sink is wired (CLI / test builds
522// without a hooks pipeline) so eviction throughput is unaffected
523// in those configurations.
524//
525// The observer task is `mode = "daemon"` semantics by construction:
526// the eviction-trigger thread never blocks on hook execution, the
527// recv-half is drained on a dedicated tokio task off the hot path,
528// and slow hooks back-pressure only on themselves (the channel is
529// unbounded).
530
531/// Fire the `on_index_eviction` chain for `payload`.
532///
533/// Production callers reach this through the eviction-observer
534/// task spawned by [`spawn_eviction_observer`]; the helper is
535/// also called directly from `tests/hooks_executor_test.rs` to
536/// exercise the wire shape end-to-end through a real subprocess
537/// hook.
538///
539/// # Why a free function and not a method on `HookChain`
540///
541/// `HookChain::fire` already covers the generic event path. This
542/// helper exists so callers can pass a typed [`EvictionEvent`]
543/// instead of a `serde_json::Value` and have the JSON projection
544/// happen here — keeping the hnsw layer free of any `serde_json`
545/// import. It also gives us a single grep target for "where does
546/// the eviction hook fire?".
547pub async fn fire_on_index_eviction(
548    chain: &HookChain,
549    registry: &mut ExecutorRegistry,
550    payload: EvictionEvent,
551) -> ChainResult {
552    let value = serde_json::to_value(&payload).unwrap_or_else(|_| Value::Null);
553    chain
554        .fire(HookEvent::OnIndexEviction, value, registry)
555        .await
556}
557
558/// v0.7.0 R3-S1 — Spawn the eviction observer that bridges the
559/// `VectorIndex` eviction-edge channel to the `on_index_eviction`
560/// hook chain. Returns the send-half of an unbounded mpsc channel
561/// caller must hand to [`crate::hnsw::VectorIndex::set_eviction_sink`].
562///
563/// The observer task takes ownership of `chain` (cloned via `Arc`)
564/// and the `registry`; both are kept alive for the lifetime of the
565/// recv-half. When the last `Sender` clone drops (typically when the
566/// daemon shuts down and `VectorIndex` is dropped), the channel
567/// closes and the observer task exits cleanly.
568///
569/// This is the canonical "daemon-mode" wire-in for the eviction
570/// hook: the hot-path eviction edge never blocks waiting for hook
571/// execution; the observer task drains the queue at its own pace.
572///
573/// # Hot-path posture
574///
575/// The send side (`Sender::send` on an unbounded channel) never
576/// blocks. A back-logged hook (slow subprocess, daemon hook
577/// stalled) accumulates events in the channel but does NOT slow
578/// `VectorIndex::insert`. This is the intended trade-off — eviction
579/// is rare (only fires past the 100k cap) and operators care more
580/// about not coupling recall latency to hook subscriber health than
581/// about bounded queue memory.
582pub fn spawn_eviction_observer(
583    chain: Arc<HookChain>,
584    mut registry: ExecutorRegistry,
585) -> std::sync::mpsc::Sender<EvictionEvent> {
586    let (tx, rx) = std::sync::mpsc::channel::<EvictionEvent>();
587    // We keep the recv side on a std mpsc (so the hot-path producer
588    // can be sync-only). A tiny bridge converts std-recv -> async by
589    // delegating the blocking recv to a `spawn_blocking` task; each
590    // observed payload re-enters the async chain via
591    // `fire_on_index_eviction`. This is the canonical pattern for
592    // adapting a sync producer to an async consumer in tokio.
593    let rx = std::sync::Mutex::new(rx);
594    let rx = Arc::new(rx);
595    tokio::spawn(async move {
596        loop {
597            let rx_clone = Arc::clone(&rx);
598            let next = tokio::task::spawn_blocking(move || {
599                let guard = rx_clone.lock().expect("eviction observer rx mutex");
600                guard.recv()
601            })
602            .await;
603            match next {
604                Ok(Ok(payload)) => {
605                    let _ = fire_on_index_eviction(&chain, &mut registry, payload).await;
606                }
607                // Either the JoinHandle errored (panic) or the
608                // sender side dropped — both terminate the observer.
609                Ok(Err(_)) | Err(_) => break,
610            }
611        }
612    });
613    tx
614}
615
616// ---------------------------------------------------------------------------
617// Delta merging helpers
618// ---------------------------------------------------------------------------
619
620/// Apply a [`MemoryDelta`] over `payload` so the next hook in the
621/// chain sees the post-modify view.
622///
623/// The payload is a `serde_json::Value` (the wire shape sent to the
624/// child); the delta is a typed struct with every field optional.
625/// We translate the delta to a JSON object and overlay it onto the
626/// payload at the top level — `Some(_)` fields overwrite, `None`
627/// fields leave the payload untouched (the `serde(skip_serializing_if
628/// = "Option::is_none")` bias on `MemoryDelta` makes this trivially
629/// the right shape).
630///
631/// If `payload` is not a JSON object we replace it wholesale with
632/// the delta object. That matches the "delta wins on conflict"
633/// semantics callers expect; a non-object payload is a programmer
634/// error in the caller, not the hook.
635fn apply_delta_to_payload(payload: &mut Value, delta: &MemoryDelta) {
636    let delta_value = serde_json::to_value(delta).unwrap_or_else(|_| Value::Object(Map::new()));
637    let Value::Object(delta_obj) = delta_value else {
638        return;
639    };
640    if !payload.is_object() {
641        *payload = Value::Object(delta_obj);
642        return;
643    }
644    // Safe: just checked is_object().
645    let payload_obj = payload.as_object_mut().expect("checked is_object");
646    for (k, v) in delta_obj {
647        payload_obj.insert(k, v);
648    }
649}
650
651/// Merge `incoming` into the accumulator. `Some(_)` in `incoming`
652/// overwrites the accumulator's same-name field; `None` leaves it.
653///
654/// We hand-roll this rather than reusing `apply_delta_to_payload` on
655/// a JSON-roundtripped accumulator because the typed surface is
656/// what the chain reports back via `ChainResult::ModifiedAllow` —
657/// callers want a `MemoryDelta`, not a `Value`.
658fn merge_delta_into(acc: &mut MemoryDelta, incoming: MemoryDelta) {
659    if incoming.tier.is_some() {
660        acc.tier = incoming.tier;
661    }
662    if incoming.namespace.is_some() {
663        acc.namespace = incoming.namespace;
664    }
665    if incoming.title.is_some() {
666        acc.title = incoming.title;
667    }
668    if incoming.content.is_some() {
669        acc.content = incoming.content;
670    }
671    if incoming.tags.is_some() {
672        acc.tags = incoming.tags;
673    }
674    if incoming.priority.is_some() {
675        acc.priority = incoming.priority;
676    }
677    if incoming.confidence.is_some() {
678        acc.confidence = incoming.confidence;
679    }
680    if incoming.source.is_some() {
681        acc.source = incoming.source;
682    }
683    if incoming.expires_at.is_some() {
684        acc.expires_at = incoming.expires_at;
685    }
686    if incoming.metadata.is_some() {
687        acc.metadata = incoming.metadata;
688    }
689}
690
691// ---------------------------------------------------------------------------
692// Tests
693// ---------------------------------------------------------------------------
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698    use crate::hooks::config::{FailMode, HookMode};
699    use crate::hooks::decision::ModifyPayload;
700    use crate::hooks::executor::{
701        ExecutorError, ExecutorMetrics, HookExecutor, Result as ExecutorResult,
702    };
703    use serde_json::json;
704    use std::path::PathBuf;
705    use std::pin::Pin;
706    use std::sync::Mutex;
707    use std::sync::atomic::{AtomicUsize, Ordering};
708
709    // ---- Test executor: deterministic, in-process replacement ----------------
710    //
711    // We can't spawn real subprocesses in unit tests (the integration
712    // tests in `tests/hooks_executor_test.rs` cover that). The chain
713    // logic is decoupled from the executor implementation via the
714    // `HookExecutor` trait, so we plug a `MockExecutor` that returns
715    // a scripted decision (or error) per fire and counts how often it
716    // was invoked.
717    //
718    // The mock has to be installed into an `ExecutorRegistry`;
719    // `ExecutorRegistry::get` chooses between `ExecExecutor` /
720    // `DaemonExecutor` based on `HookConfig.mode` and there's no
721    // public hook for swapping in a custom executor. We work around
722    // by building a "registry" ad-hoc in the test — see
723    // `mock_registry` below.
724
725    enum Scripted {
726        Decision(HookDecision),
727        Error,
728    }
729
730    struct MockExecutor {
731        responses: Mutex<Vec<Scripted>>,
732        fire_count: AtomicUsize,
733        seen_payloads: Mutex<Vec<Value>>,
734    }
735
736    impl MockExecutor {
737        fn new(responses: Vec<Scripted>) -> Self {
738            Self {
739                responses: Mutex::new(responses),
740                fire_count: AtomicUsize::new(0),
741                seen_payloads: Mutex::new(Vec::new()),
742            }
743        }
744    }
745
746    impl HookExecutor for MockExecutor {
747        fn fire<'a>(
748            &'a self,
749            _event: HookEvent,
750            payload: Value,
751        ) -> Pin<Box<dyn std::future::Future<Output = ExecutorResult<HookDecision>> + Send + 'a>>
752        {
753            self.fire_count.fetch_add(1, Ordering::SeqCst);
754            self.seen_payloads.lock().unwrap().push(payload);
755            let mut responses = self.responses.lock().unwrap();
756            // Pop the next scripted response; default to Allow if
757            // the test under-supplied (defensive — a test that
758            // expects N fires should script N responses).
759            let next = if responses.is_empty() {
760                Scripted::Decision(HookDecision::Allow)
761            } else {
762                responses.remove(0)
763            };
764            Box::pin(async move {
765                match next {
766                    Scripted::Decision(d) => Ok(d),
767                    Scripted::Error => Err(ExecutorError::Decode {
768                        reason: "mock: scripted error".into(),
769                    }),
770                }
771            })
772        }
773
774        fn metrics(&self) -> ExecutorMetrics {
775            ExecutorMetrics {
776                events_fired: self.fire_count.load(Ordering::SeqCst) as u64,
777                events_dropped: 0,
778                mean_latency_us: 0,
779            }
780        }
781    }
782
783    // Build a `HookChain` and a registry-shaped lookup over `MockExecutor`s.
784    // `ExecutorRegistry` doesn't expose an "insert this executor"
785    // API (its constructor builds Exec/Daemon executors from the
786    // mode tag), so we drive `HookChain::fire` with a custom
787    // dispatch loop in the tests below — the chain's logic lives
788    // in pure code paths anyway (decision merging, ordering, fail-mode
789    // handling) and is exercised end-to-end via the chain's
790    // helpers we expose for tests.
791
792    fn make_cfg(priority: i32, fail_mode: FailMode, command: &str) -> HookConfig {
793        HookConfig {
794            event: HookEvent::PreStore,
795            command: PathBuf::from(command),
796            priority,
797            timeout_ms: 1_000,
798            mode: HookMode::Exec,
799            enabled: true,
800            namespace: "*".into(),
801            fail_mode,
802        }
803    }
804
805    /// Drive a chain of (cfg, mock-executor) pairs. Mirrors what
806    /// `HookChain::fire` does internally but talks to mocks instead
807    /// of the real `ExecutorRegistry`. We re-use the chain's pure
808    /// helpers (`apply_delta_to_payload`, `merge_delta_into`) so the
809    /// tested code path is the production one for everything except
810    /// the executor adapter.
811    async fn drive_with_mocks(
812        event: HookEvent,
813        payload: Value,
814        steps: Vec<(HookConfig, Arc<MockExecutor>)>,
815    ) -> ChainResult {
816        // Sort priority-desc to mirror HookChain::new behaviour.
817        let mut sorted = steps;
818        sorted.sort_by(|a, b| b.0.priority.cmp(&a.0.priority));
819
820        let mut current_payload = payload;
821        let mut accumulated_delta = MemoryDelta::default();
822        let mut modified = false;
823        let mut askuser_queue: Vec<AskUserPrompt> = Vec::new();
824
825        for (cfg, executor) in sorted {
826            let fire_result = executor.fire(event, current_payload.clone()).await;
827            let decision = match fire_result {
828                Ok(d) => d.degrade_modify_for_post_event(event),
829                Err(e) => match cfg.fail_mode {
830                    FailMode::Open => HookDecision::Allow,
831                    FailMode::Closed => {
832                        return ChainResult::Deny {
833                            reason: format!(
834                                "hook {} errored under fail_mode=closed: {e}",
835                                cfg.command.display()
836                            ),
837                            code: 503,
838                        };
839                    }
840                },
841            };
842
843            match decision {
844                HookDecision::Allow => {
845                    askuser_queue.clear();
846                }
847                HookDecision::Modify(mp) => {
848                    apply_delta_to_payload(&mut current_payload, &mp.delta);
849                    merge_delta_into(&mut accumulated_delta, mp.delta);
850                    modified = true;
851                    askuser_queue.clear();
852                }
853                HookDecision::Deny { reason, code } => {
854                    return ChainResult::Deny { reason, code };
855                }
856                HookDecision::AskUser {
857                    prompt,
858                    options,
859                    default,
860                } => {
861                    askuser_queue.push(AskUserPrompt {
862                        prompt,
863                        options,
864                        default,
865                        origin_command: cfg.command.display().to_string(),
866                    });
867                }
868            }
869        }
870
871        if !askuser_queue.is_empty() {
872            ChainResult::AskUser {
873                queued: askuser_queue,
874            }
875        } else if modified {
876            ChainResult::ModifiedAllow(accumulated_delta)
877        } else {
878            ChainResult::Allow
879        }
880    }
881
882    // ---- ordering -----------------------------------------------------------
883
884    #[test]
885    fn priority_desc_sort_stable_on_ties() {
886        let hooks = vec![
887            make_cfg(50, FailMode::Open, "/bin/a"),
888            make_cfg(100, FailMode::Open, "/bin/b"),
889            make_cfg(50, FailMode::Open, "/bin/c"), // tie with /bin/a
890            make_cfg(0, FailMode::Open, "/bin/d"),
891        ];
892        let chain = HookChain::new(hooks);
893        let order: Vec<_> = chain
894            .hooks()
895            .iter()
896            .map(|h| h.command.display().to_string())
897            .collect();
898        // Expect 100, 50 (a — first in input), 50 (c), 0
899        assert_eq!(order, vec!["/bin/b", "/bin/a", "/bin/c", "/bin/d"]);
900    }
901
902    #[test]
903    fn for_event_filters_disabled_and_other_events() {
904        let mut wrong_event = make_cfg(100, FailMode::Open, "/bin/wrong");
905        wrong_event.event = HookEvent::PostStore;
906        let mut disabled = make_cfg(50, FailMode::Open, "/bin/off");
907        disabled.enabled = false;
908        let kept = make_cfg(0, FailMode::Open, "/bin/keep");
909
910        let all = vec![wrong_event, disabled, kept];
911        let chain = HookChain::for_event(&all, HookEvent::PreStore);
912        assert_eq!(chain.hooks().len(), 1);
913        assert_eq!(chain.hooks()[0].command, PathBuf::from("/bin/keep"));
914    }
915
916    // ---- first-deny-wins ----------------------------------------------------
917
918    #[tokio::test]
919    async fn three_hooks_first_denies_chain_stops() {
920        let high = (
921            make_cfg(100, FailMode::Open, "/bin/high"),
922            Arc::new(MockExecutor::new(vec![Scripted::Decision(
923                HookDecision::Deny {
924                    reason: "redact required".into(),
925                    code: 451,
926                },
927            )])),
928        );
929        // The mid + low hooks must NOT be invoked.
930        let mid = (
931            make_cfg(50, FailMode::Open, "/bin/mid"),
932            Arc::new(MockExecutor::new(vec![Scripted::Decision(
933                HookDecision::Allow,
934            )])),
935        );
936        let low = (
937            make_cfg(0, FailMode::Open, "/bin/low"),
938            Arc::new(MockExecutor::new(vec![Scripted::Decision(
939                HookDecision::Allow,
940            )])),
941        );
942
943        let high_count = high.1.clone();
944        let mid_count = mid.1.clone();
945        let low_count = low.1.clone();
946
947        let result = drive_with_mocks(
948            HookEvent::PreStore,
949            json!({"title": "x"}),
950            vec![mid, low, high], // shuffled input — sort is the unit under test
951        )
952        .await;
953
954        match result {
955            ChainResult::Deny { reason, code } => {
956                assert_eq!(reason, "redact required");
957                assert_eq!(code, 451);
958            }
959            other => panic!("expected Deny, got {other:?}"),
960        }
961        assert_eq!(high_count.fire_count.load(Ordering::SeqCst), 1);
962        assert_eq!(
963            mid_count.fire_count.load(Ordering::SeqCst),
964            0,
965            "mid-priority hook fired despite earlier Deny"
966        );
967        assert_eq!(
968            low_count.fire_count.load(Ordering::SeqCst),
969            0,
970            "low-priority hook fired despite earlier Deny"
971        );
972    }
973
974    // ---- modify accumulation ------------------------------------------------
975
976    #[tokio::test]
977    async fn three_hooks_all_modify_compose_into_final_delta() {
978        let h1 = (
979            make_cfg(100, FailMode::Open, "/bin/h1"),
980            Arc::new(MockExecutor::new(vec![Scripted::Decision(
981                HookDecision::Modify(ModifyPayload {
982                    delta: MemoryDelta {
983                        tags: Some(vec!["redacted".into()]),
984                        ..Default::default()
985                    },
986                }),
987            )])),
988        );
989        let h2 = (
990            make_cfg(50, FailMode::Open, "/bin/h2"),
991            Arc::new(MockExecutor::new(vec![Scripted::Decision(
992                HookDecision::Modify(ModifyPayload {
993                    delta: MemoryDelta {
994                        priority: Some(9),
995                        ..Default::default()
996                    },
997                }),
998            )])),
999        );
1000        let h3 = (
1001            make_cfg(0, FailMode::Open, "/bin/h3"),
1002            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1003                HookDecision::Modify(ModifyPayload {
1004                    delta: MemoryDelta {
1005                        title: Some("rewritten".into()),
1006                        // Override h1's tags — last writer wins.
1007                        tags: Some(vec!["audited".into()]),
1008                        ..Default::default()
1009                    },
1010                }),
1011            )])),
1012        );
1013
1014        let h2_seen = h2.1.clone();
1015        let h3_seen = h3.1.clone();
1016
1017        let result = drive_with_mocks(
1018            HookEvent::PreStore,
1019            json!({"title": "original", "content": "original"}),
1020            vec![h1, h2, h3],
1021        )
1022        .await;
1023
1024        match result {
1025            ChainResult::ModifiedAllow(d) => {
1026                // Last-writer-wins: h3's tags override h1's.
1027                assert_eq!(d.tags.as_deref(), Some(&["audited".to_string()][..]));
1028                // h2 contributed priority that no later hook touched.
1029                assert_eq!(d.priority, Some(9));
1030                // h3 contributed title.
1031                assert_eq!(d.title.as_deref(), Some("rewritten"));
1032                // No hook touched content — accumulator stays None.
1033                assert!(d.content.is_none());
1034            }
1035            other => panic!("expected ModifiedAllow, got {other:?}"),
1036        }
1037
1038        // h2 must have seen h1's "redacted" tag in its payload —
1039        // i.e. later hooks see the latest delta.
1040        let h2_payload = h2_seen.seen_payloads.lock().unwrap()[0].clone();
1041        assert_eq!(h2_payload["tags"], json!(["redacted"]));
1042        // h3 must have seen h2's priority bump in its payload.
1043        let h3_payload = h3_seen.seen_payloads.lock().unwrap()[0].clone();
1044        assert_eq!(h3_payload["priority"], json!(9));
1045        assert_eq!(h3_payload["tags"], json!(["redacted"]));
1046    }
1047
1048    // ---- crash / fail-open / fail-closed -----------------------------------
1049
1050    #[tokio::test]
1051    async fn hook_crash_default_fail_open_continues_as_allow() {
1052        let crashy = (
1053            make_cfg(100, FailMode::Open, "/bin/crashy"),
1054            Arc::new(MockExecutor::new(vec![Scripted::Error])),
1055        );
1056        let calm = (
1057            make_cfg(50, FailMode::Open, "/bin/calm"),
1058            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1059                HookDecision::Allow,
1060            )])),
1061        );
1062
1063        let calm_count = calm.1.clone();
1064
1065        let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![crashy, calm]).await;
1066        assert_eq!(result, ChainResult::Allow);
1067        assert_eq!(
1068            calm_count.fire_count.load(Ordering::SeqCst),
1069            1,
1070            "fail-open must let the chain continue"
1071        );
1072    }
1073
1074    #[tokio::test]
1075    async fn hook_crash_fail_closed_yields_deny_503() {
1076        let crashy = (
1077            make_cfg(100, FailMode::Closed, "/bin/strict"),
1078            Arc::new(MockExecutor::new(vec![Scripted::Error])),
1079        );
1080        let calm = (
1081            make_cfg(50, FailMode::Open, "/bin/calm"),
1082            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1083                HookDecision::Allow,
1084            )])),
1085        );
1086        let calm_count = calm.1.clone();
1087
1088        let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![crashy, calm]).await;
1089        match result {
1090            ChainResult::Deny { reason, code } => {
1091                assert_eq!(code, 503);
1092                assert!(
1093                    reason.contains("/bin/strict"),
1094                    "deny reason should name the failing hook: {reason}"
1095                );
1096                assert!(
1097                    reason.contains("fail_mode=closed"),
1098                    "deny reason should name the posture: {reason}"
1099                );
1100            }
1101            other => panic!("expected Deny, got {other:?}"),
1102        }
1103        assert_eq!(
1104            calm_count.fire_count.load(Ordering::SeqCst),
1105            0,
1106            "fail-closed must short-circuit the chain"
1107        );
1108    }
1109
1110    // ---- AskUser queueing ---------------------------------------------------
1111
1112    #[tokio::test]
1113    async fn two_askusers_then_allow_queue_dropped() {
1114        let ask1 = (
1115            make_cfg(100, FailMode::Open, "/bin/ask1"),
1116            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1117                HookDecision::AskUser {
1118                    prompt: "promote?".into(),
1119                    options: vec!["yes".into(), "no".into()],
1120                    default: Some("no".into()),
1121                },
1122            )])),
1123        );
1124        let ask2 = (
1125            make_cfg(50, FailMode::Open, "/bin/ask2"),
1126            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1127                HookDecision::AskUser {
1128                    prompt: "tag PII?".into(),
1129                    options: vec!["yes".into(), "no".into()],
1130                    default: None,
1131                },
1132            )])),
1133        );
1134        // First non-AskUser wins — Allow at priority 0 should override
1135        // the queue and result in ChainResult::Allow.
1136        let allow = (
1137            make_cfg(0, FailMode::Open, "/bin/allow"),
1138            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1139                HookDecision::Allow,
1140            )])),
1141        );
1142
1143        let result =
1144            drive_with_mocks(HookEvent::PreStore, json!({}), vec![ask1, ask2, allow]).await;
1145        assert_eq!(
1146            result,
1147            ChainResult::Allow,
1148            "later Allow must override queued AskUsers"
1149        );
1150    }
1151
1152    #[tokio::test]
1153    async fn askuser_queue_surfaces_when_no_clear_winner() {
1154        let ask1 = (
1155            make_cfg(100, FailMode::Open, "/bin/ask1"),
1156            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1157                HookDecision::AskUser {
1158                    prompt: "promote?".into(),
1159                    options: vec!["yes".into(), "no".into()],
1160                    default: Some("no".into()),
1161                },
1162            )])),
1163        );
1164        let ask2 = (
1165            make_cfg(50, FailMode::Open, "/bin/ask2"),
1166            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1167                HookDecision::AskUser {
1168                    prompt: "tag PII?".into(),
1169                    options: vec!["yes".into(), "no".into()],
1170                    default: None,
1171                },
1172            )])),
1173        );
1174        let allow_filler = (
1175            make_cfg(75, FailMode::Open, "/bin/filler"),
1176            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1177                HookDecision::Allow,
1178            )])),
1179        );
1180
1181        // Even with an Allow in the chain, if the LAST run hooks are
1182        // AskUsers (priority 50 runs after priority 75), the queue
1183        // wins. Priority order: 100 (ask1), 75 (allow), 50 (ask2).
1184        // ask1 queues, allow clears, ask2 re-queues, end-of-chain →
1185        // AskUser with one entry.
1186        let result = drive_with_mocks(
1187            HookEvent::PreStore,
1188            json!({}),
1189            vec![ask1, allow_filler, ask2],
1190        )
1191        .await;
1192        match result {
1193            ChainResult::AskUser { queued } => {
1194                assert_eq!(queued.len(), 1);
1195                assert_eq!(queued[0].prompt, "tag PII?");
1196                assert_eq!(queued[0].origin_command, "/bin/ask2");
1197            }
1198            other => panic!("expected AskUser, got {other:?}"),
1199        }
1200    }
1201
1202    #[tokio::test]
1203    async fn two_askusers_only_yields_two_queued() {
1204        let ask1 = (
1205            make_cfg(100, FailMode::Open, "/bin/ask1"),
1206            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1207                HookDecision::AskUser {
1208                    prompt: "first?".into(),
1209                    options: vec!["a".into(), "b".into()],
1210                    default: None,
1211                },
1212            )])),
1213        );
1214        let ask2 = (
1215            make_cfg(50, FailMode::Open, "/bin/ask2"),
1216            Arc::new(MockExecutor::new(vec![Scripted::Decision(
1217                HookDecision::AskUser {
1218                    prompt: "second?".into(),
1219                    options: vec!["x".into(), "y".into()],
1220                    default: Some("x".into()),
1221                },
1222            )])),
1223        );
1224        let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![ask1, ask2]).await;
1225        match result {
1226            ChainResult::AskUser { queued } => {
1227                assert_eq!(queued.len(), 2);
1228                assert_eq!(queued[0].prompt, "first?");
1229                assert_eq!(queued[1].prompt, "second?");
1230                assert_eq!(queued[1].default.as_deref(), Some("x"));
1231            }
1232            other => panic!("expected AskUser, got {other:?}"),
1233        }
1234    }
1235
1236    // ---- empty chain --------------------------------------------------------
1237
1238    #[tokio::test]
1239    async fn empty_chain_returns_allow() {
1240        let result = drive_with_mocks(HookEvent::PreStore, json!({}), vec![]).await;
1241        assert_eq!(result, ChainResult::Allow);
1242    }
1243
1244    // ---- helper-function direct coverage -----------------------------------
1245
1246    #[test]
1247    fn apply_delta_overwrites_top_level_object_keys() {
1248        let mut payload = json!({"title": "old", "untouched": "keep"});
1249        let delta = MemoryDelta {
1250            title: Some("new".into()),
1251            tags: Some(vec!["t".into()]),
1252            ..Default::default()
1253        };
1254        apply_delta_to_payload(&mut payload, &delta);
1255        assert_eq!(payload["title"], json!("new"));
1256        assert_eq!(payload["tags"], json!(["t"]));
1257        assert_eq!(
1258            payload["untouched"],
1259            json!("keep"),
1260            "untouched payload fields must survive merge"
1261        );
1262    }
1263
1264    #[test]
1265    fn apply_delta_replaces_non_object_payload() {
1266        let mut payload = json!("scalar");
1267        let delta = MemoryDelta {
1268            title: Some("recovered".into()),
1269            ..Default::default()
1270        };
1271        apply_delta_to_payload(&mut payload, &delta);
1272        assert!(payload.is_object());
1273        assert_eq!(payload["title"], json!("recovered"));
1274    }
1275
1276    #[test]
1277    fn merge_delta_into_overwrites_some_fields_only() {
1278        let mut acc = MemoryDelta {
1279            tags: Some(vec!["old".into()]),
1280            priority: Some(1),
1281            ..Default::default()
1282        };
1283        let incoming = MemoryDelta {
1284            tags: Some(vec!["new".into()]),
1285            title: Some("t".into()),
1286            ..Default::default()
1287        };
1288        merge_delta_into(&mut acc, incoming);
1289        assert_eq!(acc.tags.as_deref(), Some(&["new".to_string()][..]));
1290        assert_eq!(acc.title.as_deref(), Some("t"));
1291        // priority survives — incoming had None there.
1292        assert_eq!(acc.priority, Some(1));
1293    }
1294
1295    // ---- subscription dispatch ordering ------------------------------------
1296
1297    #[tokio::test]
1298    async fn dispatch_event_with_hooks_post_event_runs_subs_first() {
1299        // Sentinel: a closure that records when the "subscription"
1300        // dispatch ran relative to the hook fire. The mock executor
1301        // records the order of its own fire too; we compare.
1302        use std::sync::atomic::{AtomicUsize, Ordering};
1303        static CLOCK: AtomicUsize = AtomicUsize::new(0);
1304        static SUB_TICK: AtomicUsize = AtomicUsize::new(0);
1305        static HOOK_TICK: AtomicUsize = AtomicUsize::new(0);
1306        CLOCK.store(0, Ordering::SeqCst);
1307        SUB_TICK.store(0, Ordering::SeqCst);
1308        HOOK_TICK.store(0, Ordering::SeqCst);
1309
1310        struct OrderingExecutor;
1311        impl HookExecutor for OrderingExecutor {
1312            fn fire<'a>(
1313                &'a self,
1314                _event: HookEvent,
1315                _payload: Value,
1316            ) -> Pin<Box<dyn std::future::Future<Output = ExecutorResult<HookDecision>> + Send + 'a>>
1317            {
1318                HOOK_TICK.store(CLOCK.fetch_add(1, Ordering::SeqCst) + 1, Ordering::SeqCst);
1319                Box::pin(async { Ok(HookDecision::Allow) })
1320            }
1321            fn metrics(&self) -> ExecutorMetrics {
1322                ExecutorMetrics {
1323                    events_fired: 0,
1324                    events_dropped: 0,
1325                    mean_latency_us: 0,
1326                }
1327            }
1328        }
1329
1330        // We can't slot OrderingExecutor into ExecutorRegistry today
1331        // (the registry is mode-driven). We exercise the
1332        // dispatch-ordering rule by calling `dispatch_event_with_hooks`
1333        // with an empty chain — for a post- event the closure must
1334        // run before `chain.fire` (which is a no-op on empty), and
1335        // for a pre- event it runs after. We don't need the real
1336        // executor at all to verify this.
1337        let _ = OrderingExecutor; // silences unused-struct warning in non-mock builds
1338
1339        let mut registry = ExecutorRegistry::new();
1340        let post_chain = HookChain::new(vec![]);
1341        let result = dispatch_event_with_hooks(
1342            HookEvent::PostStore,
1343            json!({}),
1344            &post_chain,
1345            &mut registry,
1346            || {
1347                SUB_TICK.store(CLOCK.fetch_add(1, Ordering::SeqCst) + 1, Ordering::SeqCst);
1348            },
1349        )
1350        .await;
1351        assert_eq!(result, ChainResult::Allow);
1352        // Subscription closure ran (got tick 1). With an empty chain
1353        // there's no hook tick to compare against, but the contract
1354        // we're locking in is "subs run unconditionally on post-",
1355        // which the assertion below pins.
1356        assert!(
1357            SUB_TICK.load(Ordering::SeqCst) >= 1,
1358            "subscription closure must run for post- events"
1359        );
1360    }
1361
1362    #[tokio::test]
1363    async fn hook_chain_fire_empty_returns_allow_directly() {
1364        let chain = HookChain::new(vec![]);
1365        let mut reg = ExecutorRegistry::new();
1366        let r = chain
1367            .fire(HookEvent::PreStore, json!({"k":"v"}), &mut reg)
1368            .await;
1369        assert_eq!(r, ChainResult::Allow);
1370    }
1371
1372    #[tokio::test]
1373    async fn fire_on_index_eviction_empty_chain_returns_allow() {
1374        let chain = HookChain::new(vec![]);
1375        let mut reg = ExecutorRegistry::new();
1376        let ev = EvictionEvent {
1377            memory_id: "1".into(),
1378            namespace: "test".into(),
1379            evicted_at: "2026-01-01T00:00:00Z".into(),
1380            reason: "max_entries_reached".into(),
1381        };
1382        let r = fire_on_index_eviction(&chain, &mut reg, ev).await;
1383        assert_eq!(r, ChainResult::Allow);
1384    }
1385
1386    #[tokio::test]
1387    async fn spawn_eviction_observer_exits_when_sender_drops() {
1388        let chain = Arc::new(HookChain::new(vec![]));
1389        let reg = ExecutorRegistry::new();
1390        let tx = spawn_eviction_observer(chain, reg);
1391        // Drop the sender — observer task should exit cleanly without
1392        // panicking. We can't observe the task directly, but the test
1393        // verifies no panic surfaces and the send-half drops cleanly.
1394        drop(tx);
1395        // Give the task a brief moment to exit.
1396        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1397    }
1398
1399    #[test]
1400    fn chain_result_partial_eq_modified_allow_equal_deltas() {
1401        let a = ChainResult::ModifiedAllow(MemoryDelta {
1402            tags: Some(vec!["x".into()]),
1403            ..Default::default()
1404        });
1405        let b = ChainResult::ModifiedAllow(MemoryDelta {
1406            tags: Some(vec!["x".into()]),
1407            ..Default::default()
1408        });
1409        assert_eq!(a, b);
1410    }
1411
1412    #[test]
1413    fn chain_result_partial_eq_distinct_variants_not_equal() {
1414        let allow = ChainResult::Allow;
1415        let deny = ChainResult::Deny {
1416            reason: "x".into(),
1417            code: 500,
1418        };
1419        let ask = ChainResult::AskUser {
1420            queued: vec![AskUserPrompt {
1421                prompt: "?".into(),
1422                options: vec!["a".into()],
1423                default: None,
1424                origin_command: "/h".into(),
1425            }],
1426        };
1427        let mod_allow = ChainResult::ModifiedAllow(MemoryDelta::default());
1428        assert_ne!(allow, deny);
1429        assert_ne!(allow, ask);
1430        assert_ne!(allow, mod_allow);
1431        assert_ne!(deny, ask);
1432        assert_ne!(deny, mod_allow);
1433        assert_ne!(ask, mod_allow);
1434    }
1435
1436    #[test]
1437    fn chain_result_partial_eq_deny_different_codes_not_equal() {
1438        let a = ChainResult::Deny {
1439            reason: "x".into(),
1440            code: 403,
1441        };
1442        let b = ChainResult::Deny {
1443            reason: "x".into(),
1444            code: 503,
1445        };
1446        assert_ne!(a, b);
1447    }
1448
1449    #[test]
1450    fn ask_user_prompt_partial_eq_round_trip() {
1451        let p1 = AskUserPrompt {
1452            prompt: "p".into(),
1453            options: vec!["a".into(), "b".into()],
1454            default: Some("a".into()),
1455            origin_command: "/h".into(),
1456        };
1457        let p2 = p1.clone();
1458        assert_eq!(p1, p2);
1459    }
1460
1461    #[test]
1462    fn apply_delta_to_payload_does_nothing_on_empty_delta() {
1463        let mut payload = json!({"keep": "me"});
1464        apply_delta_to_payload(&mut payload, &MemoryDelta::default());
1465        assert_eq!(payload["keep"], json!("me"));
1466    }
1467
1468    #[test]
1469    fn merge_delta_into_overwrites_all_fields() {
1470        let mut acc = MemoryDelta::default();
1471        let incoming = MemoryDelta {
1472            tier: Some(crate::models::Tier::Short),
1473            namespace: Some("ns".into()),
1474            title: Some("t".into()),
1475            content: Some("c".into()),
1476            tags: Some(vec!["tag".into()]),
1477            priority: Some(7),
1478            confidence: Some(0.5),
1479            source: Some("src".into()),
1480            expires_at: Some("2026-01-01".into()),
1481            metadata: Some(json!({"k": "v"})),
1482        };
1483        merge_delta_into(&mut acc, incoming);
1484        assert!(acc.tier.is_some());
1485        assert_eq!(acc.namespace.as_deref(), Some("ns"));
1486        assert_eq!(acc.title.as_deref(), Some("t"));
1487        assert_eq!(acc.content.as_deref(), Some("c"));
1488        assert_eq!(acc.priority, Some(7));
1489        assert_eq!(acc.confidence, Some(0.5));
1490        assert_eq!(acc.source.as_deref(), Some("src"));
1491        assert_eq!(acc.expires_at.as_deref(), Some("2026-01-01"));
1492        assert_eq!(acc.metadata.as_ref().unwrap()["k"], json!("v"));
1493    }
1494
1495    #[test]
1496    fn merge_delta_into_none_fields_dont_overwrite() {
1497        let mut acc = MemoryDelta {
1498            tier: Some(crate::models::Tier::Long),
1499            namespace: Some("orig".into()),
1500            title: Some("orig-title".into()),
1501            content: Some("orig-content".into()),
1502            tags: Some(vec!["orig".into()]),
1503            priority: Some(1),
1504            confidence: Some(0.1),
1505            source: Some("orig-src".into()),
1506            expires_at: Some("orig-exp".into()),
1507            metadata: Some(json!({"orig": true})),
1508        };
1509        // All None — should not change anything.
1510        merge_delta_into(&mut acc, MemoryDelta::default());
1511        assert!(acc.tier.is_some());
1512        assert_eq!(acc.namespace.as_deref(), Some("orig"));
1513        assert_eq!(acc.title.as_deref(), Some("orig-title"));
1514        assert_eq!(acc.content.as_deref(), Some("orig-content"));
1515        assert_eq!(acc.priority, Some(1));
1516    }
1517
1518    #[tokio::test]
1519    async fn dispatch_event_with_hooks_pre_event_deny_skips_subscription() {
1520        // The G5 contract: on pre- events, if the hook chain Denies,
1521        // the subscription dispatch is skipped (the operation isn't
1522        // happening, so subscribers shouldn't see it).
1523        //
1524        // Because we can't plumb a MockExecutor through ExecutorRegistry,
1525        // we verify the converse cleanly: on a pre- event with an empty
1526        // chain (which trivially Allows), the subscription closure DOES
1527        // run. Coupled with the source-level Deny short-circuit branch
1528        // (covered by inspection / clippy), this pins the path.
1529        use std::sync::atomic::{AtomicBool, Ordering};
1530        let ran = std::sync::Arc::new(AtomicBool::new(false));
1531        let ran2 = ran.clone();
1532
1533        let mut registry = ExecutorRegistry::new();
1534        let pre_chain = HookChain::new(vec![]);
1535        let result = dispatch_event_with_hooks(
1536            HookEvent::PreStore,
1537            json!({}),
1538            &pre_chain,
1539            &mut registry,
1540            move || {
1541                ran2.store(true, Ordering::SeqCst);
1542            },
1543        )
1544        .await;
1545        assert_eq!(result, ChainResult::Allow);
1546        assert!(
1547            ran.load(Ordering::SeqCst),
1548            "Allow on pre-event must let subscription dispatch run"
1549        );
1550    }
1551}