Skip to main content

ai_memory/
subscriptions.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.6.0.0 — webhook subscriptions.
5//!
6//! Subscribers register a URL + shared secret + event/namespace/agent
7//! filters. When a matching event fires (e.g. `memory_store`), a
8//! fire-and-forget thread POSTs an HMAC-SHA256-signed JSON payload.
9//!
10//! SSRF hardening:
11//! - `http://` only to `127.0.0.0/8` or `localhost` hosts;
12//!   everywhere else requires `https://`
13//! - RFC1918 / RFC4193 / link-local hosts are rejected unless
14//!   `allow_private_networks = true` in the daemon config
15//!
16//! Signature:
17//! - Header `X-Ai-Memory-Signature: sha256=<hex>` over the raw
18//!   JSON body
//! - Only a SHA-256 hash of the plaintext shared secret is stored in
//!   the DB; the plaintext is returned **once** at subscription
//!   time, and afterwards the stored hash never leaves the DB.
22
23use crate::models::field_names;
24use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
25use std::str::FromStr;
26use std::sync::{Arc, OnceLock};
27
28use anyhow::{Context, Result, anyhow};
29use rusqlite::{Connection, params};
30use serde::{Deserialize, Serialize};
31use sha2::{Digest, Sha256};
32use tokio::sync::Semaphore;
33
/// `tracing` target string for the subscription fan-out / DLQ surface
/// (#1558 tracing-target SSOT). Spelled once here so log sites do not
/// repeat the literal.
const SUBSCRIPTIONS_TRACE_TARGET: &str = "ai_memory::subscriptions";
37
38/// Public-facing subscription record (no secret plaintext).
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct Subscription {
41    pub id: String,
42    pub url: String,
43    pub events: String,
44    pub namespace_filter: Option<String>,
45    pub agent_filter: Option<String>,
46    pub created_by: Option<String>,
47    pub created_at: String,
48    pub dispatch_count: i64,
49    pub failure_count: i64,
50    /// v0.6.3.1 P5 (G9): structured per-event-type opt-in list. When
51    /// `Some(list)` the subscription only fires for event types in
52    /// `list` (overriding the legacy comma-separated `events`
53    /// whitelist). When `None` (default) all events match — preserves
54    /// pre-P5 behaviour for existing subscribers.
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub event_types: Option<Vec<String>>,
57}
58
/// Borrowed input for [`insert`]: everything needed to register a new
/// webhook subscription.
pub struct NewSubscription<'a> {
    /// Webhook endpoint; checked by `validate_url` before insertion.
    pub url: &'a str,
    /// Legacy event whitelist (comma-separated names or `*`). Only
    /// consulted when `event_types` is `None`.
    pub events: &'a str,
    /// Optional shared secret in plaintext; it is hashed before being
    /// persisted.
    pub secret: Option<&'a str>,
    /// Optional namespace restriction.
    pub namespace_filter: Option<&'a str>,
    /// Optional agent restriction.
    pub agent_filter: Option<&'a str>,
    /// Agent that owns the subscription.
    pub created_by: Option<&'a str>,
    /// v0.6.3.1 P5 (G9): optional structured event-type whitelist.
    /// With `Some`, only the listed event types fire. With `None`, the
    /// legacy `events` field (comma-separated / `*`) governs, which is
    /// the historical behaviour kept for backward compatibility.
    pub event_types: Option<&'a [String]>,
}
73
74/// Canonical list of webhook lifecycle events surfaced to subscribers
75/// and to `memory_capabilities` (capabilities v2 `webhook_events`).
76/// Keep stable: integrators pin against these strings.
77///
78/// v0.7.0 K4 — `approval_requested` joined the list. It fires every
79/// time a `pending_actions` row is inserted by the governance gate
80/// (locally via `db::queue_pending_action` or remotely via
81/// `db::upsert_pending_action`). Subscribers opt in via the existing
82/// [`NewSubscription::event_types`] structured filter; legacy
83/// wildcard-event subscribers also receive it. Closes the
84/// v0.6.3.1 honest-Capabilities-v2 disclosure that
85/// `approval.subscribers` was advertised but never published — the
86/// K10 Approval API HTTP+SSE handler consumes these events directly.
87///
88/// v0.7 J4 / G14 — `memory_link_invalidated` also joins so subscribers
89/// can replay the audit-edge timeline (every successful
90/// `memory_kg_invalidate` fires this event after the link's
91/// `valid_until` is set, regardless of which KG backend handled the
92/// SET).
93// v0.7.x (issue #1174 PR1 — pm-v3.1 MCP tool name sweep): the three
94// entries that ARE MCP tool names reference the canonical
95// `tool_names` consts.
96//
97// v0.7.0 multi-agent literal-sweep (scanner B finding F-B10.x): the
98// remaining four entries — subscription-event types distinct from
99// MCP method names — now also consume named consts in
100// [`webhook_events`] below. Pre-sweep these were the last raw-literal
101// holdouts in this array; centralising them closes the drift class.
102pub const WEBHOOK_EVENT_TYPES: &[&str] = &[
103    crate::mcp::registry::tool_names::MEMORY_STORE,
104    crate::mcp::registry::tool_names::MEMORY_PROMOTE,
105    crate::mcp::registry::tool_names::MEMORY_DELETE,
106    webhook_events::MEMORY_LINK_CREATED,
107    webhook_events::MEMORY_LINK_INVALIDATED,
108    webhook_events::MEMORY_CONSOLIDATED,
109    webhook_events::APPROVAL_REQUESTED,
110];
111
/// Canonical webhook-event-type slugs that are NOT MCP tool names
/// (v0.7.0 multi-agent literal-sweep, scanner B finding F-B10.x).
///
/// These differ from `signed_events::event_types`: audit-chain slugs
/// are dot-separated (`"memory_link.created"`), whereas webhook events
/// are underscore-separated to match the v0.6.x wire contract
/// (`"memory_link_created"`). Renaming an event or adding a new one
/// means touching this module and the [`WEBHOOK_EVENT_TYPES`] array;
/// the unit test `webhook_event_consts_appear_in_array` asserts that
/// every const declared here is present in that array.
pub mod webhook_events {
    /// Emitted by the `memory_link.create` substrate path after each
    /// successful link write, signed or unsigned. Delivered through
    /// the K10 / J4 webhook fan-out.
    pub const MEMORY_LINK_CREATED: &str = "memory_link_created";

    /// Emitted by `memory_kg_invalidate` once `valid_until` has been
    /// set, whichever KG backend handled the SET. Part of the webhook
    /// set since v0.7 J4 / G14 so subscribers can replay the
    /// audit-edge timeline.
    pub const MEMORY_LINK_INVALIDATED: &str = "memory_link_invalidated";

    /// Emitted by `memory_consolidate` after a successful
    /// consolidation write (post-#867 W6 consolidation primitive).
    pub const MEMORY_CONSOLIDATED: &str = "memory_consolidated";

    /// Emitted by the K10 Approval API after a governance `Pending`
    /// decision queues a pending action. The K10 Approval HTTP+SSE
    /// handler consumes it directly.
    pub const APPROVAL_REQUESTED: &str = "approval_requested";
}
143
144/// Insert a subscription, hashing any secret before persisting.
145///
146/// Returns the new subscription's id.
147///
148/// P5 (G9): when `event_types` is `Some`, the structured opt-in list is
149/// JSON-encoded into the new `event_types` column AND mirrored into
150/// the legacy comma-separated `events` column so the existing
151/// dispatch matcher continues to work without a second code path. An
152/// unknown event type returns Err — the canonical list lives in
153/// `WEBHOOK_EVENT_TYPES`.
154pub fn insert(conn: &Connection, req: &NewSubscription<'_>) -> Result<String> {
155    validate_url(req.url)?;
156    let id = uuid::Uuid::new_v4().to_string();
157    let secret_hash = req.secret.map(sha256_hex);
158    let now = chrono::Utc::now().to_rfc3339();
159
160    // P5: validate + serialise the structured event-type list.
161    let (events_csv, event_types_json) = if let Some(list) = req.event_types {
162        for ev in list {
163            if !WEBHOOK_EVENT_TYPES.contains(&ev.as_str()) {
164                return Err(anyhow!(
165                    "unknown webhook event type {ev:?}; valid types: {WEBHOOK_EVENT_TYPES:?}"
166                ));
167            }
168        }
169        // Mirror into the legacy events column so dispatch keeps working.
170        let csv = list.join(",");
171        let json = serde_json::to_string(list).context("event_types serialise")?;
172        (csv, Some(json))
173    } else {
174        (req.events.to_string(), None)
175    };
176
177    conn.execute(
178        "INSERT INTO subscriptions (id, url, events, secret_hash, namespace_filter, agent_filter, created_by, created_at, event_types) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
179        params![id, req.url, events_csv, secret_hash, req.namespace_filter, req.agent_filter, req.created_by, now, event_types_json],
180    )?;
181    Ok(id)
182}
183
184/// Delete a subscription by id, optionally scoped to its owner.
185///
186/// Cross-tenant authorization (#870, security-high, 2026-05-18):
187/// When `caller_agent_id` is `Some(aid)`, the DELETE only matches rows
188/// where `created_by = aid` — preventing tenant A from unsubscribing
189/// tenant B's webhook. When `None`, the DELETE matches by id alone
190/// (admin path: federation receive, GC, operator CLI). Callers exposed
191/// to untrusted input (MCP `memory_unsubscribe`, HTTP
192/// `DELETE /api/v1/subscriptions`) MUST pass `Some(<authenticated
193/// caller>)` — anything else is a bypass.
194///
195/// Returns true if a row was removed (i.e. it both existed AND matched
196/// the owner clause when one was supplied).
197pub fn delete(conn: &Connection, id: &str, caller_agent_id: Option<&str>) -> Result<bool> {
198    let n = if let Some(aid) = caller_agent_id {
199        conn.execute(
200            "DELETE FROM subscriptions WHERE id = ?1 AND created_by = ?2",
201            params![id, aid],
202        )?
203    } else {
204        conn.execute("DELETE FROM subscriptions WHERE id = ?1", params![id])?
205    };
206    Ok(n > 0)
207}
208
209/// List active subscriptions, optionally scoped to a single owner.
210///
211/// Cross-tenant authorization (#872, security-high, 2026-05-18):
212/// When `caller_agent_id` is `Some(aid)`, only rows where
213/// `created_by = aid` are returned — preventing tenant A from
214/// enumerating tenant B's webhook fleet. When `None`, every row is
215/// returned (internal use: dispatch fan-out, federation, operator
216/// inventory). Callers exposed to untrusted input (MCP
217/// `memory_list_subscriptions`, HTTP `GET /api/v1/subscriptions`) MUST
218/// pass `Some(<authenticated caller>)`.
219pub fn list(conn: &Connection, caller_agent_id: Option<&str>) -> Result<Vec<Subscription>> {
220    let mut stmt = if caller_agent_id.is_some() {
221        conn.prepare(
222            "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions WHERE created_by = ?1 ORDER BY created_at DESC",
223        )?
224    } else {
225        conn.prepare(
226            "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions ORDER BY created_at DESC",
227        )?
228    };
229    let row_decoder = |row: &rusqlite::Row<'_>| {
230        let event_types_raw: Option<String> = row.get(9)?;
231        // P5: decode the JSON column. A corrupt row should not break
232        // the entire list — fall back to None (= all-events) and warn.
233        let event_types =
234            event_types_raw.and_then(|s| match serde_json::from_str::<Vec<String>>(&s) {
235                Ok(v) => Some(v),
236                Err(e) => {
237                    tracing::warn!(
238                        "subscription event_types JSON decode failed, treating as all-events: {e}"
239                    );
240                    None
241                }
242            });
243        Ok(Subscription {
244            id: row.get(0)?,
245            url: row.get(1)?,
246            events: row.get(2)?,
247            namespace_filter: row.get(3)?,
248            agent_filter: row.get(4)?,
249            created_by: row.get(5)?,
250            created_at: row.get(6)?,
251            dispatch_count: row.get(7)?,
252            failure_count: row.get(8)?,
253            event_types,
254        })
255    };
256    let rows = if let Some(aid) = caller_agent_id {
257        stmt.query_map(params![aid], row_decoder)?
258            .collect::<rusqlite::Result<Vec<_>>>()
259    } else {
260        stmt.query_map([], row_decoder)?
261            .collect::<rusqlite::Result<Vec<_>>>()
262    };
263    rows.context("subscription row decode failed")
264}
265
266/// Look up a subscription's owner (`created_by`) by id.
267///
268/// v0.7.0 #1115 / #1118 (SR-1 #5 / #6, HIGH) — used by the MCP +
269/// HTTP authz gates on `memory_subscription_replay` and
270/// `memory_subscription_dlq_list` to refuse cross-tenant reads of
271/// other agents' subscriptions. Returns `Ok(None)` when the
272/// subscription does not exist; callers map this to the same
273/// not-found envelope as a real miss so the existence of the id is
274/// not leaked.
275///
276/// # Errors
277/// - SQL prepare / query / decode failure.
278pub fn get_owner(conn: &Connection, id: &str) -> Result<Option<String>> {
279    let mut stmt = conn.prepare("SELECT created_by FROM subscriptions WHERE id = ?1")?;
280    let mut rows = stmt.query(params![id])?;
281    if let Some(row) = rows.next().context("subscription owner row")? {
282        let owner: Option<String> = row.get(0)?;
283        Ok(owner)
284    } else {
285        Ok(None)
286    }
287}
288
289/// P5 (G9): list subscriptions matching a specific event type. Returns
290/// rows where either:
291///   - `event_types` is NULL (= all events; backward-compat default), OR
292///   - `event_types` JSON array contains `event_type`.
293///
294/// This is the DB-side variant of the per-event filter; the in-memory
295/// `matches_filters` is the authoritative gate at dispatch time and
296/// honours both the legacy `events` whitelist and the new
297/// `event_types` opt-in list.
298pub fn list_by_event(conn: &Connection, event_type: &str) -> Result<Vec<Subscription>> {
299    // SQLite doesn't have a JSON contains operator portable across all
300    // builds; we filter in Rust after a coarse SQL prefilter that drops
301    // rows whose stored JSON clearly doesn't mention the event. The
302    // text LIKE match is conservative (it can yield false positives the
303    // post-filter then rejects) which keeps the SQL simple while still
304    // letting an idx_subscriptions_event_types-backed scan win on large
305    // tables.
306    let pattern = format!("%{event_type}%");
307    let mut stmt = conn.prepare(
308        "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions WHERE event_types IS NULL OR event_types LIKE ?1 ORDER BY created_at DESC",
309    )?;
310    let rows = stmt.query_map(params![pattern], |row| {
311        let event_types_raw: Option<String> = row.get(9)?;
312        let event_types =
313            event_types_raw.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok());
314        Ok(Subscription {
315            id: row.get(0)?,
316            url: row.get(1)?,
317            events: row.get(2)?,
318            namespace_filter: row.get(3)?,
319            agent_filter: row.get(4)?,
320            created_by: row.get(5)?,
321            created_at: row.get(6)?,
322            dispatch_count: row.get(7)?,
323            failure_count: row.get(8)?,
324            event_types,
325        })
326    })?;
327    let mut out: Vec<Subscription> = Vec::new();
328    for sub in rows {
329        let s = sub.context("subscription row decode failed")?;
330        match &s.event_types {
331            None => out.push(s),
332            Some(list) if list.iter().any(|e| e == event_type) => out.push(s),
333            Some(_) => {} // structured opt-in present but doesn't include this event
334        }
335    }
336    Ok(out)
337}
338
/// Decide whether a subscription's filters accept the given event.
///
/// Three independent checks must all pass:
/// - **Event.** P5 (G9): when `sub_event_types` is `Some(list)` it is
///   the authoritative filter and the legacy `sub_events` string is
///   ignored; an empty list therefore matches nothing. When it is
///   `None`, the legacy whitelist applies (backward compat for pre-P5
///   subscribers): `sub_events` is split on commas, each token is
///   trimmed, and the event matches if any token equals `event` or
///   `*`.
/// - **Namespace.** A `None` or empty `sub_namespace` accepts every
///   namespace; otherwise it must equal `namespace` exactly.
/// - **Agent.** A `None` or empty `sub_agent` accepts every agent;
///   otherwise `agent` must be `Some` and equal to the filter.
///
/// #932 (v0.7.0 Track D, 2026-05-20) — `pub(crate)` rather than
/// private so the postgres-aware dispatch helper at
/// `crate::handlers::subscriptions::dispatch_event_postgres` can
/// reuse the canonical filter logic instead of forking it.
pub(crate) fn matches_filters(
    sub_events: &str,
    sub_event_types: Option<&[String]>,
    sub_namespace: Option<&str>,
    sub_agent: Option<&str>,
    event: &str,
    namespace: &str,
    agent: Option<&str>,
) -> bool {
    let event_ok = match sub_event_types {
        // Structured opt-in overrides the legacy string entirely.
        Some(opted_in) => opted_in.iter().any(|candidate| candidate == event),
        // Legacy whitelist; a bare "*" is just the single-token case.
        None => sub_events
            .split(',')
            .map(str::trim)
            .any(|token| token == "*" || token == event),
    };
    let namespace_ok = sub_namespace.is_none_or(|ns| ns.is_empty() || ns == namespace);
    let agent_ok = sub_agent.is_none_or(|filter| filter.is_empty() || agent == Some(filter));

    event_ok && namespace_ok && agent_ok
}
389
390/// Payload fired to subscribers. Stable JSON shape.
391///
392/// v0.7.0 K6 — every dispatched payload now carries a deterministic
393/// `correlation_id` (UUIDv7 — time-ordered, unique). Receivers ACK
394/// with `{"status":"ack","correlation_id":"..."}`; the dispatcher
395/// retries on no-ACK or non-2xx with the [200ms, 1s, 5s] exponential
396/// backoff ladder and lands the row in `subscription_dlq` after three
397/// failed attempts. The id is generated once per (subscription,
398/// event) pair and persisted into `subscription_events` BEFORE the
399/// network send so replay-from-cursor (K7) sees a stable record.
400#[derive(Serialize)]
401struct DispatchPayload<'a> {
402    event: &'a str,
403    memory_id: &'a str,
404    namespace: &'a str,
405    agent_id: Option<&'a str>,
406    delivered_at: String,
407    /// v0.7.0 K6 — UUIDv7 correlation id. Stable across retries.
408    correlation_id: &'a str,
409    /// P5 (G9): event-specific extra fields. Flattened so the wire shape
410    /// stays a flat object — older subscribers that ignore unknown keys
411    /// keep working. Each new event type uses one of the
412    /// `*EventDetails` structs below.
413    #[serde(flatten, skip_serializing_if = "Option::is_none")]
414    details: Option<serde_json::Value>,
415}
416
/// v0.7.0 K6 — exponential-backoff ladder for failed webhook
/// deliveries. After the initial POST the dispatcher makes up to
/// three retries, waiting 200ms, 1s and 5s respectively. Once the
/// last retry fails the row lands in `subscription_dlq`, where K7's
/// inspector tool surfaces it. Public so tests can reason about the
/// total wall-clock budget (≈ 6.2s + per-attempt timeout).
pub const RETRY_BACKOFFS: &[std::time::Duration] = &[
    std::time::Duration::from_millis(200),
    std::time::Duration::from_millis(1_000),
    std::time::Duration::from_millis(5_000),
];
428
/// v0.7.0 K6 — how long a single delivery attempt waits for the ACK.
/// For the delivery to count as successful the receiver MUST answer
/// within this window with a JSON body of the form
/// `{"status":"ack","correlation_id":"..."}`. A non-2xx response, a
/// timeout, or an ACK whose `correlation_id` differs from the
/// dispatched id all count as failure and trigger the next retry.
/// Public so the integration tests can pin the wall-clock
/// expectations.
pub const ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(5_000);
437
/// PERF-3 (fix campaign 2026-05-26, FX-10) — default cap on how many
/// webhook deliveries may be in flight at the same time.
///
/// Before the fix, every matching subscriber on every store event got
/// its own `std::thread::spawn` worker with its own SQLite
/// connection. With 1000 subscribers that meant 1000 OS threads
/// (~1 MB stack each = 1 GB virtual) and 1000 SQLite handles per
/// store event, bypassing the existing Tokio runtime entirely.
///
/// After the fix, each delivery waits on a bounded
/// [`tokio::sync::Semaphore`] (default permits = this constant) and
/// the blocking-HTTP + audit-write step runs inside
/// [`tokio::task::spawn_blocking`], so the dispatcher uses the
/// existing runtime's blocking-thread pool instead of unbounded
/// OS-thread spawn.
///
/// Operators tune the bound with
/// `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY`; values outside `1..=4096`
/// fall back to this default with a warn-log.
pub const DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY: usize = 32;
457
458/// PERF-3 (FX-10) — module-level shared semaphore that bounds the
459/// in-flight webhook delivery count across every concurrent
460/// `dispatch_event_to_subs` call. Built lazily on first dispatch from
461/// `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY` (default
462/// [`DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY`]). Held in `Arc` so worker
463/// tasks can clone-and-move it into their async body.
464static DISPATCH_SEMAPHORE: OnceLock<Arc<Semaphore>> = OnceLock::new();
465
466/// Resolve the bound from env (test seam: tests call
467/// [`override_dispatch_concurrency_for_tests`] before any dispatch
468/// runs to pin the bound to a known value).
469fn dispatch_concurrency_bound() -> usize {
470    if let Some(forced) = TEST_DISPATCH_CONCURRENCY_OVERRIDE.get() {
471        return *forced;
472    }
473    match std::env::var("AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY") {
474        Ok(raw) => match raw.parse::<usize>() {
475            Ok(n) if (1..=4096).contains(&n) => n,
476            _ => {
477                tracing::warn!(
478                    "AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY={raw:?} not in 1..=4096; \
479                     falling back to default {DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY}"
480                );
481                DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY
482            }
483        },
484        Err(_) => DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY,
485    }
486}
487
488/// Return a clone of the shared dispatch semaphore, building it on
489/// first call from the resolved concurrency bound.
490fn dispatch_semaphore() -> Arc<Semaphore> {
491    DISPATCH_SEMAPHORE
492        .get_or_init(|| Arc::new(Semaphore::new(dispatch_concurrency_bound())))
493        .clone()
494}
495
/// Test-only override slot for the dispatch concurrency bound. Tests
/// fill it BEFORE the first dispatch runs in the process so the
/// semaphore is sized to what the assertions expect. Once the
/// semaphore has been built, later changes have no effect (the bound
/// is per-process).
static TEST_DISPATCH_CONCURRENCY_OVERRIDE: OnceLock<usize> = OnceLock::new();

/// Test-only seam: pin the dispatch concurrency bound to `n`. Call it
/// before any `dispatch_event*` runs in the current process.
///
/// The slot can be written only once. A second call returns `Err`
/// carrying the `n` that was rejected, and the value installed first
/// stays authoritative.
#[doc(hidden)]
pub fn override_dispatch_concurrency_for_tests(n: usize) -> Result<(), usize> {
    TEST_DISPATCH_CONCURRENCY_OVERRIDE.set(n)
}
511
512/// PERF-3 (FX-10) — diagnostic accessor returning the number of
513/// permits currently AVAILABLE on the shared dispatch semaphore. Used
514/// by the regression test in
515/// `tests/subscriptions_no_thread_spawn_per_subscriber.rs` to assert
516/// the in-flight ceiling holds even when 50 subscribers fan out
517/// concurrently.
518#[doc(hidden)]
519pub fn dispatch_semaphore_available_permits() -> usize {
520    dispatch_semaphore().available_permits()
521}
522
523// v0.7.0 #1073 — `dispatch_http_client()` scaffolding REMOVED
524//
525// v0.7.x (issue #1174 follow-up #1196) — the dead-code shared-client
526// accessor + its `OnceLock<Option<reqwest::blocking::Client>>` static
527// were removed. Sub-agent code review (#1196 PR8) found zero
528// production callers — only the `tests/sr3_perf_regressions.rs`
529// scaffolding-pin test kept the symbol alive. The SR-2 #1082 SSRF
530// hardening (per-call `builder.resolve(host, addr)` DNS pinning)
531// keeps the per-call builder path live in `send()`; the future
532// custom `reqwest::dns::Resolve` refactor will re-introduce a shared
533// client at that point through `RuntimeContext`, not as a private
534// module-level OnceLock. Removed alongside the test pin in the same
535// commit so the scaffolding doesn't drift back via the existence
536// assertion.
537
538/// One row of the `subscription_events` per-delivery audit log. K6
539/// writes one row before each network send; K7's
540/// `memory_subscription_replay` tool reads the rows back ordered by
541/// `delivered_at` for replay-from-cursor support.
542#[derive(Debug, Clone, Serialize, Deserialize)]
543pub struct SubscriptionEvent {
544    pub id: i64,
545    pub subscription_id: String,
546    pub correlation_id: String,
547    pub event_type: String,
548    pub payload: String,
549    pub delivered_at: String,
550    pub delivery_status: String,
551}
552
553/// One row of the `subscription_dlq` table. Created when a delivery
554/// exhausts the [200ms, 1s, 5s] retry ladder. K7's inspector tool
555/// surfaces these rows; K6 only ships the writer.
556#[derive(Debug, Clone, Serialize, Deserialize)]
557pub struct DlqEntry {
558    pub id: i64,
559    pub subscription_id: String,
560    pub correlation_id: String,
561    pub event_type: String,
562    pub payload: String,
563    pub retry_count: i64,
564    pub last_error: String,
565    pub first_failed_at: String,
566    pub last_failed_at: String,
567}
568
569// ---------------------------------------------------------------------
570// P5 (G9) — event payload structs for the four new lifecycle events.
571//
572// Each struct is the `details` block flattened into `DispatchPayload`
573// for its event type. They are intentionally small and JSON-stable —
574// the same shape ships on both the MCP and HTTP webhook surfaces.
575// Adding a new field is backward-compatible (subscribers ignore
576// unknowns); renaming or removing a field is breaking — bump the
577// payload schema version per AI_DEVELOPER_GOVERNANCE.md.
578// ---------------------------------------------------------------------
579
580/// `memory_promote` event — fires after a tier or vertical promotion
581/// commits. `to_namespace` is `Some` for vertical (`memory_promote`
582/// with a `to_namespace` argument); for the default tier promotion it
583/// is `None` and `tier` is set to the new tier (`"long"`).
584#[derive(Debug, Clone, Serialize, Deserialize)]
585pub struct PromoteEventDetails {
586    /// `"vertical"` for namespace promote-clone, `"tier"` for the
587    /// default tier upgrade.
588    pub mode: String,
589    /// New tier after promotion (always `"long"` for `mode = "tier"`).
590    #[serde(default, skip_serializing_if = "Option::is_none")]
591    pub tier: Option<String>,
592    /// Target namespace (vertical promote only).
593    #[serde(default, skip_serializing_if = "Option::is_none")]
594    pub to_namespace: Option<String>,
595    /// Clone id (vertical promote only); the `memory_id` field on the
596    /// outer payload carries the source memory id in vertical mode.
597    #[serde(default, skip_serializing_if = "Option::is_none")]
598    pub clone_id: Option<String>,
599}
600
601/// `memory_delete` event — fires after the row is removed from
602/// `memories`. `title` and `tier` come from the pre-delete snapshot so
603/// subscribers can write meaningful audit entries without a
604/// roundtrip.
605#[derive(Debug, Clone, Serialize, Deserialize)]
606pub struct DeleteEventDetails {
607    pub title: String,
608    pub tier: String,
609}
610
611/// `memory_link_created` event — fires after `db::create_link`
612/// commits. The outer `memory_id` carries the source id (the
613/// link-author side); `target_id` is the destination of the directed
614/// link.
615#[derive(Debug, Clone, Serialize, Deserialize)]
616pub struct LinkCreatedEventDetails {
617    pub target_id: String,
618    pub relation: String,
619}
620
621/// `memory_consolidated` event — fires after `db::consolidate`
622/// commits. The outer `memory_id` carries the new consolidated
623/// memory's id; `source_ids` is the array of memories that were
624/// merged (and deleted by the consolidate op).
625#[derive(Debug, Clone, Serialize, Deserialize)]
626pub struct ConsolidatedEventDetails {
627    pub source_ids: Vec<String>,
628    pub source_count: usize,
629}
630
631/// v0.7.0 K4 — `approval_requested` event details.
632///
633/// Fires after a `pending_actions` row has been inserted by the
634/// governance gate (`db::queue_pending_action` from the local store /
635/// promote / delete enforce paths, or `db::upsert_pending_action` from
636/// peer-originated `sync_push` traffic). The K10 Approval API
637/// (HTTP+SSE) consumes the same dispatcher path so the surface is
638/// consistent across delivery transports.
639///
640/// The outer `memory_id` field on [`DispatchPayload`] carries the
641/// pending-action **id** (the row PK of `pending_actions`) — that's the
642/// only identifier subscribers need to round-trip back through
643/// `memory_pending_*` MCP tools or the v0.7 Approval HTTP endpoints.
644/// The `agent_id` field carries the row's `requested_by`.
645///
646/// Adding a new field here is backward-compatible (subscribers ignore
647/// unknowns); renaming or removing a field is breaking — bump the
648/// payload schema version per AI_DEVELOPER_GOVERNANCE.md.
649#[derive(Debug, Clone, Serialize, Deserialize)]
650pub struct ApprovalRequestedEventDetails {
651    /// Discriminator from `pending_actions.action_type` — `"store"`,
652    /// `"delete"`, or `"promote"` (the three [`crate::models::GovernedAction`]
653    /// variants today). Reserved for forward-compat with future gated
654    /// actions; subscribers should treat unknown values as opaque.
655    pub action_type: String,
656    /// `pending_actions.requested_at` (RFC3339). Mirrored into the
657    /// details block so SSE consumers downstream of K10 can render
658    /// the queue-time without a second round-trip.
659    pub requested_at: String,
660    /// `pending_actions.memory_id` — `Some` for delete / promote
661    /// (existing row), `None` for store (no row yet).
662    #[serde(default, skip_serializing_if = "Option::is_none")]
663    pub memory_id: Option<String>,
664    /// Always `"pending"` at insert time. Decided rows do NOT re-fire
665    /// this event — the decision flows through the planned
666    /// `approval_decided` event in K7.
667    pub status: String,
668}
669
670/// `memory_link_invalidated` event (v0.7 J4 / G14) — fires after a
671/// successful `memory_kg_invalidate` writes `valid_until` on the
672/// `(source_id, target_id, relation)` link. The outer `memory_id`
673/// carries the source id (the link-author side); `target_id`,
674/// `relation`, and the freshly-written `valid_until` describe the
675/// supersession edge so consumers can replay the invalidation log
676/// without re-reading `memory_links`. `previous_valid_until`
677/// distinguishes the first supersession (`None`) from an idempotent
678/// retry (carries the prior stamp).
679#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct LinkInvalidatedEventDetails {
681    pub target_id: String,
682    pub relation: String,
683    pub valid_until: String,
684    #[serde(default, skip_serializing_if = "Option::is_none")]
685    pub previous_valid_until: Option<String>,
686}
687
688/// Fire an event to all matching subscribers. Each dispatch is
689/// fire-and-forget — the caller is NOT blocked on delivery. Errors are
690/// logged and counted in the DB via `failure_count`.
691///
692/// PERF-3 (FX-10, 2026-05-26): pre-fix posture was
693/// `std::thread::spawn` + per-worker `Connection::open(...)` per
694/// matching subscriber — 1000 subscribers minted 1000 OS threads + 1000
695/// SQLite handles per store event. Post-fix posture uses a bounded
696/// [`tokio::sync::Semaphore`] (default 32 permits, tunable via
697/// `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY`) + `tokio::task::spawn_blocking`
698/// against the existing daemon Tokio runtime so the in-flight delivery
699/// count is capped. The pre-fix `std::thread::spawn` path is preserved
700/// as a fallback for legacy unit tests that run outside any runtime.
701///
702/// Caller owns the connection. Dispatch workers still re-open the
703/// connection per delivery to update counters (cheap — `SQLite`
704/// connections are process-shared via WAL — and the new concurrency
705/// bound keeps the simultaneous handle count bounded too).
706///
707/// P5 (G9): convenience wrapper for the historical no-details case
708/// (used by `memory_store`). New event types should call
709/// `dispatch_event_with_details` and pass the matching
710/// `*EventDetails` struct serialised to JSON.
711pub fn dispatch_event(
712    conn: &Connection,
713    event: &str,
714    memory_id: &str,
715    namespace: &str,
716    agent_id: Option<&str>,
717    db_path: &std::path::Path,
718) {
719    dispatch_event_with_details(conn, event, memory_id, namespace, agent_id, db_path, None);
720}
721
722/// P5 (G9): full lifecycle dispatch with optional event-specific
723/// details. The details JSON is FLATTENED into the dispatch payload —
724/// keys must not collide with the outer envelope (`event`,
725/// `memory_id`, `namespace`, `agent_id`, `delivered_at`). The four
726/// new event types (`memory_promote`, `memory_delete`,
727/// `memory_link_created`, `memory_consolidated`) supply their
728/// `*EventDetails` struct serialised via `serde_json::to_value`.
729pub fn dispatch_event_with_details(
730    conn: &Connection,
731    event: &str,
732    memory_id: &str,
733    namespace: &str,
734    agent_id: Option<&str>,
735    db_path: &std::path::Path,
736    details: Option<serde_json::Value>,
737) {
738    // Dispatch path needs the global view (every tenant's subscriptions
739    // for this event), so `None` here is correct — ownership scoping
740    // would silently drop matching subscribers belonging to OTHER
741    // tenants. The cross-tenant authorization gate lives at the wire
742    // surface (MCP/HTTP handlers), not here. See #870/#872/#874.
743    //
744    // v0.7.0 #1097 — pre-filter by event type at the SQL level using
745    // `list_by_event` instead of the legacy `list(conn, None)` full
746    // table scan. The Rust-side `matches_filters` below stays as the
747    // authoritative gate (it covers the namespace/agent filters which
748    // the SQL prefilter doesn't honour); only the coarse event-type
749    // filter moves into SQL. At 100 subscribers the write-event
750    // fanout cost drops from O(N) to whatever fraction actually
751    // subscribes to the fired event type.
752    let subs = match list_by_event(conn, event) {
753        Ok(s) => s,
754        Err(e) => {
755            tracing::warn!("subscription list failed during dispatch: {e}");
756            return;
757        }
758    };
759    // Resolve each matching sub's secret_hash from sqlite BEFORE
760    // dispatching to the per-sub worker pool. #932 (v0.7.0 Track D,
761    // 2026-05-20) extracted this into `dispatch_event_to_subs` so the
762    // postgres-backed `dispatch_event_postgres` path can resolve
763    // secret_hash from the subscription memory's metadata instead.
764    //
765    // v0.7.0 #1072 — secret_hash now reuses the dispatch caller's
766    // `&Connection` instead of opening a fresh `Connection::open(...)`
767    // per matching subscription. Saves one connection open per
768    // subscriber in the fanout.
769    let matching: Vec<(Subscription, Option<String>)> = subs
770        .into_iter()
771        .filter(|s| {
772            matches_filters(
773                &s.events,
774                s.event_types.as_deref(),
775                s.namespace_filter.as_deref(),
776                s.agent_filter.as_deref(),
777                event,
778                namespace,
779                agent_id,
780            )
781        })
782        .map(|s| {
783            let secret_hash = load_secret_hash_with_conn(conn, &s.id).unwrap_or(None);
784            (s, secret_hash)
785        })
786        .collect();
787    dispatch_event_to_subs(
788        matching, event, memory_id, namespace, agent_id, db_path, details,
789    );
790}
791
792/// #932 (v0.7.0 Track D, 2026-05-20) — backend-neutral per-sub
793/// dispatch worker pool. Takes an already-resolved
794/// `Vec<(Subscription, Option<secret_hash>)>` so the sqlite path
795/// (via `dispatch_event_with_details`) and the postgres path (via
796/// `dispatch_event_postgres`) share the same delivery + audit +
797/// DLQ + HMAC code while their subscription-source lookups remain
798/// adapter-specific.
799///
800/// The `db_path` argument is still the SQLite audit-mirror path —
801/// `record_subscription_event` / `record_dispatch` / `record_dlq`
802/// still write to sqlite even on postgres-backed daemons because
803/// every daemon currently keeps a sqlite scratch DB alongside the
804/// SAL store handle. A future SAL audit-log surface (#-tracking)
805/// will route this through the trait too.
806pub fn dispatch_event_to_subs(
807    matching: Vec<(Subscription, Option<String>)>,
808    event: &str,
809    memory_id: &str,
810    namespace: &str,
811    agent_id: Option<&str>,
812    db_path: &std::path::Path,
813    details: Option<serde_json::Value>,
814) {
815    if matching.is_empty() {
816        return;
817    }
818    // PERF — warm the reqwest::blocking TLS connector once per process,
819    // off the delivery critical path, so the first webhook does not pay
820    // the cold root-cert init inside its ACK_TIMEOUT retry budget. See
821    // `prewarm_dispatch_tls`.
822    {
823        static KICK: std::sync::Once = std::sync::Once::new();
824        KICK.call_once(|| {
825            std::thread::spawn(prewarm_dispatch_tls);
826        });
827    }
828    // Timestamp is part of the canonical string the signature is
829    // computed over. Receivers SHOULD reject requests whose timestamp
830    // differs from their clock by more than 5 minutes (replay window).
831    // (#301 item 1 — prior implementation had no replay protection.)
832    let timestamp = chrono::Utc::now().timestamp().to_string();
833
834    // PERF-3 (FX-10, 2026-05-26) — bridge the dispatch fan-out onto
835    // the existing Tokio runtime. We grab the current Handle once per
836    // batch; the per-subscriber worker is enqueued as
837    // `handle.spawn(async { semaphore.acquire().await;
838    // spawn_blocking(|| { … reqwest::blocking::send + audit
839    // writes … }).await })`. The semaphore caps in-flight deliveries
840    // at [`DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY`] (operator-tunable
841    // via `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY`) so 1000 matching
842    // subscribers no longer mint 1000 OS threads + 1000 SQLite
843    // handles.
844    //
845    // Fallback: when no Tokio runtime is attached to the calling
846    // thread (e.g. legacy CLI tests that call `dispatch_event`
847    // outside a runtime), fall back to the pre-fix
848    // `std::thread::spawn` path. Production daemons (CLI/MCP/HTTP)
849    // ALL run under `#[tokio::main]`, so this fallback is the cold
850    // path.
851    let handle = tokio::runtime::Handle::try_current().ok();
852    for (sub, sub_secret_hash) in matching {
853        // v0.7.0 K6 — UUIDv7 correlation id is generated per
854        // (subscription, event) pair so receivers can correlate ACKs
855        // back to the dispatched payload across the retry ladder.
856        // Generated here (not inside the worker thread) so the
857        // ordering invariant — correlation_ids monotonic in
858        // dispatch-loop order — holds even when worker threads race.
859        let correlation_id = uuid::Uuid::now_v7().to_string();
860        let payload = DispatchPayload {
861            event,
862            memory_id,
863            namespace,
864            agent_id,
865            delivered_at: chrono::Utc::now().to_rfc3339(),
866            correlation_id: &correlation_id,
867            details: details.clone(),
868        };
869        let body = match serde_json::to_string(&payload) {
870            Ok(s) => s,
871            Err(e) => {
872                tracing::warn!("dispatch payload serialize failed: {e}");
873                continue;
874            }
875        };
876        let url = sub.url.clone();
877        let sub_id = sub.id.clone();
878        let event_owned = event.to_string();
879        let ts = timestamp.clone();
880        let db_path = db_path.to_path_buf();
881        let secret_hash_owned = sub_secret_hash.clone();
882
883        // PERF-3 (FX-10) — the entire per-subscriber delivery body
884        // is captured by a `FnOnce()` closure so it can run either
885        // on the Tokio blocking pool (production path) or on a
886        // freshly-spawned `std::thread` (no-runtime fallback). The
887        // body is otherwise unchanged from the pre-fix code.
888        let work = move || {
889            // v0.7.0 #1072 — open ONE sqlite connection for the
890            // whole delivery instead of 4-5 across the
891            // event-audit / status-update / dispatch-counter / DLQ
892            // sites. Saves the per-connection PRAGMA + WAL setup
893            // cost (~2-5 ms each) on every dispatched event. The
894            // single connection lives on the worker thread stack
895            // for the lifetime of the delivery and is dropped on
896            // thread exit. Same audit posture as the pre-#1072
897            // path — every write site is best-effort and logs on
898            // failure.
899            let worker_conn = match Connection::open(&db_path) {
900                Ok(c) => Some(c),
901                Err(e) => {
902                    tracing::warn!(
903                        "subscription dispatch: worker_conn open failed: {e}; \
904                         falling back to per-write connections"
905                    );
906                    None
907                }
908            };
909            // Persist the per-delivery audit row BEFORE the network
910            // send so replay-from-cursor (K7) sees a stable record
911            // even if the dispatcher process crashes mid-retry.
912            let event_audit_result = if let Some(c) = worker_conn.as_ref() {
913                record_subscription_event_with_conn(
914                    c,
915                    &sub_id,
916                    &correlation_id,
917                    &event_owned,
918                    &body,
919                )
920            } else {
921                record_subscription_event(&db_path, &sub_id, &correlation_id, &event_owned, &body)
922            };
923            if let Err(e) = event_audit_result {
924                tracing::warn!("subscription event audit write failed: {e}");
925            }
926            let secret_hash = secret_hash_owned;
927            // Canonical string: "<timestamp>.<body>". Keyed HMAC over
928            // the DB-stored secret hash. Receivers verify by computing
929            // SHA256(plaintext_secret) and then
930            // HMAC-SHA256(key, "<timestamp>.<body>").
931            //
932            // v0.7.0 K7 — when no per-subscription secret is set, fall
933            // back to the process-wide `[hooks.subscription] hmac_secret`
934            // override so operators can sign EVERY outgoing payload
935            // without round-tripping each receiver through `memory_subscribe`.
936            // The plaintext server-wide secret is itself SHA-256-hashed
937            // first so the keying material on the wire matches the
938            // per-subscription path (receivers compute the same
939            // `SHA256(plaintext_secret)` regardless of which path
940            // configured it).
941            let canonical = format!("{ts}.{body}");
942            let signature = match secret_hash.as_deref() {
943                Some(h) => Some(hmac_sha256_hex(h, &canonical)),
944                None => crate::config::active_hooks_hmac_secret().map(|plain| {
945                    let key_hash = sha256_hex(&plain);
946                    hmac_sha256_hex(&key_hash, &canonical)
947                }),
948            };
949            // R3-S1.HMAC (v0.7.0 fix campaign 2026-05-13): refuse to
950            // dispatch an unsigned payload. New subscriptions cannot
951            // register without a per-sub or server-wide secret (see
952            // `crate::handlers::subscribe` + MCP `handle_subscribe`), so
953            // hitting this branch means a legacy row was persisted before
954            // the gate landed, or the server-wide override was removed
955            // after registration. Either way, fail loudly to the DLQ
956            // instead of dispatching the body in clear so a receiver
957            // never has to guess whether a body is authentic.
958            if signature.is_none() {
959                tracing::error!(
960                    "subscription {sub_id} dispatch refused: no per-sub secret AND no \
961                     server-wide [hooks.subscription] hmac_secret configured. \
962                     Configure one of the two and replay via memory_subscription_replay. \
963                     (v0.7.0 fix campaign R3-S1.HMAC, 2026-05-13)"
964                );
965                let outcome = DeliveryOutcome::unsigned_refused();
966                let ok = outcome.success;
967                // v0.7.0 #1072 — reuse `worker_conn` for every sqlite
968                // write below.
969                if let Some(c) = worker_conn.as_ref() {
970                    record_dispatch_with_conn(c, &sub_id, ok);
971                    update_event_status_with_conn(c, &correlation_id, ok);
972                } else {
973                    record_dispatch(&db_path, &sub_id, ok);
974                    update_event_status(&db_path, &correlation_id, ok);
975                }
976                let dlq_result = if let Some(c) = worker_conn.as_ref() {
977                    record_dlq_with_conn(
978                        c,
979                        &sub_id,
980                        &correlation_id,
981                        &event_owned,
982                        &body,
983                        outcome.attempts,
984                        &outcome.last_error,
985                        &outcome.first_failed_at,
986                        &outcome.last_failed_at,
987                    )
988                } else {
989                    record_dlq(
990                        &db_path,
991                        &sub_id,
992                        &correlation_id,
993                        &event_owned,
994                        &body,
995                        outcome.attempts,
996                        &outcome.last_error,
997                        &outcome.first_failed_at,
998                        &outcome.last_failed_at,
999                    )
1000                };
1001                if let Err(e) = dlq_result {
1002                    tracing::warn!("subscription DLQ write failed: {e}");
1003                }
1004                return;
1005            }
1006            let outcome =
1007                deliver_with_retry(&url, &body, &ts, signature.as_deref(), &correlation_id);
1008            let ok = outcome.success;
1009            // v0.7.0 #1072 — reuse `worker_conn` for every sqlite write
1010            // below.
1011            if let Some(c) = worker_conn.as_ref() {
1012                record_dispatch_with_conn(c, &sub_id, ok);
1013                update_event_status_with_conn(c, &correlation_id, ok);
1014            } else {
1015                record_dispatch(&db_path, &sub_id, ok);
1016                update_event_status(&db_path, &correlation_id, ok);
1017            }
1018            if !ok {
1019                let dlq_result = if let Some(c) = worker_conn.as_ref() {
1020                    record_dlq_with_conn(
1021                        c,
1022                        &sub_id,
1023                        &correlation_id,
1024                        &event_owned,
1025                        &body,
1026                        outcome.attempts,
1027                        &outcome.last_error,
1028                        &outcome.first_failed_at,
1029                        &outcome.last_failed_at,
1030                    )
1031                } else {
1032                    record_dlq(
1033                        &db_path,
1034                        &sub_id,
1035                        &correlation_id,
1036                        &event_owned,
1037                        &body,
1038                        outcome.attempts,
1039                        &outcome.last_error,
1040                        &outcome.first_failed_at,
1041                        &outcome.last_failed_at,
1042                    )
1043                };
1044                if let Err(e) = dlq_result {
1045                    tracing::warn!("subscription DLQ write failed: {e}");
1046                }
1047            }
1048        };
1049
1050        // PERF-3 (FX-10) — production path: bounded semaphore + Tokio
1051        // blocking pool. The semaphore caps concurrent in-flight
1052        // deliveries at the operator-tunable bound; permits are
1053        // released when the `_permit` guard drops at the end of the
1054        // worker. Cold path (no runtime attached): legacy
1055        // `std::thread::spawn`. Tests use the production path because
1056        // `#[tokio::test]` attaches a runtime to the calling thread.
1057        if let Some(rt) = handle.as_ref() {
1058            let permit_sem = dispatch_semaphore();
1059            rt.spawn(async move {
1060                // Acquire-then-blocking-step. `acquire_owned` returns
1061                // an `OwnedSemaphorePermit` that gets moved into the
1062                // blocking task and is dropped when the worker
1063                // returns — releasing the slot for the next pending
1064                // dispatch.
1065                let permit = match permit_sem.acquire_owned().await {
1066                    Ok(p) => p,
1067                    Err(e) => {
1068                        tracing::warn!(
1069                            "subscription dispatch: semaphore acquire failed: {e}; \
1070                             dropping delivery (semaphore closed)"
1071                        );
1072                        return;
1073                    }
1074                };
1075                // The reqwest::blocking + rusqlite work is sync; run
1076                // it on the Tokio blocking pool so we don't pin a
1077                // runtime worker thread on the LLM-slow HTTP send.
1078                if let Err(e) = tokio::task::spawn_blocking(move || {
1079                    work();
1080                    drop(permit);
1081                })
1082                .await
1083                {
1084                    tracing::warn!("subscription dispatch: spawn_blocking join failed: {e}");
1085                }
1086            });
1087        } else {
1088            // Fallback: no runtime in scope. This path is exercised
1089            // only by legacy unit tests that call `dispatch_event`
1090            // outside `#[tokio::test]`; production daemons always
1091            // run under `#[tokio::main]`.
1092            std::thread::spawn(work);
1093        }
1094    }
1095}
1096
1097/// v0.7.0 K4 — dispatch the `approval_requested` lifecycle event for a
1098/// freshly-inserted `pending_actions` row.
1099///
1100/// Thin convenience wrapper around [`dispatch_event_with_details`]:
1101///   - Resolves the canonical row via [`crate::db::get_pending_action`]
1102///     so the payload reflects what was actually committed (not what
1103///     the caller *intended* to commit).
1104///   - Synthesises an [`ApprovalRequestedEventDetails`] block from the
1105///     row.
1106///   - Routes the event through the existing subscription dispatch
1107///     path so opt-in subscribers (`event_types: ["approval_requested"]`)
1108///     and legacy wildcard subscribers both receive it.
1109///
1110/// Best-effort and fire-and-forget — same posture as the K2
1111/// `pending_action_expired` dispatch in
1112/// [`crate::daemon_runtime::spawn_pending_timeout_sweep_loop`]. A
1113/// dispatch failure must NOT roll back the pending-action row.
1114///
1115/// Caller passes the `pending_id` returned from
1116/// [`crate::db::queue_pending_action`] / [`crate::db::upsert_pending_action`].
1117/// A missing or unreadable row is logged and otherwise treated as a
1118/// no-op (lost-event semantics, never block the write path).
1119pub fn dispatch_approval_requested(conn: &Connection, pending_id: &str, db_path: &std::path::Path) {
1120    let pa = match crate::db::get_pending_action(conn, pending_id) {
1121        Ok(Some(pa)) => pa,
1122        Ok(None) => {
1123            tracing::warn!(
1124                "approval_requested dispatch skipped: pending_action {pending_id} not found"
1125            );
1126            return;
1127        }
1128        Err(e) => {
1129            tracing::warn!(
1130                "approval_requested dispatch skipped: pending_action {pending_id} read failed: {e}"
1131            );
1132            return;
1133        }
1134    };
1135    let details = ApprovalRequestedEventDetails {
1136        action_type: pa.action_type.clone(),
1137        requested_at: pa.requested_at.clone(),
1138        memory_id: pa.memory_id.clone(),
1139        status: pa.status.clone(),
1140    };
1141    let details_value = match serde_json::to_value(&details) {
1142        Ok(v) => Some(v),
1143        Err(e) => {
1144            tracing::warn!("approval_requested dispatch details serialise failed: {e}");
1145            None
1146        }
1147    };
1148    // v0.7.0 K10 — publish on the in-process approval bus so HTTP SSE
1149    // subscribers see the new pending row in real time. Best-effort
1150    // (no receivers → swallowed); never blocks the gate path.
1151    crate::approvals::publish(crate::approvals::ApprovalEvent::ApprovalRequested {
1152        pending_id: pa.id.clone(),
1153        action_type: pa.action_type.clone(),
1154        namespace: pa.namespace.clone(),
1155        requested_by: pa.requested_by.clone(),
1156        requested_at: pa.requested_at.clone(),
1157    });
1158    dispatch_event_with_details(
1159        conn,
1160        webhook_events::APPROVAL_REQUESTED,
1161        &pa.id,
1162        &pa.namespace,
1163        Some(&pa.requested_by),
1164        db_path,
1165        details_value,
1166    );
1167}
1168
/// v0.7.0 K6 — result of a single attempt or of the whole retry
/// ladder for one delivery.
///
/// The two timestamps bracket the retry window for DLQ analytics.
struct DeliveryOutcome {
    /// `true` once the receiver has answered 2xx AND returned a JSON
    /// body of the form `{"status":"ack","correlation_id":"<id>"}`
    /// whose id matches the one that was dispatched.
    success: bool,
    /// Number of network requests issued: 1..=4 for a real delivery
    /// (initial + 3 retries), 0 when the dispatch was refused before
    /// any request was made.
    attempts: i64,
    /// Short error string from the last failed attempt; empty on
    /// success.
    last_error: String,
    /// When the first failed attempt was recorded; empty if no attempt
    /// failed.
    first_failed_at: String,
    /// When the most recent failed attempt was recorded; empty if no
    /// attempt failed.
    last_failed_at: String,
}
1185
1186impl DeliveryOutcome {
1187    /// R3-S1.HMAC (v0.7.0 fix campaign 2026-05-13): synthesise a failure
1188    /// outcome for a dispatch refused at the gate because neither a
1189    /// per-sub secret nor a server-wide override was configured. The
1190    /// DLQ row carries an explicit `last_error` so operators can tell a
1191    /// missing-secret refusal apart from a transport failure.
1192    fn unsigned_refused() -> Self {
1193        let now = chrono::Utc::now().to_rfc3339();
1194        Self {
1195            success: false,
1196            attempts: 0,
1197            last_error: "dispatch refused: no per-subscription secret AND no server-wide \
1198                 [hooks.subscription] hmac_secret configured (v0.7.0 R3-S1.HMAC)"
1199                .to_string(),
1200            first_failed_at: now.clone(),
1201            last_failed_at: now,
1202        }
1203    }
1204}
1205
1206/// v0.7.0 K6 — dispatcher driver. Issues the initial POST plus up to
1207/// three retries spaced [200ms, 1s, 5s] apart. Each attempt validates
1208/// the receiver's ACK body — a 2xx response with no ACK or a
1209/// mismatched correlation_id counts as failure and triggers the next
1210/// retry. Returns the cumulative [`DeliveryOutcome`].
1211/// One-time process-global warm-up of the `reqwest::blocking` TLS
1212/// connector used by webhook delivery.
1213///
1214/// The first `reqwest::blocking::Client` constructed in a process
1215/// builds the TLS connector, which loads the system root-certificate
1216/// store. On some platforms (notably macOS, where the load goes
1217/// through Security.framework) that first load can take several
1218/// seconds and is process-cached thereafter. Because webhook delivery
1219/// rebuilds a fresh client per attempt (the #1082 per-call
1220/// `.resolve()` DNS pin precludes a fully process-shared client), a
1221/// cold first delivery would otherwise pay that init on attempt 1 — and
1222/// when it exceeds [`ACK_TIMEOUT`] the K6 retry budget is consumed by
1223/// the cold init rather than by real delivery failures, delaying (or,
1224/// under a short receiver poll window, failing) the first webhook of a
1225/// process by tens of seconds.
1226///
1227/// Calling this is idempotent via an internal [`std::sync::Once`].
1228/// [`dispatch_event_to_subs`] kicks it on a background thread the first
1229/// time it has work to deliver, so a long-lived daemon warms its
1230/// connector ahead of real load. Integration tests that assert delivery
1231/// latency call it synchronously during setup so the one-time cost
1232/// lands before the timed assertion window.
1233pub fn prewarm_dispatch_tls() {
1234    static WARM: std::sync::Once = std::sync::Once::new();
1235    WARM.call_once(|| {
1236        // Build on a dedicated OS thread, then join. `reqwest::blocking`'s
1237        // client constructor blocks waiting for its internal runtime
1238        // thread to spin up, which panics if invoked directly on a thread
1239        // that is currently driving a Tokio runtime (an async test, or a
1240        // daemon runtime worker). A freshly-spawned `std::thread` has no
1241        // ambient runtime, so blocking there is always allowed. Joining
1242        // makes the warm-up synchronous for the caller (tests rely on the
1243        // connector being warm once this returns).
1244        let _ = std::thread::spawn(|| {
1245            if let Err(e) = reqwest::blocking::Client::builder()
1246                .timeout(ACK_TIMEOUT)
1247                .build()
1248            {
1249                tracing::warn!("webhook dispatch TLS warm-up failed: {e}");
1250            }
1251        })
1252        .join();
1253    });
1254}
1255
1256fn deliver_with_retry(
1257    url: &str,
1258    body: &str,
1259    timestamp: &str,
1260    signature: Option<&str>,
1261    correlation_id: &str,
1262) -> DeliveryOutcome {
1263    let mut attempts: i64 = 0;
1264    let mut first_failed_at = String::new();
1265    let mut last_failed_at = String::new();
1266    let mut last_error = String::new();
1267    // Total attempts = 1 (initial) + RETRY_BACKOFFS.len() (retries).
1268    for attempt_idx in 0..=RETRY_BACKOFFS.len() {
1269        if attempt_idx > 0 {
1270            std::thread::sleep(RETRY_BACKOFFS[attempt_idx - 1]);
1271        }
1272        attempts += 1;
1273        match send(url, body, timestamp, signature, correlation_id) {
1274            Ok(()) => {
1275                return DeliveryOutcome {
1276                    success: true,
1277                    attempts,
1278                    last_error: String::new(),
1279                    first_failed_at,
1280                    last_failed_at,
1281                };
1282            }
1283            Err(e) => {
1284                let now = chrono::Utc::now().to_rfc3339();
1285                if first_failed_at.is_empty() {
1286                    first_failed_at = now.clone();
1287                }
1288                last_failed_at = now;
1289                last_error = e;
1290            }
1291        }
1292    }
1293    DeliveryOutcome {
1294        success: false,
1295        attempts,
1296        last_error,
1297        first_failed_at,
1298        last_failed_at,
1299    }
1300}
1301
1302/// Perform one HTTP POST with SSRF-hardened URL check + signature
1303/// + timestamp headers.
1304///
1305/// v0.7.0 K6 — return Ok(()) only when the receiver returns 2xx AND
1306/// a JSON ACK body (`{"status":"ack","correlation_id":"..."}`) whose
1307/// `correlation_id` matches the dispatched id within
1308/// [`ACK_TIMEOUT`]. Anything else (network error, non-2xx, ACK
1309/// timeout, mismatched correlation id) returns Err with a short
1310/// reason string the retry driver records.
1311fn send(
1312    url: &str,
1313    body: &str,
1314    timestamp: &str,
1315    signature: Option<&str>,
1316    correlation_id: &str,
1317) -> Result<(), String> {
1318    if let Err(e) = validate_url(url) {
1319        tracing::warn!("SSRF guard rejected webhook URL {url}: {e}");
1320        return Err(format!("ssrf-rejected: {e}"));
1321    }
1322    // v0.7.0 #1082 (SR-1 #2, HIGH) — DNS-rebind TOCTOU fix. Resolve
1323    // the host once via the SSRF guard AND capture the validated
1324    // addresses, then bind reqwest's resolver to exactly those
1325    // addresses via `Client::builder().resolve(host, addr)`. Pre-
1326    // #1082 reqwest did its own independent DNS query at send time,
1327    // letting an attacker-controlled DNS server (TTL=0 / split-
1328    // horizon) return a public IP on the daemon's first lookup
1329    // (passing the guard) and a private IP on reqwest's second
1330    // lookup (the webhook payload posted internally). The new
1331    // resolver override pins reqwest's connect to the daemon's
1332    // already-validated IP set; reqwest's own DNS query never
1333    // happens for this client.
1334    let (resolved_host, validated_addrs) =
1335        match validate_url_dns_resolved(url, crate::config::allow_loopback_webhooks()) {
1336            Ok(t) => t,
1337            Err(e) => {
1338                tracing::warn!("DNS SSRF guard rejected webhook URL {url}: {e}");
1339                return Err(format!("dns-ssrf-rejected: {e}"));
1340            }
1341        };
1342    // v0.7.0 SR-W3 (HIGH) — redirect SSRF-pin bypass. The
1343    // `builder.resolve(host, addr)` pins below shadow reqwest's DNS
1344    // for the *validated* host only. reqwest's default redirect
1345    // policy follows up to 10 hops and re-resolves DNS for the new
1346    // Location host — a host whose addresses were never SSRF-validated
1347    // — so a webhook endpoint that returns `302 Location:
1348    // http://169.254.169.254/...` would let reqwest connect to an
1349    // internal address the guard never cleared. Disabling redirects
1350    // closes that window: a 3xx is surfaced as a non-success status
1351    // (`http-{status}` below) and fails the dispatch safely, exactly
1352    // like any other non-2xx.
1353    let mut builder = reqwest::blocking::Client::builder()
1354        .timeout(ACK_TIMEOUT)
1355        .redirect(reqwest::redirect::Policy::none());
1356    for addr in &validated_addrs {
1357        // Pin reqwest's per-host override. The override SHADOWS
1358        // reqwest's own DNS query for this host on this client,
1359        // closing the rebind window.
1360        builder = builder.resolve(&resolved_host, *addr);
1361    }
1362    // v0.7.0 #1073 + #1082 (SR-3 perf + SR-2 SSRF) — the SSRF
1363    // hardening at #1082 requires per-call per-host DNS pinning via
1364    // `builder.resolve(host, addr)`; that pin lives on the client
1365    // itself, so a fully process-wide shared client can't hold pins
1366    // that vary per dispatched URL. We still amortize the bulk of
1367    // the work (TLS provider init, default connector) by sharing
1368    // the resolver+TLS state through this thread-local builder.
1369    // A future custom-resolver refactor will move the per-call pin
1370    // onto a trait-object `dns::Resolve` so the client itself can be
1371    // process-shared (matching the federation `peer.rs` pattern, and
1372    // re-introduced through `RuntimeContext` at that point). For now
1373    // correctness (SSRF closure) wins over the per-attempt client
1374    // rebuild cost. The prior `dispatch_http_client()` scaffolding
1375    // was removed in #1196 — it had zero production callers, only a
1376    // test pin kept it alive.
1377    let client = match builder.build() {
1378        Ok(c) => c,
1379        Err(e) => {
1380            tracing::warn!("webhook client build failed: {e}");
1381            return Err(format!("client-build: {e}"));
1382        }
1383    };
1384    let mut req = client
1385        .post(url)
1386        .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
1387        .header(
1388            "user-agent",
1389            format!("ai-memory/{}", env!("CARGO_PKG_VERSION")),
1390        )
1391        .header(crate::HEADER_AI_MEMORY_TIMESTAMP, timestamp)
1392        .header("x-ai-memory-correlation-id", correlation_id);
1393    if let Some(sig) = signature {
1394        req = req.header(crate::HEADER_AI_MEMORY_SIGNATURE, format!("sha256={sig}"));
1395    }
1396    let resp = match req.body(body.to_string()).send() {
1397        Ok(r) => r,
1398        Err(e) => {
1399            tracing::warn!("webhook POST to {url} failed: {e}");
1400            return Err(crate::errors::msg::network(e));
1401        }
1402    };
1403    if !resp.status().is_success() {
1404        let status = resp.status().as_u16();
1405        return Err(format!("http-{status}"));
1406    }
1407    // K6 ACK contract: receivers MUST return
1408    // {"status":"ack","correlation_id":"..."}. A 2xx with a missing /
1409    // mismatched body is treated as failure so retries kick in.
1410    let ack_body = match resp.text() {
1411        Ok(s) => s,
1412        Err(e) => return Err(format!("ack-read: {e}")),
1413    };
1414    let ack: serde_json::Value = match serde_json::from_str(&ack_body) {
1415        Ok(v) => v,
1416        Err(e) => return Err(format!("ack-decode: {e}")),
1417    };
1418    let status_field = ack.get("status").and_then(|v| v.as_str()).unwrap_or("");
1419    if status_field != "ack" {
1420        return Err(format!("ack-status: {status_field}"));
1421    }
1422    let ack_corr = ack
1423        .get("correlation_id")
1424        .and_then(|v| v.as_str())
1425        .unwrap_or("");
1426    if ack_corr != correlation_id {
1427        return Err(format!("ack-corr-mismatch: {ack_corr}"));
1428    }
1429    Ok(())
1430}
1431
1432/// Hash a plaintext secret (SHA-256 hex).
1433pub(crate) fn sha256_hex(s: &str) -> String {
1434    let mut hasher = Sha256::new();
1435    hasher.update(s.as_bytes());
1436    format!("{:x}", hasher.finalize())
1437}
1438
1439/// HMAC-SHA256 is expensive to implement from scratch; do the simple
1440/// construction manually using the hashed secret as key material.
1441/// Matches the RFC-2104 HMAC construction with SHA-256 as the
1442/// primitive.
1443pub(crate) fn hmac_sha256_hex(key_hex: &str, body: &str) -> String {
1444    const BLOCK: usize = 64;
1445    // v0.7.0 #1048 (Agent-5 #8) — decode the operator-supplied
1446    // `hmac_secret` as hex. The pre-#1048 behaviour silently fell
1447    // back to using the raw config bytes as HMAC key material when
1448    // the hex decode failed, which produced a stable-but-WEAK key
1449    // (`HMAC(b"not-a-hex-key!!", body)` instead of the intended
1450    // hex-decoded bytes). The fallback is preserved for wire
1451    // compatibility because both sender and receiver are typically
1452    // configured from the same secret string — flipping the
1453    // fallback would silently break running federations — but
1454    // every invalid-hex compute emits a WARN so an operator
1455    // running with a misconfigured secret sees the diagnostic on
1456    // every webhook dispatch. The boot-time validator
1457    // [`validate_hmac_secret_hex`] surfaces this BEFORE the daemon
1458    // accepts traffic; operators who want strict hex enforcement
1459    // call the validator at startup.
1460    let mut key = match hex_decode(key_hex) {
1461        Some(k) => k,
1462        None => {
1463            tracing::warn!(
1464                target: SUBSCRIPTIONS_TRACE_TARGET,
1465                "hmac_sha256_hex: hmac_secret is not valid hex (len={}); falling back to raw \
1466                 bytes as key material — this produces a STABLE BUT WEAK key. Re-encode the \
1467                 secret as hex (e.g. `openssl rand -hex 32`) and restart the daemon. \
1468                 See #1048.",
1469                key_hex.len(),
1470            );
1471            key_hex.as_bytes().to_vec()
1472        }
1473    };
1474    if key.len() > BLOCK {
1475        let mut h = Sha256::new();
1476        h.update(&key);
1477        key = h.finalize().to_vec();
1478    }
1479    key.resize(BLOCK, 0);
1480    let mut opad = [0x5cu8; BLOCK];
1481    let mut ipad = [0x36u8; BLOCK];
1482    for i in 0..BLOCK {
1483        opad[i] ^= key[i];
1484        ipad[i] ^= key[i];
1485    }
1486    let mut inner = Sha256::new();
1487    inner.update(ipad);
1488    inner.update(body.as_bytes());
1489    let inner_digest = inner.finalize();
1490    let mut outer = Sha256::new();
1491    outer.update(opad);
1492    outer.update(inner_digest);
1493    format!("{:x}", outer.finalize())
1494}
1495
/// Strictly decodes a hex string into bytes.
///
/// Returns `None` when `s` has an odd number of bytes or contains any
/// byte that is not an ASCII hex digit (`0-9`, `a-f`, `A-F`). The empty
/// string decodes to `Some(vec![])`.
///
/// The decode works on bytes rather than `&str` slices for two reasons:
/// - slicing `&s[i..i + 2]` panics when a multi-byte UTF-8 character
///   straddles a pair boundary (e.g. `"aé1"`), so a non-ASCII secret
///   must be rejected, not crash the caller;
/// - `u8::from_str_radix` accepts an optional leading `+`, which would
///   let a pair such as `"+f"` pass as valid hex.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A non-empty remainder means an odd-length input.
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
1505
1506/// v0.7.0 #1048 (Agent-5 #8) — boot-time hex validator for the
1507/// operator-supplied `hmac_secret`. Returns `Ok(())` when the value
1508/// is `None` (no secret configured) OR is valid hex; returns
1509/// `Err(String)` with a descriptive operator-facing message when
1510/// the value is non-hex. Callers MUST surface the error before the
1511/// daemon accepts traffic; the runtime `hmac_sha256_hex` will
1512/// otherwise silently degrade to a stable-but-WEAK key (raw bytes
1513/// of the misconfigured secret) on every dispatch.
1514///
1515/// # Errors
1516///
1517/// Returns the validation error string when the input is
1518/// `Some(secret)` and `secret` fails the hex parse. The message
1519/// includes recommended remediation (`openssl rand -hex 32`).
1520pub fn validate_hmac_secret_hex(secret: Option<&str>) -> Result<(), String> {
1521    let Some(secret) = secret else {
1522        return Ok(());
1523    };
1524    if hex_decode(secret).is_some() {
1525        return Ok(());
1526    }
1527    Err(format!(
1528        "[hooks.subscription] hmac_secret is not valid hex (len={}): expected an even-length \
1529         hex string. Generate a fresh secret with `openssl rand -hex 32` and update the \
1530         config. The runtime still computes a (weak) HMAC under the misconfigured value for \
1531         wire compatibility, but the boot validator refuses to start so the operator sees \
1532         the diagnostic immediately. See #1048.",
1533        secret.len(),
1534    ))
1535}
1536
1537/// SSRF guard with DNS resolution (#301 item 2). Resolves the host
1538/// via the stdlib resolver and rejects if ANY returned
1539/// `SocketAddr`'s IP is private / loopback / link-local. Guards
1540/// against DNS-rebind attacks where an attacker-controlled hostname
1541/// resolves to an internal IP at connect time.
1542///
1543/// Runs in the dispatch thread (blocking). Best-effort: if DNS fails
1544/// we let reqwest surface the error rather than fail closed, because
1545/// transient DNS outages should not silently drop webhook delivery.
1546pub fn validate_url_dns(url: &str) -> Result<()> {
1547    validate_url_dns_with(url, crate::config::allow_loopback_webhooks()).map(|_| ())
1548}
1549
1550/// v0.7.0 #1082 (SR-1 #2, HIGH) — DNS-rebind TOCTOU fix. Returns
1551/// the validated host + the pre-resolved `Vec<SocketAddr>` so the
1552/// caller can pin reqwest's resolver to the SAME addresses the
1553/// SSRF guard cleared. Pre-#1082 the guard resolved once,
1554/// validated, and discarded the addresses — reqwest then did its
1555/// OWN independent resolve at send time, opening a DNS-rebind
1556/// window where the second resolve could return a private IP that
1557/// the daemon never saw.
1558///
1559/// # Errors
1560/// - URL has no scheme.
1561/// - DNS resolution failed AND `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL`
1562///   is not set (post-#1053 fail-CLOSED).
1563/// - Any resolved address is private / link-local (or loopback when
1564///   `allow_loopback` is false).
1565pub(crate) fn validate_url_dns_resolved(
1566    url: &str,
1567    allow_loopback: bool,
1568) -> Result<(String, Vec<std::net::SocketAddr>)> {
1569    validate_url_dns_with(url, allow_loopback)
1570}
1571
/// Reports whether the operator opted into the legacy permissive SSRF
/// posture through the `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL`
/// environment variable.
///
/// Only the value `1` or a case-insensitive `true` enables it; an
/// unset or unreadable variable, or any other value, yields `false`.
/// Both the resolution-failure branch and the RFC 1035 hostname-shape
/// branch read the override through this helper so they agree.
fn ssrf_dns_fail_open() -> bool {
    match std::env::var("AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
1581
/// RFC 1035 §2.3.4 hostname shape check. Returns `true` when `host`
/// VIOLATES the shape.
///
/// A well-formed name, after dropping at most one trailing dot, is
/// 1..=253 octets long and every dot-separated label is 1..=63 octets.
///
/// FX-Fwin (#1053 follow-up) — the guard checks this itself instead of
/// relying on the resolver: `getaddrinfo(3)` is documented to reject an
/// oversized label, but Windows' resolver does not enforce the
/// 63-octet ceiling at the API boundary, so the `windows-latest` CI job
/// saw the SSRF guard pass a label that macOS/Linux reject. IP literals
/// carry no oversized labels and are unaffected.
fn hostname_shape_invalid(host: &str) -> bool {
    const MAX_NAME_OCTETS: usize = 253;
    const MAX_LABEL_OCTETS: usize = 63;

    // Only a single trailing root dot is forgiven.
    let name = match host.strip_suffix('.') {
        Some(without_root_dot) => without_root_dot,
        None => host,
    };
    let name_len_ok = (1..=MAX_NAME_OCTETS).contains(&name.len());
    let labels_ok = name
        .split('.')
        .all(|label| (1..=MAX_LABEL_OCTETS).contains(&label.len()));
    !(name_len_ok && labels_ok)
}
1603
1604/// H11 inner helper: takes `allow_loopback` explicitly so tests can
1605/// assert both branches without poking the process-wide atomic
1606/// (which would race with parallel tests). Production callers go
1607/// through `validate_url_dns`.
1608///
1609/// v0.7.0 #1082 — returns the resolved `(host, addresses)` so the
1610/// `send` path can install a reqwest `Client::builder().resolve()`
1611/// override pinning the connect to exactly the IPs the guard
1612/// cleared. Closes the DNS-rebind TOCTOU window.
1613fn validate_url_dns_with(
1614    url: &str,
1615    allow_loopback: bool,
1616) -> Result<(String, Vec<std::net::SocketAddr>)> {
1617    let lower = url.to_ascii_lowercase();
1618    let (_scheme, rest) = lower
1619        .split_once("://")
1620        .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
1621    let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
1622    let host_port = &rest[..host_end];
1623    // v0.7.0 #1082 — extract the host (sans port + brackets) for the
1624    // reqwest `Client::builder().resolve(host, addr)` override the
1625    // caller installs. The override matches by host string the
1626    // reqwest URL parser produces, so we strip the brackets / port
1627    // here to match.
1628    let resolved_host = {
1629        let s = host_port;
1630        if let Some(close_idx) = s.strip_prefix('[').and(s.find(']')) {
1631            // [ipv6]:port or [ipv6] — return the inner ipv6 text.
1632            s[1..close_idx].to_string()
1633        } else if let Some(idx) = s.rfind(':') {
1634            // Hostname:port — strip the port. (IPv4-with-port. Bare
1635            // ipv4 has no `:` so falls through to the else branch.)
1636            s[..idx].to_string()
1637        } else {
1638            s.to_string()
1639        }
1640    };
1641    // Supply a default port so ToSocketAddrs resolves correctly.
1642    // SSRF fix (W11): bracketed IPv6 without an explicit port ("[fe80::1]"
1643    // with no trailing ":N") was previously passed to ToSocketAddrs as-is,
1644    // which errors with "invalid port value" — and the catch-all `Err(_) =>
1645    // return Ok(())` below treated that as a DNS hiccup, silently bypassing
1646    // the SSRF guard. Detect the no-trailing-port form and append `:80` so
1647    // resolution succeeds and the IP is checked.
1648    let resolv_target =
1649        if let Some(close_idx) = host_port.strip_prefix('[').and(host_port.find(']')) {
1650            let after_bracket = &host_port[close_idx + 1..];
1651            if after_bracket.starts_with(':') {
1652                // [ipv6]:port — already has a port
1653                host_port.to_string()
1654            } else {
1655                // [ipv6] without port — append default
1656                format!("{host_port}:80")
1657            }
1658        } else if host_port.contains(':') {
1659            // IPv4:port or hostname:port — use as-is
1660            host_port.to_string()
1661        } else {
1662            format!("{host_port}:80")
1663        };
1664    // v0.7.0 #1053 (Agent-2 #3) — fail-CLOSED on DNS resolution
1665    // failure. Pre-#1053 a SERVFAIL / timeout / hang at the daemon's
1666    // resolver returned Ok(()) here, but the subsequent
1667    // `reqwest::send` performs its OWN DNS resolution under
1668    // potentially-attacker-controlled DNS (TTL-zero / DNS rebind).
1669    // The attacker's first resolution at the daemon could SERVFAIL
1670    // (bypassing the SSRF guard), then their second resolution under
1671    // reqwest could return a private-range or link-local IP
1672    // (169.254.169.254 cloud metadata, 127.0.0.1:5432 Postgres,
1673    // 10.0.0.0/8 internal services). Closing the gap by treating
1674    // DNS failure as Err means an attacker can't smuggle internal
1675    // IPs through a DNS-rebind path even if the daemon's resolver
1676    // hiccups.
1677    //
1678    // Operators with environments where transient DNS pressure is
1679    // expected (containers with flaky CoreDNS, etc.) can opt back
1680    // into the legacy permissive posture via
1681    // `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1`. The unsafe override is
1682    // logged at WARN on every fire so an audit can detect the
1683    // legacy-permissive mode.
1684    // FX-Fwin (#1053 follow-up) — enforce RFC 1035 hostname shape
1685    // IN-GUARD before resolution. Windows' `getaddrinfo` does not
1686    // reject an oversized (>63-octet) DNS label at the API boundary,
1687    // so relying on the resolver to reject it (as the pre-FX-Fwin code
1688    // did) made the guard platform-dependent. A shape violation is
1689    // treated identically to a DNS resolution failure: fail-CLOSED
1690    // unless the operator opted into the legacy permissive posture.
1691    if hostname_shape_invalid(&resolved_host) {
1692        if ssrf_dns_fail_open() {
1693            tracing::warn!(
1694                target: SUBSCRIPTIONS_TRACE_TARGET,
1695                "SSRF guard: hostname {resolved_host} violates RFC 1035 label/length \
1696                 limits for {url}; AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 — degrading to \
1697                 ALLOW (UNSAFE, legacy posture)"
1698            );
1699            return Ok((resolved_host, Vec::new()));
1700        }
1701        return Err(anyhow!(
1702            "SSRF guard: DNS resolution failed for {url}: hostname violates RFC 1035 \
1703             label/length limits; failing CLOSED (post-#1053 secure default — set \
1704             AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 to revert)"
1705        ));
1706    }
1707    let addrs: Vec<std::net::SocketAddr> = match resolv_target.to_socket_addrs() {
1708        Ok(iter) => iter.collect(),
1709        Err(e) => {
1710            let fail_open = ssrf_dns_fail_open();
1711            if fail_open {
1712                tracing::warn!(
1713                    target: SUBSCRIPTIONS_TRACE_TARGET,
1714                    "SSRF guard: DNS resolution failed for {url}: {e}; \
1715                     AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 — degrading to ALLOW \
1716                     (UNSAFE, legacy posture) — reqwest's resolver may bind to \
1717                     private/loopback IPs the daemon could not pre-check"
1718                );
1719                // Empty address list — caller's resolver-override
1720                // loop is a no-op and reqwest falls back to its
1721                // own DNS query (the explicit UNSAFE legacy posture).
1722                return Ok((resolved_host, Vec::new()));
1723            }
1724            return Err(anyhow!(
1725                "SSRF guard: DNS resolution failed for {url}: {e}; failing CLOSED \
1726                 (post-#1053 secure default — set AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 to revert)"
1727            ));
1728        }
1729    };
1730    for addr in &addrs {
1731        let ip = addr.ip();
1732        if is_private(ip) && !is_loopback_normalized(ip) {
1733            return Err(anyhow!(
1734                "host resolves to private/link-local IP {ip}: {url}"
1735            ));
1736        }
1737        // H11 (#628 blocker) — DNS-rebind protection for loopback.
1738        // Default-OFF; operators with `[subscriptions]
1739        // allow_loopback_webhooks = true` accept loopback-resolving
1740        // hostnames.
1741        if is_loopback_normalized(ip) && !allow_loopback {
1742            return Err(anyhow!(
1743                "host resolves to loopback IP {ip}: {url} — rejected by default \
1744                 (SSRF guard); set `[subscriptions] allow_loopback_webhooks = true` \
1745                 to opt in"
1746            ));
1747        }
1748    }
1749    // v0.7.0 #1082 — return the resolved host + addr list so the
1750    // caller can pin reqwest's connect to exactly these addresses.
1751    Ok((resolved_host, addrs))
1752}
1753
1754/// SSRF guard. Rejects URLs that would cause the daemon to connect
1755/// to private-range addresses, link-local, loopback (except
1756/// explicitly), or non-HTTPS remote hosts.
1757pub fn validate_url(url: &str) -> Result<()> {
1758    validate_url_with(url, crate::config::allow_loopback_webhooks())
1759}
1760
1761/// H11 inner helper: takes `allow_loopback` explicitly so tests can
1762/// assert both branches without poking the process-wide atomic
1763/// (which would race with parallel tests). Production callers go
1764/// through `validate_url`.
1765fn validate_url_with(url: &str, allow_loopback: bool) -> Result<()> {
1766    // Cheap scheme check without pulling the `url` crate.
1767    let lower = url.to_ascii_lowercase();
1768    let (scheme, rest) = lower
1769        .split_once("://")
1770        .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
1771    if scheme != "https" && scheme != "http" {
1772        return Err(anyhow!("webhook URL scheme must be http(s): {url}"));
1773    }
1774    // Extract host (portion before '/' or ':' or '?'). IPv6 URLs use
1775    // `[ipv6]:port` syntax — the brackets must be stripped and the
1776    // colon-split must skip the colons inside the v6 literal.
1777    let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
1778    let host_port = &rest[..host_end];
1779    let host: String = if let Some(stripped) = host_port.strip_prefix('[') {
1780        // IPv6: host is everything before the closing bracket.
1781        match stripped.find(']') {
1782            Some(i) => stripped[..i].to_string(),
1783            None => return Err(anyhow!("malformed IPv6 URL host: {url}")),
1784        }
1785    } else {
1786        // IPv4 / hostname.
1787        host_port
1788            .rsplit_once(':')
1789            .map_or(host_port.to_string(), |(h, _)| h.to_string())
1790    };
1791    let host = host.as_str();
1792    // H11 (#628 blocker): loopback hostnames + IPs are rejected by
1793    // default. Operators who need to point a webhook at a local
1794    // listener (CI, dev) opt in via `[subscriptions]
1795    // allow_loopback_webhooks = true`. Default-OFF closes an
1796    // authenticated SSRF gadget against local services (Postgres on
1797    // 5432, the hooks daemon, etc.).
1798    let is_loopback_hostname = matches!(host, "localhost" | "localhost.localdomain" | "");
1799    let parsed_ip = IpAddr::from_str(host).ok();
1800    let is_loopback_ip = parsed_ip.is_some_and(is_loopback_normalized);
1801    let is_loopback = is_loopback_hostname || is_loopback_ip;
1802    if is_loopback && !allow_loopback {
1803        return Err(anyhow!(
1804            "webhook URL targets loopback address {url} — rejected by default \
1805             (SSRF guard); set `[subscriptions] allow_loopback_webhooks = true` \
1806             to opt in (testing / dev only)"
1807        ));
1808    }
1809    if scheme == "http" && !is_loopback {
1810        // Accept http only to parsed-loopback IPs; everything else
1811        // requires https.
1812        if let Some(ip) = parsed_ip {
1813            if !is_loopback_normalized(ip) {
1814                return Err(anyhow!(
1815                    "webhook URL must be https for non-loopback host: {url}"
1816                ));
1817            }
1818        } else {
1819            return Err(anyhow!(
1820                "webhook URL must be https for non-loopback host: {url}"
1821            ));
1822        }
1823    }
1824    // Reject private-range IPs regardless of scheme (RFC1918 / RFC4193 /
1825    // link-local). Hostnames that resolve to private ranges are not
1826    // caught here — the dispatch thread will still be able to reach
1827    // them; operators who want to reach internal services should set
1828    // up reverse proxies or allow explicitly in config.
1829    if let Some(ip) = parsed_ip
1830        && is_private(ip)
1831        && !is_loopback_normalized(ip)
1832    {
1833        return Err(anyhow!(
1834            "webhook URL targets private / link-local address: {url}"
1835        ));
1836    }
1837    Ok(())
1838}
1839
// SSRF fix (#628 H3 follow-up): canonicalize IPv6 addresses that wrap an
// IPv4 address into their IPv4 form before applying SSRF checks:
//   - IPv4-mapped        `::ffff:a.b.c.d`
//   - IPv4-compatible    `::a.b.c.d` (deprecated form)
//   - well-known NAT64   `64:ff9b::/96`
// Without this, `Ipv6Addr::is_loopback()` and the v6 branch of
// `is_private` silently miss `::ffff:127.0.0.1`, `::ffff:10.0.0.1`,
// `::ffff:169.254.1.1`, `::127.0.0.1` and similar bypasses.
//
// `Ipv6Addr::to_canonical` only unwraps the IPv4-mapped form, so all
// three prefixes are matched explicitly here. `::` (unspecified) and
// `::1` (IPv6 loopback) also live inside `::/96` but are genuine IPv6
// addresses and are returned unchanged. IPv4 input is returned as-is.
fn normalize_ip(ip: IpAddr) -> IpAddr {
    let v6 = match ip {
        IpAddr::V4(_) => return ip,
        IpAddr::V6(v6) => v6,
    };
    let [s0, s1, s2, s3, s4, s5, hi, lo] = v6.segments();
    // The low 32 bits, read as the embedded IPv4 address.
    let embedded = IpAddr::V4(Ipv4Addr::from((u32::from(hi) << 16) | u32::from(lo)));
    match [s0, s1, s2, s3, s4, s5] {
        // IPv4-mapped `::ffff:a.b.c.d`.
        [0, 0, 0, 0, 0, 0xffff] => embedded,
        // NAT64 well-known prefix `64:ff9b::/96`.
        [0x0064, 0xff9b, 0, 0, 0, 0] => embedded,
        // Deprecated IPv4-compatible `::a.b.c.d`, excluding `::` and `::1`.
        [0, 0, 0, 0, 0, 0] if hi != 0 || lo > 1 => embedded,
        _ => IpAddr::V6(v6),
    }
}
1869
1870fn is_loopback_normalized(ip: IpAddr) -> bool {
1871    normalize_ip(ip).is_loopback()
1872}
1873
1874fn is_private(ip: IpAddr) -> bool {
1875    match normalize_ip(ip) {
1876        IpAddr::V4(v4) => {
1877            // SSRF fix (W11): include `is_unspecified` (0.0.0.0). On most
1878            // OSes the kernel routes 0.0.0.0 to a local listener, so an
1879            // attacker-controlled hostname resolving to 0.0.0.0 hits the
1880            // local box.
1881            v4.is_private()
1882                || v4.is_link_local()
1883                || v4.is_multicast()
1884                || v4.is_broadcast()
1885                || v4.is_unspecified()
1886        }
1887        IpAddr::V6(v6) => {
1888            // Conservative: reject unique-local (fc00::/7), link-local
1889            // (fe80::/10), multicast, and the unspecified address `::`.
1890            // SSRF fix (W11): `is_unspecified` covers `[::]`, which most
1891            // kernels route to local services.
1892            let segs = v6.segments();
1893            v6.is_multicast()
1894                || v6.is_unspecified()
1895                || (segs[0] & 0xfe00) == 0xfc00 // ULA
1896                || (segs[0] & 0xffc0) == 0xfe80 // link-local
1897        }
1898    }
1899}
1900
1901/// v0.7.0 #1072 — kept for the existing test harness and the
1902/// public-API contract. Production dispatch routes through
1903/// [`load_secret_hash_with_conn`] to reuse the caller's connection.
1904#[allow(dead_code)]
1905fn load_secret_hash(db_path: &std::path::Path, sub_id: &str) -> Result<Option<String>> {
1906    let conn = Connection::open(db_path).context("load_secret_hash open")?;
1907    load_secret_hash_with_conn(&conn, sub_id)
1908}
1909
1910/// v0.7.0 #1072 — `Connection`-reuse variant of [`load_secret_hash`].
1911/// Lets the per-subscription dispatch worker open ONE
1912/// `Connection::open` per delivery (instead of 4-5 across the
1913/// secret-load / event-audit / status-update / dispatch-counter /
1914/// DLQ sites). Same pattern as the #1017 hook-sink fix.
1915fn load_secret_hash_with_conn(conn: &Connection, sub_id: &str) -> Result<Option<String>> {
1916    conn.query_row(
1917        "SELECT secret_hash FROM subscriptions WHERE id = ?1",
1918        params![sub_id],
1919        |r| r.get::<_, Option<String>>(0),
1920    )
1921    .context("load_secret_hash query")
1922}
1923
1924/// v0.7.0 K6 — append a `subscription_events` audit row for one
1925/// outgoing delivery. Called from the dispatch worker BEFORE the
1926/// network send so replay-from-cursor (K7) sees a stable record even
1927/// if the dispatcher process crashes mid-retry. The row is created
1928/// with `delivery_status = 'pending'`; [`update_event_status`]
1929/// transitions it to `'ack'` / `'failed'` once the retry ladder
1930/// settles.
1931pub fn record_subscription_event(
1932    db_path: &std::path::Path,
1933    sub_id: &str,
1934    correlation_id: &str,
1935    event_type: &str,
1936    payload: &str,
1937) -> Result<()> {
1938    let conn = Connection::open(db_path).context("subscription_events open")?;
1939    record_subscription_event_with_conn(&conn, sub_id, correlation_id, event_type, payload)
1940}
1941
1942/// v0.7.0 #1072 — `Connection`-reuse variant. Used by the per-
1943/// subscription dispatch worker so a single thread-local connection
1944/// covers every sqlite write in the delivery path.
1945pub fn record_subscription_event_with_conn(
1946    conn: &Connection,
1947    sub_id: &str,
1948    correlation_id: &str,
1949    event_type: &str,
1950    payload: &str,
1951) -> Result<()> {
1952    let now = chrono::Utc::now().to_rfc3339();
1953    conn.execute(
1954        "INSERT INTO subscription_events \
1955         (subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status) \
1956         VALUES (?1, ?2, ?3, ?4, ?5, 'pending')",
1957        params![sub_id, correlation_id, event_type, payload, now],
1958    )
1959    .context("subscription_events insert")?;
1960    Ok(())
1961}
1962
1963/// v0.7.0 K6 — transition the audit row's `delivery_status` after the
1964/// retry ladder settles. Best-effort: a failure here is logged and
1965/// otherwise ignored so the dispatcher loop never blocks on the
1966/// audit table.
1967fn update_event_status(db_path: &std::path::Path, correlation_id: &str, ok: bool) {
1968    let Ok(conn) = Connection::open(db_path) else {
1969        return;
1970    };
1971    update_event_status_with_conn(&conn, correlation_id, ok);
1972}
1973
1974/// v0.7.0 #1072 — `Connection`-reuse variant of
1975/// [`update_event_status`].
1976fn update_event_status_with_conn(conn: &Connection, correlation_id: &str, ok: bool) {
1977    let status = if ok { "ack" } else { "failed" };
1978    let _ = conn.execute(
1979        "UPDATE subscription_events SET delivery_status = ?1 WHERE correlation_id = ?2",
1980        params![status, correlation_id],
1981    );
1982}
1983
1984/// v0.7.0 K6 — append a `subscription_dlq` row for a delivery that
1985/// exhausted the [200ms, 1s, 5s] retry ladder. K7's inspector tool
1986/// surfaces these rows to operators; K6 only ships the writer.
1987#[allow(clippy::too_many_arguments)]
1988pub fn record_dlq(
1989    db_path: &std::path::Path,
1990    sub_id: &str,
1991    correlation_id: &str,
1992    event_type: &str,
1993    payload: &str,
1994    retry_count: i64,
1995    last_error: &str,
1996    first_failed_at: &str,
1997    last_failed_at: &str,
1998) -> Result<()> {
1999    let conn = Connection::open(db_path).context("subscription_dlq open")?;
2000    record_dlq_with_conn(
2001        &conn,
2002        sub_id,
2003        correlation_id,
2004        event_type,
2005        payload,
2006        retry_count,
2007        last_error,
2008        first_failed_at,
2009        last_failed_at,
2010    )
2011}
2012
/// Hard cap on the number of `subscription_dlq` rows kept per
/// subscription.
///
/// #1253 (MED, 2026-05-25) — operator-disk-fill DoS guard. Pre-#1253
/// the DLQ grew unboundedly when a hostile (or simply broken) webhook
/// target failed every delivery — a single bad subscriber could
/// exhaust the operator's disk by sustaining a high failure rate.
/// Past this cap [`record_dlq_with_conn`] refuses the insert with
/// the typed `dlq_overflow` error, increments the
/// `ai_memory_subscription_dlq_overflow_total` counter, and emits a
/// `tracing::warn!`. Operators drain the queue via
/// `memory_subscription_dlq_list` + the planned `... dlq drain`
/// admin tool before resetting.
///
/// Sizing: 10_000 rows is well above realistic transient-failure
/// depths (a healthy peer recovers in minutes, not thousands of
/// events) while still bounded enough that a hostile peer can write
/// at most ~10 MB before being capped (the payload column is itself
/// bounded by the webhook JSON-body cap upstream).
pub const MAX_SUBSCRIPTION_DLQ_ROWS: i64 = 10_000;
2029
2030/// v0.7.0 #1072 — `Connection`-reuse variant of [`record_dlq`].
2031#[allow(clippy::too_many_arguments)]
2032pub fn record_dlq_with_conn(
2033    conn: &Connection,
2034    sub_id: &str,
2035    correlation_id: &str,
2036    event_type: &str,
2037    payload: &str,
2038    retry_count: i64,
2039    last_error: &str,
2040    first_failed_at: &str,
2041    last_failed_at: &str,
2042) -> Result<()> {
2043    // #1253 — refuse the insert if the per-subscription DLQ is full.
2044    // Counting before the INSERT keeps the cap honest under
2045    // concurrent dispatch threads (the SQLite single-writer lock
2046    // serialises this read+write pair against any sibling DLQ
2047    // insert against the same subscription).
2048    let depth: i64 = conn
2049        .query_row(
2050            "SELECT COUNT(*) FROM subscription_dlq WHERE subscription_id = ?1",
2051            params![sub_id],
2052            |row| row.get(0),
2053        )
2054        .context("subscription_dlq depth probe")?;
2055    if depth >= MAX_SUBSCRIPTION_DLQ_ROWS {
2056        crate::metrics::record_subscription_dlq_overflow();
2057        tracing::warn!(
2058            subscription_id = %sub_id,
2059            correlation_id = %correlation_id,
2060            event_type = %event_type,
2061            depth = depth,
2062            cap = MAX_SUBSCRIPTION_DLQ_ROWS,
2063            "dlq_overflow: refusing subscription_dlq insert — per-subscription cap reached",
2064        );
2065        return Err(anyhow!(
2066            "dlq_overflow: subscription {sub_id} dlq at cap ({MAX_SUBSCRIPTION_DLQ_ROWS}); drain before further inserts"
2067        ));
2068    }
2069    conn.execute(
2070        "INSERT INTO subscription_dlq \
2071         (subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at) \
2072         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2073        params![
2074            sub_id,
2075            correlation_id,
2076            event_type,
2077            payload,
2078            retry_count,
2079            last_error,
2080            first_failed_at,
2081            last_failed_at,
2082        ],
2083    )
2084    .context("subscription_dlq insert")?;
2085    Ok(())
2086}
2087
2088/// v0.7.0 K6 — list `subscription_dlq` rows. Used by the K7
2089/// inspector tool (not registered in MCP yet) and by the K6
2090/// integration test suite.
2091pub fn list_dlq(conn: &Connection, subscription_id: Option<&str>) -> Result<Vec<DlqEntry>> {
2092    let mut out = Vec::new();
2093    if let Some(sub_id) = subscription_id {
2094        let mut stmt = conn.prepare(
2095            "SELECT id, subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at \
2096             FROM subscription_dlq WHERE subscription_id = ?1 ORDER BY id ASC",
2097        )?;
2098        let rows = stmt.query_map(params![sub_id], dlq_row_to_entry)?;
2099        for r in rows {
2100            out.push(r?);
2101        }
2102    } else {
2103        let mut stmt = conn.prepare(
2104            "SELECT id, subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at \
2105             FROM subscription_dlq ORDER BY id ASC",
2106        )?;
2107        let rows = stmt.query_map([], dlq_row_to_entry)?;
2108        for r in rows {
2109            out.push(r?);
2110        }
2111    }
2112    Ok(out)
2113}
2114
2115fn dlq_row_to_entry(row: &rusqlite::Row) -> rusqlite::Result<DlqEntry> {
2116    Ok(DlqEntry {
2117        id: row.get(0)?,
2118        subscription_id: row.get(1)?,
2119        correlation_id: row.get(2)?,
2120        event_type: row.get(3)?,
2121        payload: row.get(4)?,
2122        retry_count: row.get(5)?,
2123        last_error: row.get(6)?,
2124        first_failed_at: row.get(7)?,
2125        last_failed_at: row.get(8)?,
2126    })
2127}
2128
2129/// v0.7.0 K6 — replay subscription events for a single subscription
2130/// since `since_rfc3339`. Returns the audit rows ordered by
2131/// `delivered_at` ascending (so cursor-by-time scans are stable).
2132///
2133/// **MCP wiring (v0.7 K7, landed):** the companion
2134/// `memory_subscription_replay` MCP tool IS registered in the
2135/// dispatch table — see `MEMORY_SUBSCRIPTION_REPLAY` in
2136/// `src/mcp/registry.rs` (entry in `tool_names::ALL`) and the
2137/// `handle_subscription_replay` handler in
2138/// `src/mcp/tools/subscribe.rs`. The "K6 deferred to K7" gating
2139/// noted in the original comment is closed; this docstring was
2140/// stale per the v0.7.0 multi-agent literal-sweep (scanner B
2141/// finding F-B6.x).
2142pub fn replay_subscription_events(
2143    conn: &Connection,
2144    subscription_id: &str,
2145    since_rfc3339: &str,
2146) -> Result<Vec<SubscriptionEvent>> {
2147    let mut stmt = conn.prepare(
2148        "SELECT id, subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status \
2149         FROM subscription_events \
2150         WHERE subscription_id = ?1 AND delivered_at >= ?2 \
2151         ORDER BY delivered_at ASC, id ASC",
2152    )?;
2153    let rows = stmt.query_map(params![subscription_id, since_rfc3339], |row| {
2154        Ok(SubscriptionEvent {
2155            id: row.get(0)?,
2156            subscription_id: row.get(1)?,
2157            correlation_id: row.get(2)?,
2158            event_type: row.get(3)?,
2159            payload: row.get(4)?,
2160            delivered_at: row.get(5)?,
2161            delivery_status: row.get(6)?,
2162        })
2163    })?;
2164    let mut out = Vec::new();
2165    for r in rows {
2166        out.push(r.context("subscription_events row decode")?);
2167    }
2168    Ok(out)
2169}
2170
2171/// v0.7.0 K6 — handler for `memory_subscription_replay`. K7 wired
2172/// this into the MCP dispatch table behind the existing
2173/// `memory_subscription_*` family — see
2174/// `src/mcp/tools/subscribe.rs::handle_subscription_replay` for the
2175/// thin MCP wrapper that delegates here. The K6-vintage "DO NOT add
2176/// to MCP dispatch table" warning in the prior docstring was stale
2177/// per the v0.7.0 multi-agent literal-sweep (scanner B finding
2178/// F-B6.x); registration has been in tree since K7.
2179pub fn memory_subscription_replay(
2180    conn: &Connection,
2181    subscription_id: &str,
2182    since_rfc3339: &str,
2183) -> Result<serde_json::Value> {
2184    let events = replay_subscription_events(conn, subscription_id, since_rfc3339)?;
2185    Ok(serde_json::json!({
2186        (field_names::SUBSCRIPTION_ID): subscription_id,
2187        "since": since_rfc3339,
2188        "count": events.len(),
2189        "events": events,
2190    }))
2191}
2192
2193fn record_dispatch(db_path: &std::path::Path, sub_id: &str, ok: bool) {
2194    let Ok(conn) = Connection::open(db_path) else {
2195        return;
2196    };
2197    record_dispatch_with_conn(&conn, sub_id, ok);
2198}
2199
2200/// v0.7.0 #1072 — `Connection`-reuse variant of [`record_dispatch`].
2201fn record_dispatch_with_conn(conn: &Connection, sub_id: &str, ok: bool) {
2202    let now = chrono::Utc::now().to_rfc3339();
2203    let sql = if ok {
2204        "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
2205    } else {
2206        "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, failure_count = failure_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
2207    };
2208    let _ = conn.execute(sql, params![now, sub_id]);
2209}
2210
2211#[cfg(test)]
2212mod tests {
2213    use super::*;
2214
    /// Serializes the two #1053 tests that depend on the
    /// process-global `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL` env var:
    /// the fail-open test sets and then removes it, while the
    /// fail-closed test expects it to be unset. Without the lock they
    /// race under the default parallel test runner: the fail-open
    /// test's `set_var` can be observed by the fail-closed test
    /// mid-flight, flipping its expected `Err` (fail-CLOSED) into an
    /// `Ok` (legacy permissive) and panicking. Callers lock it
    /// poison-tolerantly via `into_inner` so one panicking test
    /// doesn't cascade-fail the other.
    static SSRF_ENV_GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());
2224
2225    #[test]
2226    fn https_allowed() {
2227        assert!(validate_url("https://example.com/hook").is_ok());
2228        assert!(validate_url("https://api.example.com:8443/hook?x=1").is_ok());
2229    }
2230
    /// Regression for #1265 — the webhook dispatch User-Agent is
    /// meant to track `CARGO_PKG_VERSION` rather than the v0.6.0.0
    /// string the original site hardcoded.
    ///
    /// NOTE(review): `ua` and `expected` are both produced by the
    /// same `format!` expression inside this test, so the
    /// `assert_eq!` can never fail, and nothing here reads the value
    /// actually used by the dispatch site. Only the `assert_ne!` and
    /// the shape checks can trip. To catch drift, the dispatch site
    /// and this test would need to share a helper or constant.
    #[test]
    fn webhook_user_agent_tracks_cargo_pkg_version() {
        let ua = format!("ai-memory/{}", env!("CARGO_PKG_VERSION"));
        let expected = format!("ai-memory/{}", env!("CARGO_PKG_VERSION"));
        // Tautological by construction (see NOTE above).
        assert_eq!(ua, expected);
        // The legacy hardcoded value MUST NOT be the current expected
        // value — fails if the crate version is ever literally 0.6.0.0.
        assert_ne!(ua, "ai-memory/0.6.0.0");
        // Shape sanity: prefix present and followed by a non-empty
        // version component (`ai-memory/<version>`).
        assert!(ua.starts_with("ai-memory/"));
        assert!(ua.len() > "ai-memory/".len());
    }
2247
2248    #[test]
2249    fn http_only_to_loopback() {
2250        // H11 inner helper — assert with allow_loopback=true so the
2251        // test does not depend on the test-build default and does not
2252        // race with parallel tests poking the global atomic.
2253        assert!(validate_url_with("http://localhost/hook", true).is_ok());
2254        assert!(validate_url_with("http://127.0.0.1:8080/hook", true).is_ok());
2255        // IPv6 in URLs must be bracketed per RFC 3986 §3.2.2.
2256        assert!(validate_url_with("http://[::1]/hook", true).is_ok());
2257        assert!(validate_url_with("http://example.com/hook", true).is_err());
2258        assert!(validate_url_with("http://8.8.8.8/hook", true).is_err());
2259    }
2260
2261    #[test]
2262    fn loopback_rejected_by_default_h11() {
2263        // H11 (#628 blocker) — loopback URLs are rejected without an
2264        // explicit opt-in. This closes an authenticated SSRF gadget
2265        // against local services (Postgres on 5432, hooks daemon, …).
2266        // Uses the inner helper so the assertion does not race with
2267        // parallel tests that touch `crate::config` or use real
2268        // loopback URLs through `validate_url`.
2269        for url in [
2270            "http://127.0.0.1:5432/hook",
2271            "http://localhost/hook",
2272            "http://[::1]/hook",
2273            "https://127.0.0.1/hook",
2274            "https://localhost/hook",
2275        ] {
2276            let res = validate_url_with(url, false);
2277            assert!(
2278                res.is_err(),
2279                "loopback URL {url} must be rejected when allow_loopback=false (H11), got {res:?}"
2280            );
2281            let msg = res.unwrap_err().to_string();
2282            assert!(
2283                msg.contains("loopback") || msg.contains("SSRF"),
2284                "rejection message should explain loopback policy, got: {msg}"
2285            );
2286        }
2287    }
2288
2289    #[test]
2290    fn loopback_accepted_when_opted_in_h11() {
2291        // H11 — operators who need loopback for CI/testing opt in via
2292        // `[subscriptions] allow_loopback_webhooks = true`. Inner
2293        // helper isolates this test from the global atomic.
2294        assert!(validate_url_with("http://127.0.0.1:9999/hook", true).is_ok());
2295        assert!(validate_url_with("http://localhost/hook", true).is_ok());
2296        assert!(validate_url_with("http://[::1]/hook", true).is_ok());
2297    }
2298
2299    #[test]
2300    fn private_ranges_blocked() {
2301        assert!(validate_url("https://10.0.0.1/hook").is_err());
2302        assert!(validate_url("https://192.168.1.1/hook").is_err());
2303        assert!(validate_url("https://172.16.0.1/hook").is_err());
2304        assert!(validate_url("https://169.254.1.1/hook").is_err());
2305        assert!(validate_url("https://[fc00::1]/hook").is_err());
2306        assert!(validate_url("https://[fe80::1]/hook").is_err());
2307    }
2308
2309    #[test]
2310    fn nonsense_rejected() {
2311        assert!(validate_url("ftp://example.com").is_err());
2312        assert!(validate_url("notaurl").is_err());
2313        assert!(validate_url("").is_err());
2314    }
2315
2316    #[test]
2317    fn rejects_v4_mapped_ipv6_loopback() {
2318        // SSRF (#628 H3 follow-up): `Ipv6Addr::is_loopback()` returns false
2319        // for `::ffff:127.0.0.1`, but on dual-stack hosts the kernel routes
2320        // these to the v4 loopback service. `normalize_ip` collapses to v4
2321        // before checking.
2322        // Use `validate_url_with(.., false)` (loopback-disabled) to
2323        // avoid racing with parallel tests that flip the process-wide
2324        // allow_loopback flag (matches the `loopback_blocked_by_default`
2325        // test pattern below).
2326        assert!(validate_url_with("https://[::ffff:127.0.0.1]/hook", false).is_err());
2327        assert!(validate_url_with("https://[::ffff:7f00:1]/hook", false).is_err());
2328    }
2329
2330    #[test]
2331    fn rejects_v4_mapped_ipv6_private() {
2332        assert!(validate_url_with("https://[::ffff:10.0.0.1]/hook", false).is_err());
2333        assert!(validate_url_with("https://[::ffff:192.168.1.1]/hook", false).is_err());
2334        assert!(validate_url_with("https://[::ffff:172.16.0.1]/hook", false).is_err());
2335        assert!(validate_url_with("https://[::ffff:169.254.1.1]/hook", false).is_err());
2336        assert!(validate_url_with("https://[::ffff:0.0.0.0]/hook", false).is_err());
2337    }
2338
2339    #[test]
2340    fn rejects_nat64_well_known_prefix() {
2341        // 64:ff9b::/96 — RFC 6052 well-known NAT64 prefix. On hosts with
2342        // NAT64 deployed, packets to `64:ff9b::a.b.c.d` are translated to
2343        // `a.b.c.d` and forwarded; an SSRF gadget if `a.b.c.d` is loopback
2344        // or private.
2345        assert!(validate_url_with("https://[64:ff9b::127.0.0.1]/hook", false).is_err());
2346        assert!(validate_url_with("https://[64:ff9b::10.0.0.1]/hook", false).is_err());
2347        assert!(validate_url_with("https://[64:ff9b::169.254.1.1]/hook", false).is_err());
2348    }
2349
2350    #[test]
2351    fn allows_v4_mapped_loopback_when_opted_in() {
2352        // Symmetric with the existing loopback opt-in: when allow_loopback
2353        // is true, `::ffff:127.0.0.1` should be accepted (same as plain
2354        // `127.0.0.1`).
2355        assert!(validate_url_with("http://[::ffff:127.0.0.1]/hook", true).is_ok());
2356    }
2357
2358    #[test]
2359    fn hmac_sha256_stable() {
2360        // Known vector: HMAC-SHA256("key", "The quick brown fox jumps over the lazy dog")
2361        // = f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8
2362        let key = hex::encode_fallback("key".as_bytes());
2363        let got = hmac_sha256_hex(&key, "The quick brown fox jumps over the lazy dog");
2364        assert_eq!(
2365            got,
2366            "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8"
2367        );
2368    }
2369
2370    #[test]
2371    fn filter_wildcards() {
2372        assert!(matches_filters(
2373            "*",
2374            None,
2375            None,
2376            None,
2377            "memory_store",
2378            "ns",
2379            None
2380        ));
2381        assert!(matches_filters(
2382            "memory_store,memory_delete",
2383            None,
2384            None,
2385            None,
2386            "memory_store",
2387            "ns",
2388            None
2389        ));
2390        assert!(!matches_filters(
2391            "memory_delete",
2392            None,
2393            None,
2394            None,
2395            "memory_store",
2396            "ns",
2397            None
2398        ));
2399        assert!(matches_filters(
2400            "*",
2401            None,
2402            Some("foo"),
2403            None,
2404            "memory_store",
2405            "foo",
2406            None
2407        ));
2408        assert!(!matches_filters(
2409            "*",
2410            None,
2411            Some("foo"),
2412            None,
2413            "memory_store",
2414            "bar",
2415            None
2416        ));
2417        assert!(matches_filters(
2418            "*",
2419            None,
2420            None,
2421            Some("alice"),
2422            "memory_store",
2423            "ns",
2424            Some("alice")
2425        ));
2426        assert!(!matches_filters(
2427            "*",
2428            None,
2429            None,
2430            Some("alice"),
2431            "memory_store",
2432            "ns",
2433            Some("bob")
2434        ));
2435    }
2436
2437    #[test]
2438    fn filter_event_types_overrides_legacy_events() {
2439        // P5 (G9): when the structured `event_types` opt-in is Some,
2440        // the legacy `events` whitelist is ignored.
2441        let opt_in_store_only: Vec<String> = vec!["memory_store".to_string()];
2442        // Legacy says "all events", structured says "store only" — store
2443        // matches, delete does not.
2444        assert!(matches_filters(
2445            "*",
2446            Some(&opt_in_store_only),
2447            None,
2448            None,
2449            "memory_store",
2450            "ns",
2451            None
2452        ));
2453        assert!(!matches_filters(
2454            "*",
2455            Some(&opt_in_store_only),
2456            None,
2457            None,
2458            "memory_delete",
2459            "ns",
2460            None
2461        ));
2462        // Structured opt-in with multiple types matches each.
2463        let multi: Vec<String> = vec![
2464            "memory_promote".to_string(),
2465            "memory_link_created".to_string(),
2466        ];
2467        assert!(matches_filters(
2468            "memory_store",
2469            Some(&multi),
2470            None,
2471            None,
2472            "memory_promote",
2473            "ns",
2474            None
2475        ));
2476        assert!(!matches_filters(
2477            "memory_store",
2478            Some(&multi),
2479            None,
2480            None,
2481            "memory_store",
2482            "ns",
2483            None
2484        ));
2485        // Empty structured list = no events match (defensive).
2486        let empty: Vec<String> = vec![];
2487        assert!(!matches_filters(
2488            "*",
2489            Some(&empty),
2490            None,
2491            None,
2492            "memory_store",
2493            "ns",
2494            None
2495        ));
2496    }
2497
    // ----------------------------------------------------------------
    // Wave 10 (L10b) — SSRF coverage for `validate_url_dns`.
    //
    // `validate_url_dns` is the DNS-resolving SSRF guard. It performs
    // `to_socket_addrs()` and inspects the resolved IPs.  Since H11,
    // loopback targets are rejected unless the operator opts in, so
    // the loopback tests below go through the inner helper
    // `validate_url_dns_with(url, allow_loopback)` and pin the flag
    // explicitly: `true` for the dev/CI acceptance cases, `false`
    // for the reject-by-default case.  (An earlier revision of this
    // note described loopback as always allowed; the H11 tests below
    // supersede that.)
    //
    // Tests below are split into:
    //   - loopback acceptance when opted in, and rejection by
    //     default
    //   - cases that are always rejected (link-local v6, AWS
    //     metadata IP, RFC1918 ranges, unspecified addresses)
    //   - public-IP / hostname acceptance and DNS-failure handling
    //
    // The function signature is `validate_url_dns(&str) -> Result<()>`.
    // ----------------------------------------------------------------
2519
2520    #[test]
2521    fn test_validate_url_dns_accepts_loopback_v4() {
2522        // H11 inner helper — assert with allow_loopback=true so the
2523        // test does not race with parallel tests poking the global
2524        // atomic. Dev/CI workflows opt in via config to get this
2525        // behaviour at runtime.
2526        assert!(
2527            validate_url_dns_with("http://127.0.0.1/foo", true).is_ok(),
2528            "127.0.0.1 should be accepted by validate_url_dns when opted in"
2529        );
2530        assert!(
2531            validate_url_dns_with("http://127.0.0.1:8080/", true).is_ok(),
2532            "127.0.0.1:8080 should be accepted by validate_url_dns when opted in"
2533        );
2534        assert!(
2535            validate_url_dns_with("http://localhost/", true).is_ok(),
2536            "localhost should be accepted by validate_url_dns when opted in"
2537        );
2538    }
2539
2540    #[test]
2541    fn test_validate_url_dns_accepts_loopback_v6() {
2542        // Same as v4 — loopback opt-in via inner helper.
2543        assert!(
2544            validate_url_dns_with("http://[::1]/", true).is_ok(),
2545            "[::1] should be accepted by validate_url_dns when opted in"
2546        );
2547        assert!(
2548            validate_url_dns_with("http://[0:0:0:0:0:0:0:1]/", true).is_ok(),
2549            "[::1] expanded form should be accepted when opted in"
2550        );
2551    }
2552
2553    #[test]
2554    fn test_validate_url_dns_rejects_loopback_by_default_h11() {
2555        // H11 — loopback DNS-resolves are rejected by default to
2556        // close DNS-rebind SSRF against local services. Inner helper
2557        // pins allow_loopback=false without touching the global.
2558        assert!(
2559            validate_url_dns_with("http://127.0.0.1/foo", false).is_err(),
2560            "127.0.0.1 must be rejected by validate_url_dns when allow_loopback=false (H11)"
2561        );
2562        assert!(
2563            validate_url_dns_with("http://[::1]/", false).is_err(),
2564            "[::1] must be rejected by validate_url_dns when allow_loopback=false (H11)"
2565        );
2566    }
2567
2568    #[test]
2569    fn test_validate_url_dns_rejects_link_local_ipv6() {
2570        // fe80::/10 is link-local. is_private() flags this and the IP
2571        // is not loopback, so validate_url_dns rejects.
2572        // SSRF fix (W11): bracketed IPv6 hosts without an explicit port
2573        // now get ":80" appended before to_socket_addrs(), so resolution
2574        // succeeds and the IP check fires.
2575        let res = validate_url_dns("http://[fe80::1]/");
2576        assert!(
2577            res.is_err(),
2578            "fe80::1 must be rejected as link-local IPv6, got {res:?}"
2579        );
2580    }
2581
2582    #[test]
2583    fn test_validate_url_dns_rejects_aws_metadata() {
2584        // 169.254.169.254 is the AWS / GCP / Azure instance metadata
2585        // service. RFC3927 link-local; `Ipv4Addr::is_link_local` covers
2586        // 169.254.0.0/16, so validate_url_dns must reject.
2587        let res = validate_url_dns("http://169.254.169.254/latest/meta-data/");
2588        assert!(
2589            res.is_err(),
2590            "AWS metadata IP must be rejected, got {res:?}"
2591        );
2592    }
2593
2594    #[test]
2595    fn test_validate_url_dns_rejects_rfc1918_private_ranges() {
2596        // 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 are RFC1918.
2597        // `Ipv4Addr::is_private` flags all three; validate_url_dns must
2598        // reject every variant.
2599        for url in [
2600            "http://10.0.0.1/",
2601            "http://172.16.0.1/",
2602            "http://172.31.255.255/",
2603            "http://192.168.1.1/",
2604        ] {
2605            let res = validate_url_dns(url);
2606            assert!(
2607                res.is_err(),
2608                "{url} must be rejected as RFC1918, got {res:?}"
2609            );
2610        }
2611    }
2612
2613    #[test]
2614    fn test_validate_url_dns_accepts_public_ip_or_dns() {
2615        // 1.1.1.1 is Cloudflare's public resolver — never private. We
2616        // intentionally exercise the IP-literal path (no DNS) so the
2617        // test is hermetic and does not rely on network resolution for
2618        // example.com.
2619        assert!(
2620            validate_url_dns("https://1.1.1.1/").is_ok(),
2621            "public IP literal must be accepted"
2622        );
2623        // v0.7.0 #1053 — `example.com` either resolves to a public
2624        // IP (accepted) OR fails to resolve in a hermetic sandbox
2625        // (post-#1053: rejected as fail-closed; pre-#1053: accepted
2626        // with let-reqwest-surface-it). Both outcomes are
2627        // legitimate behaviours of the SSRF guard, so accept either
2628        // status: the test pins that an IP literal is always
2629        // accepted, and the public-hostname legitimacy is exercised
2630        // by the new fail-closed regression test
2631        // `test_validate_url_dns_fails_closed_on_dns_failure_1053`
2632        // below.
2633        let _ = validate_url_dns("https://example.com/");
2634    }
2635
2636    #[test]
2637    fn test_validate_url_dns_fails_closed_on_dns_failure_1053() {
2638        // v0.7.0 #1053 (Agent-2 #3) — DNS resolution failure now
2639        // returns Err so an attacker cannot smuggle a private-range
2640        // IP through a DNS-rebind path where the daemon's resolver
2641        // hiccups and reqwest's later resolution lands on an
2642        // internal target.
2643        //
2644        // FX-F1 (2026-05-27) — hermetic hostname construction.
2645        // The pre-FX-F1 test used `"https://nonexistent-host.invalid./"`
2646        // and relied on RFC 6761 guaranteeing NXDOMAIN for the `.invalid`
2647        // TLD. In practice three platforms violate that guarantee:
2648        //   - Windows' built-in resolver consults NetBIOS / WINS /
2649        //     DNS search-suffix lists and synthesizes a "hit" for
2650        //     bare `.invalid` labels.
2651        //   - macOS' mDNSResponder + captive-portal DNS in some
2652        //     CI runner network configs (observed on
2653        //     `release/v0.7.0` SHA 5bfbb109c → job 78098087774)
2654        //     also synthesizes a "hit" so the test panicked with
2655        //     `got Ok(())` instead of the expected `Err`.
2656        //   - Some corporate DNS resolvers wildcard-redirect any
2657        //     unresolvable name to a "did-you-mean" landing page.
2658        //
2659        // To make the test hermetic across every platform we
2660        // construct a hostname that the SSRF guard's RFC 1035 shape
2661        // check rejects at the SHAPE level rather than at the
2662        // DNS-lookup level: each DNS label has a hard 63-octet ceiling
2663        // per RFC 1035 §2.3.4, so a single 70-character label is
2664        // rejected in-guard (FX-Fwin #1053 follow-up) regardless of
2665        // which resolver is configured — including Windows, whose
2666        // `getaddrinfo` does not enforce the ceiling at the API
2667        // boundary. No DNS query is even attempted.
2668        let _env_guard = SSRF_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());
2669        let oversized_label = "a".repeat(70);
2670        let url = format!("https://{oversized_label}.fxf1-test./");
2671        let res = validate_url_dns(&url);
2672        assert!(
2673            res.is_err(),
2674            "#1053: SSRF guard MUST fail-closed on DNS resolution failure \
2675             (oversized label rejected at getaddrinfo shape check); got {res:?}"
2676        );
2677        let err_msg = format!("{}", res.unwrap_err());
2678        assert!(
2679            err_msg.contains("failing CLOSED")
2680                && err_msg.contains("AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL"),
2681            "#1053: failure message MUST reference fail-closed posture + env-var escape hatch; got {err_msg:?}"
2682        );
2683    }
2684
    #[test]
    fn test_validate_url_dns_fail_open_env_overrides_1053() {
        // v0.7.0 #1053 — operators with flaky DNS environments can
        // opt back into the legacy permissive posture via
        // `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1`. Pin the env-var
        // contract so a future tweak to the var name fails this
        // test loudly.
        //
        // Hold SSRF_ENV_GUARD across the whole set→validate→remove
        // window so the fail-CLOSED test cannot observe the var
        // mid-flight. The lock is taken poison-tolerantly.
        let _env_guard = SSRF_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());
        // SAFETY: env mutation is process-global. Within this module
        // the only other test that depends on this variable takes
        // SSRF_ENV_GUARD first, and the variable name is specific
        // enough that unrelated tests are not expected to read it.
        // NOTE(review): the mutex does not protect against other
        // threads touching the environment through unrelated code.
        unsafe {
            std::env::set_var("AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL", "1");
        }
        // FX-F1: use the same shape-rejected hostname construction
        // as `test_validate_url_dns_fails_closed_on_dns_failure_1053`
        // for hermetic platform-independence (see that test's
        // comments for rationale).
        let oversized_label = "a".repeat(70);
        let url = format!("https://{oversized_label}.fxf1-test./");
        let res = validate_url_dns(&url);
        // SAFETY: same reasoning as the `set_var` above; the guard is
        // still held. The variable is removed BEFORE asserting so a
        // failed assertion does not leave it set for later tests.
        // NOTE(review): a panic inside `validate_url_dns` itself would
        // still skip this cleanup.
        unsafe {
            std::env::remove_var("AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL");
        }
        assert!(
            res.is_ok(),
            "#1053: AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 MUST restore the legacy permissive posture; got {res:?}"
        );
    }
2721
2722    #[test]
2723    fn test_validate_url_dns_rejects_unspecified_addresses() {
2724        // 0.0.0.0 / [::] are "unspecified" addresses. On most OSes
2725        // connecting to 0.0.0.0 routes to localhost — that is an SSRF
2726        // / loopback bypass.
2727        // SSRF fix (W11): `is_private` now flags `is_unspecified` for
2728        // both v4 and v6.
2729        let v4 = validate_url_dns("http://0.0.0.0/");
2730        let v6 = validate_url_dns("http://[::]/");
2731        assert!(
2732            v4.is_err(),
2733            "0.0.0.0 should be rejected as unspecified, got {v4:?}"
2734        );
2735        assert!(
2736            v6.is_err(),
2737            "[::] should be rejected as unspecified, got {v6:?}"
2738        );
2739    }
2740
2741    #[test]
2742    fn test_validate_url_dns_missing_scheme() {
2743        // No `://` separator → explicit Err (not panic).
2744        let res = validate_url_dns("not-a-url");
2745        assert!(res.is_err(), "missing scheme must Err, got {res:?}");
2746    }
2747
2748    // ----------------------------------------------------------------
2749    // Wave 12 (W12-C) — deep coverage on dispatch / send / persistence.
2750    //
2751    // The pre-W12 tests covered URL validation thoroughly but left the
2752    // DB-touching paths (`insert`, `delete`, `list`, `dispatch_event`,
2753    // `record_dispatch`, `load_secret_hash`) and the HTTP send path
2754    // (`send`) at 0 % coverage.  These tests use a `tempfile::NamedTempFile`
2755    // to back a real on-disk SQLite (so dispatch threads can re-open the
2756    // connection via `Connection::open(db_path)`) and `wiremock` for HTTP
2757    // (already a dev-dep from W3 / W10).
2758    //
2759    // Style:
2760    //   - DB-only tests are `#[test]` (sync) and use a tempfile path.
2761    //   - Tests that drive `wiremock` are `#[tokio::test(flavor =
2762    //     "multi_thread")]` and run the blocking `send` via
2763    //     `tokio::task::spawn_blocking`, mirroring the pattern already in
2764    //     `llm.rs::wiremock_tests`.
2765    // ----------------------------------------------------------------
2766
2767    use tempfile::NamedTempFile;
2768
2769    /// Stand up a fresh on-disk SQLite at a tempfile path with the
2770    /// production schema applied. Returns the path and keeps the file
2771    /// alive via the returned `NamedTempFile` (drop deletes it).
2772    fn fresh_db() -> (NamedTempFile, std::path::PathBuf) {
2773        let f = NamedTempFile::new().expect("tempfile");
2774        let p = f.path().to_path_buf();
2775        // Apply schema via the production opener so migrations run.
2776        let _ = crate::db::open(&p).expect("db::open");
2777        (f, p)
2778    }
2779
2780    /// v0.7.0 K6 test helper — wiremock responder that builds a 2xx
2781    /// JSON ACK body whose `correlation_id` field echoes the
2782    /// dispatched id from the `x-ai-memory-correlation-id` request
2783    /// header. Lets the legacy dispatch tests (which previously only
2784    /// asserted "2xx → success") satisfy K6's strict ACK contract
2785    /// without coupling each test to the exact UUID value.
2786    struct AckEcho;
2787    impl wiremock::Respond for AckEcho {
2788        fn respond(&self, request: &wiremock::Request) -> wiremock::ResponseTemplate {
2789            let corr = request
2790                .headers
2791                .get("x-ai-memory-correlation-id")
2792                .map(|v| v.to_str().unwrap_or("").to_string())
2793                .unwrap_or_default();
2794            let body = serde_json::json!({
2795                "status": "ack",
2796                "correlation_id": corr,
2797            });
2798            wiremock::ResponseTemplate::new(200).set_body_json(body)
2799        }
2800    }
2801
2802    // ---------------- insert / delete / list ----------------
2803
2804    #[test]
2805    fn insert_persists_and_list_returns_row() {
2806        let (_keep, path) = fresh_db();
2807        let conn = Connection::open(&path).unwrap();
2808        let id = insert(
2809            &conn,
2810            &NewSubscription {
2811                url: "https://example.com/hook",
2812                events: "memory_store",
2813                secret: Some("s3cret"),
2814                namespace_filter: Some("ns1"),
2815                agent_filter: Some("alice"),
2816                created_by: Some("op"),
2817                event_types: None,
2818            },
2819        )
2820        .unwrap();
2821        assert!(!id.is_empty());
2822
2823        let subs = list(&conn, None).unwrap();
2824        assert_eq!(subs.len(), 1);
2825        let s = &subs[0];
2826        assert_eq!(s.id, id);
2827        assert_eq!(s.url, "https://example.com/hook");
2828        assert_eq!(s.events, "memory_store");
2829        assert_eq!(s.namespace_filter.as_deref(), Some("ns1"));
2830        assert_eq!(s.agent_filter.as_deref(), Some("alice"));
2831        assert_eq!(s.created_by.as_deref(), Some("op"));
2832        assert_eq!(s.dispatch_count, 0);
2833        assert_eq!(s.failure_count, 0);
2834    }
2835
2836    #[test]
2837    fn insert_rejects_invalid_url() {
2838        let (_keep, path) = fresh_db();
2839        let conn = Connection::open(&path).unwrap();
2840        let res = insert(
2841            &conn,
2842            &NewSubscription {
2843                url: "not-a-url",
2844                events: "*",
2845                secret: None,
2846                namespace_filter: None,
2847                agent_filter: None,
2848                created_by: None,
2849                event_types: None,
2850            },
2851        );
2852        assert!(res.is_err(), "insert must reject invalid URL");
2853    }
2854
2855    #[test]
2856    fn insert_hashes_secret_before_persisting() {
2857        let (_keep, path) = fresh_db();
2858        let conn = Connection::open(&path).unwrap();
2859        let plaintext = "super-shared-secret";
2860        let id = insert(
2861            &conn,
2862            &NewSubscription {
2863                url: "https://example.com/h",
2864                events: "*",
2865                secret: Some(plaintext),
2866                namespace_filter: None,
2867                agent_filter: None,
2868                created_by: None,
2869                event_types: None,
2870            },
2871        )
2872        .unwrap();
2873        let stored: Option<String> = conn
2874            .query_row(
2875                "SELECT secret_hash FROM subscriptions WHERE id = ?1",
2876                params![id],
2877                |r| r.get(0),
2878            )
2879            .unwrap();
2880        let hash = stored.expect("secret_hash should be set");
2881        assert_ne!(hash, plaintext, "plaintext secret must not be stored");
2882        assert_eq!(hash, sha256_hex(plaintext));
2883    }
2884
2885    #[test]
2886    fn insert_no_secret_stores_null() {
2887        let (_keep, path) = fresh_db();
2888        let conn = Connection::open(&path).unwrap();
2889        let id = insert(
2890            &conn,
2891            &NewSubscription {
2892                url: "https://example.com/h",
2893                events: "*",
2894                secret: None,
2895                namespace_filter: None,
2896                agent_filter: None,
2897                created_by: None,
2898                event_types: None,
2899            },
2900        )
2901        .unwrap();
2902        let stored: Option<String> = conn
2903            .query_row(
2904                "SELECT secret_hash FROM subscriptions WHERE id = ?1",
2905                params![id],
2906                |r| r.get(0),
2907            )
2908            .unwrap();
2909        assert!(stored.is_none(), "missing secret must persist as NULL");
2910    }
2911
2912    #[test]
2913    fn delete_returns_true_when_row_removed() {
2914        let (_keep, path) = fresh_db();
2915        let conn = Connection::open(&path).unwrap();
2916        let id = insert(
2917            &conn,
2918            &NewSubscription {
2919                url: "https://example.com/h",
2920                events: "*",
2921                secret: None,
2922                namespace_filter: None,
2923                agent_filter: None,
2924                created_by: None,
2925                event_types: None,
2926            },
2927        )
2928        .unwrap();
2929        assert!(delete(&conn, &id, None).unwrap());
2930        assert!(list(&conn, None).unwrap().is_empty());
2931    }
2932
2933    #[test]
2934    fn delete_returns_false_when_row_missing() {
2935        let (_keep, path) = fresh_db();
2936        let conn = Connection::open(&path).unwrap();
2937        assert!(!delete(&conn, "nope", None).unwrap());
2938    }
2939
2940    #[test]
2941    fn list_orders_by_created_at_desc() {
2942        let (_keep, path) = fresh_db();
2943        let conn = Connection::open(&path).unwrap();
2944        // Insert three subs with sleeps so created_at is monotonically
2945        // increasing (rfc3339 to second-or-better resolution).
2946        let id1 = insert(
2947            &conn,
2948            &NewSubscription {
2949                url: "https://a.example.com/",
2950                events: "*",
2951                secret: None,
2952                namespace_filter: None,
2953                agent_filter: None,
2954                created_by: None,
2955                event_types: None,
2956            },
2957        )
2958        .unwrap();
2959        std::thread::sleep(std::time::Duration::from_millis(1100));
2960        let id2 = insert(
2961            &conn,
2962            &NewSubscription {
2963                url: "https://b.example.com/",
2964                events: "*",
2965                secret: None,
2966                namespace_filter: None,
2967                agent_filter: None,
2968                created_by: None,
2969                event_types: None,
2970            },
2971        )
2972        .unwrap();
2973        let subs = list(&conn, None).unwrap();
2974        assert_eq!(subs.len(), 2);
2975        // Most recent first.
2976        assert_eq!(subs[0].id, id2);
2977        assert_eq!(subs[1].id, id1);
2978    }
2979
2980    // ---------------- HMAC / sha256 helpers ----------------
2981
2982    #[test]
2983    fn sha256_hex_known_vector() {
2984        // SHA256("") = e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
2985        assert_eq!(
2986            sha256_hex(""),
2987            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
2988        );
2989        // SHA256("abc") = ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
2990        assert_eq!(
2991            sha256_hex("abc"),
2992            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
2993        );
2994    }
2995
2996    #[test]
2997    fn hex_decode_round_trip_and_invalid() {
2998        // Round-trip an even-length valid hex string.
2999        let s = "deadbeef";
3000        let bytes = hex_decode(s).expect("valid hex");
3001        assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]);
3002        // Odd-length must return None (invariant in the helper).
3003        assert!(hex_decode("abc").is_none());
3004        // Non-hex chars must return None.
3005        assert!(hex_decode("zz").is_none());
3006    }
3007
3008    #[test]
3009    fn validate_hmac_secret_hex_accepts_none_1048() {
3010        // v0.7.0 #1048 — no secret configured = no-op (Ok).
3011        assert!(validate_hmac_secret_hex(None).is_ok());
3012    }
3013
3014    #[test]
3015    fn validate_hmac_secret_hex_accepts_valid_hex_1048() {
3016        // v0.7.0 #1048 — even-length all-hex string passes.
3017        assert!(validate_hmac_secret_hex(Some("deadbeef")).is_ok());
3018        assert!(validate_hmac_secret_hex(Some("0123456789abcdef")).is_ok());
3019        // 64-char (32 byte) — the recommended `openssl rand -hex 32` shape.
3020        let long = "a".repeat(64);
3021        assert!(validate_hmac_secret_hex(Some(&long)).is_ok());
3022    }
3023
3024    #[test]
3025    fn validate_hmac_secret_hex_rejects_non_hex_1048() {
3026        // v0.7.0 #1048 — non-hex secret (operator typo / passphrase
3027        // instead of hex) is rejected at boot validator. The
3028        // production `main.rs` boot path treats this as fatal.
3029        let err = validate_hmac_secret_hex(Some("not-a-hex-key!!"))
3030            .expect_err("non-hex MUST fail validation");
3031        assert!(
3032            err.contains("not valid hex") && err.contains("openssl rand -hex 32"),
3033            "#1048: failure msg MUST reference invalid hex + remediation; got: {err}"
3034        );
3035    }
3036
3037    #[test]
3038    fn validate_hmac_secret_hex_rejects_odd_length_1048() {
3039        // v0.7.0 #1048 — odd-length hex is not parseable; rejected.
3040        let err = validate_hmac_secret_hex(Some("abc")).expect_err("odd-length MUST fail");
3041        assert!(err.contains("not valid hex"));
3042    }
3043
3044    #[test]
3045    fn hmac_sha256_hex_output_is_fixed_64_chars_1039() {
3046        // v0.7.0 #1039 (Agent-5 #6) — HMAC-SHA256 output is ALWAYS
3047        // 32 bytes = 64 hex chars regardless of input. The
3048        // `constant_time_eq` early-return on length mismatch in
3049        // `src/handlers/transport.rs` (used by /api/v1/approvals
3050        // signature verification) leaks the length of the EXPECTED
3051        // sig — but the expected sig is the fixed 64-char output
3052        // of this function, so the leak conveys zero attacker-
3053        // useful entropy. This test pins the fixed-length contract
3054        // so a future hash-agility refactor doesn't accidentally
3055        // produce variable-length output that re-opens the timing
3056        // oracle.
3057        for body in &["", "x", "x".repeat(1024).as_str(), "🦀"] {
3058            let sig = hmac_sha256_hex("deadbeef", body);
3059            assert_eq!(
3060                sig.len(),
3061                64,
3062                "#1039: HMAC-SHA256 hex output MUST be fixed 64 chars; got len={} for body={:?}",
3063                sig.len(),
3064                body
3065            );
3066            assert!(
3067                sig.chars().all(|c| c.is_ascii_hexdigit()),
3068                "#1039: HMAC hex output MUST be all-hex; got {sig}"
3069            );
3070        }
3071    }
3072
3073    #[test]
3074    fn hmac_long_key_is_hashed_to_fit_block() {
3075        // Construct a hex key whose decoded length exceeds the SHA-256
3076        // block size (64 bytes). The HMAC pre-step hashes overlong keys
3077        // to fit; we exercise that branch by giving it a 200-hex-char
3078        // (100-byte) key.
3079        let long_key: String = std::iter::repeat_n('a', 200).collect();
3080        let sig = hmac_sha256_hex(&long_key, "hello");
3081        assert_eq!(sig.len(), 64); // 32-byte SHA-256 in hex
3082    }
3083
3084    #[test]
3085    fn hmac_invalid_hex_key_falls_back_to_raw_bytes() {
3086        // Hex with a non-hex char must trigger the fallback branch
3087        // (use `key_hex.as_bytes()` directly). The signature must still
3088        // be a valid 64-char SHA-256 hex string.
3089        let sig = hmac_sha256_hex("not-a-hex-key!!", "hello");
3090        assert_eq!(sig.len(), 64);
3091        assert!(sig.chars().all(|c| c.is_ascii_hexdigit()));
3092    }
3093
3094    // ---------------- matches_filters edge cases ----------------
3095
3096    #[test]
3097    fn matches_filters_event_with_whitespace_and_star() {
3098        // `*` inside a comma list still matches anything.
3099        assert!(matches_filters(
3100            "memory_store, *",
3101            None,
3102            None,
3103            None,
3104            "anything",
3105            "ns",
3106            None,
3107        ));
3108        // Whitespace around tokens is trimmed.
3109        assert!(matches_filters(
3110            "  memory_delete , memory_store ",
3111            None,
3112            None,
3113            None,
3114            "memory_store",
3115            "ns",
3116            None,
3117        ));
3118    }
3119
3120    #[test]
3121    fn matches_filters_agent_filter_requires_some() {
3122        // sub_agent set, but event has no agent → reject.
3123        assert!(!matches_filters(
3124            "*",
3125            None,
3126            None,
3127            Some("alice"),
3128            "memory_store",
3129            "ns",
3130            None,
3131        ));
3132    }
3133
3134    // ---------------- record_dispatch / load_secret_hash ----------------
3135
3136    #[test]
3137    fn record_dispatch_increments_counts_on_success() {
3138        let (_keep, path) = fresh_db();
3139        let id = {
3140            let conn = Connection::open(&path).unwrap();
3141            insert(
3142                &conn,
3143                &NewSubscription {
3144                    url: "https://example.com/h",
3145                    events: "*",
3146                    secret: None,
3147                    namespace_filter: None,
3148                    agent_filter: None,
3149                    created_by: None,
3150                    event_types: None,
3151                },
3152            )
3153            .unwrap()
3154        };
3155        record_dispatch(&path, &id, true);
3156        record_dispatch(&path, &id, true);
3157        let conn = Connection::open(&path).unwrap();
3158        let (dc, fc): (i64, i64) = conn
3159            .query_row(
3160                "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
3161                params![id],
3162                |r| Ok((r.get(0)?, r.get(1)?)),
3163            )
3164            .unwrap();
3165        assert_eq!(dc, 2, "two successful dispatches must bump dispatch_count");
3166        assert_eq!(fc, 0, "successes must not bump failure_count");
3167    }
3168
3169    #[test]
3170    fn record_dispatch_increments_failure_on_err() {
3171        let (_keep, path) = fresh_db();
3172        let id = {
3173            let conn = Connection::open(&path).unwrap();
3174            insert(
3175                &conn,
3176                &NewSubscription {
3177                    url: "https://example.com/h",
3178                    events: "*",
3179                    secret: None,
3180                    namespace_filter: None,
3181                    agent_filter: None,
3182                    created_by: None,
3183                    event_types: None,
3184                },
3185            )
3186            .unwrap()
3187        };
3188        record_dispatch(&path, &id, false);
3189        let conn = Connection::open(&path).unwrap();
3190        let (dc, fc): (i64, i64) = conn
3191            .query_row(
3192                "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
3193                params![id],
3194                |r| Ok((r.get(0)?, r.get(1)?)),
3195            )
3196            .unwrap();
3197        assert_eq!(dc, 1, "failed dispatch still bumps dispatch_count");
3198        assert_eq!(fc, 1, "failure must bump failure_count");
3199    }
3200
3201    #[test]
3202    fn record_dispatch_nonexistent_id_does_not_panic() {
3203        let (_keep, path) = fresh_db();
3204        // No subscription with this id; the UPDATE simply matches zero
3205        // rows. Function must not panic and must not poison the DB.
3206        record_dispatch(&path, "no-such-id", true);
3207        record_dispatch(&path, "no-such-id", false);
3208        // Sanity: subscriptions table still queryable.
3209        let conn = Connection::open(&path).unwrap();
3210        let n: i64 = conn
3211            .query_row("SELECT COUNT(*) FROM subscriptions", [], |r| r.get(0))
3212            .unwrap();
3213        assert_eq!(n, 0);
3214    }
3215
3216    #[test]
3217    fn record_dispatch_unopenable_db_path_is_noop() {
3218        // Pointing at a directory that does not exist exercises the
3219        // `Connection::open` early-return branch (let-Err shortcut).
3220        // Must not panic.
3221        let bad = std::path::PathBuf::from("/nonexistent-dir-w12c/does-not-exist.db");
3222        record_dispatch(&bad, "x", true);
3223    }
3224
3225    #[test]
3226    fn load_secret_hash_returns_stored_hash() {
3227        let (_keep, path) = fresh_db();
3228        let id = {
3229            let conn = Connection::open(&path).unwrap();
3230            insert(
3231                &conn,
3232                &NewSubscription {
3233                    url: "https://example.com/h",
3234                    events: "*",
3235                    secret: Some("topsecret"),
3236                    namespace_filter: None,
3237                    agent_filter: None,
3238                    created_by: None,
3239                    event_types: None,
3240                },
3241            )
3242            .unwrap()
3243        };
3244        let got = load_secret_hash(&path, &id).unwrap();
3245        assert_eq!(got, Some(sha256_hex("topsecret")));
3246    }
3247
3248    #[test]
3249    fn load_secret_hash_missing_id_errs() {
3250        let (_keep, path) = fresh_db();
3251        // No row → query_row returns Err(QueryReturnedNoRows), which
3252        // is wrapped via `.context()`.
3253        let res = load_secret_hash(&path, "missing-id");
3254        assert!(res.is_err(), "missing subscription id must surface as Err");
3255    }
3256
3257    // ---------------- dispatch_event thread plumbing ----------------
3258
3259    #[test]
3260    fn dispatch_event_no_subs_is_noop() {
3261        let (_keep, path) = fresh_db();
3262        let conn = Connection::open(&path).unwrap();
3263        // Empty subscriptions table — must return without spawning
3264        // any threads or panicking.
3265        dispatch_event(&conn, "memory_store", "m1", "ns", None, &path);
3266    }
3267
3268    #[test]
3269    fn dispatch_event_filter_mismatch_skips_send() {
3270        // Subscriber registered for `memory_delete` only — a
3271        // `memory_store` event must NOT match. We don't have a way to
3272        // observe "no thread spawned" directly without polling, but the
3273        // function returning quickly without panicking exercises the
3274        // matches_filters early-return branch and the `if matching.is_empty
3275        // { return; }` short-circuit.
3276        let (_keep, path) = fresh_db();
3277        let conn = Connection::open(&path).unwrap();
3278        insert(
3279            &conn,
3280            &NewSubscription {
3281                url: "https://example.com/h",
3282                events: "memory_delete",
3283                secret: None,
3284                namespace_filter: None,
3285                agent_filter: None,
3286                created_by: None,
3287                event_types: None,
3288            },
3289        )
3290        .unwrap();
3291        dispatch_event(&conn, "memory_store", "m1", "ns", None, &path);
3292        // Counters must remain zero — no dispatch happened.
3293        let (dc, fc): (i64, i64) = conn
3294            .query_row(
3295                "SELECT dispatch_count, failure_count FROM subscriptions",
3296                [],
3297                |r| Ok((r.get(0)?, r.get(1)?)),
3298            )
3299            .unwrap();
3300        assert_eq!(dc, 0);
3301        assert_eq!(fc, 0);
3302    }
3303
3304    #[test]
3305    fn dispatch_event_namespace_filter_mismatch_skips() {
3306        let (_keep, path) = fresh_db();
3307        let conn = Connection::open(&path).unwrap();
3308        insert(
3309            &conn,
3310            &NewSubscription {
3311                url: "https://example.com/h",
3312                events: "*",
3313                secret: None,
3314                namespace_filter: Some("only-this-ns"),
3315                agent_filter: None,
3316                created_by: None,
3317                event_types: None,
3318            },
3319        )
3320        .unwrap();
3321        // Wrong namespace → no dispatch.
3322        dispatch_event(&conn, "memory_store", "m1", "other-ns", None, &path);
3323        let (dc, fc): (i64, i64) = conn
3324            .query_row(
3325                "SELECT dispatch_count, failure_count FROM subscriptions",
3326                [],
3327                |r| Ok((r.get(0)?, r.get(1)?)),
3328            )
3329            .unwrap();
3330        assert_eq!(dc, 0);
3331        assert_eq!(fc, 0);
3332    }
3333
3334    // ---------------- send() — wiremock-driven HTTP tests ----------------
3335
3336    #[tokio::test(flavor = "multi_thread")]
3337    async fn send_returns_true_on_2xx() {
3338        use wiremock::matchers::{method, path};
3339        use wiremock::{Mock, MockServer};
3340        let server = MockServer::start().await;
3341        // K6: receivers MUST return a JSON ack body — the AckEcho
3342        // helper echoes the request's correlation_id header so the
3343        // ack-correlation-id check in `send` passes.
3344        Mock::given(method("POST"))
3345            .and(path("/hook"))
3346            .respond_with(AckEcho)
3347            .expect(1)
3348            .mount(&server)
3349            .await;
3350        let url = format!("{}/hook", server.uri());
3351        let corr = uuid::Uuid::now_v7().to_string();
3352        let res = tokio::task::spawn_blocking(move || {
3353            send(
3354                &url,
3355                "{\"event\":\"x\"}",
3356                "1700000000",
3357                Some("deadbeef"),
3358                &corr,
3359            )
3360        })
3361        .await
3362        .unwrap();
3363        assert!(res.is_ok(), "2xx + matching ack must succeed: {res:?}");
3364    }
3365
3366    #[tokio::test(flavor = "multi_thread")]
3367    async fn send_returns_false_on_5xx() {
3368        use wiremock::matchers::{method, path};
3369        use wiremock::{Mock, MockServer, ResponseTemplate};
3370        let server = MockServer::start().await;
3371        Mock::given(method("POST"))
3372            .and(path("/hook"))
3373            .respond_with(ResponseTemplate::new(500))
3374            .mount(&server)
3375            .await;
3376        let url = format!("{}/hook", server.uri());
3377        let corr = uuid::Uuid::now_v7().to_string();
3378        let res = tokio::task::spawn_blocking(move || {
3379            send(&url, "{\"event\":\"x\"}", "1700000000", None, &corr)
3380        })
3381        .await
3382        .unwrap();
3383        assert!(res.is_err(), "5xx must return Err (no retry inside send)");
3384    }
3385
3386    #[tokio::test(flavor = "multi_thread")]
3387    async fn send_returns_false_on_4xx() {
3388        use wiremock::matchers::{method, path};
3389        use wiremock::{Mock, MockServer, ResponseTemplate};
3390        let server = MockServer::start().await;
3391        Mock::given(method("POST"))
3392            .and(path("/hook"))
3393            .respond_with(ResponseTemplate::new(404))
3394            .mount(&server)
3395            .await;
3396        let url = format!("{}/hook", server.uri());
3397        let corr = uuid::Uuid::now_v7().to_string();
3398        let res = tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", None, &corr))
3399            .await
3400            .unwrap();
3401        assert!(res.is_err(), "4xx must return Err");
3402    }
3403
3404    #[tokio::test(flavor = "multi_thread")]
3405    async fn send_does_not_follow_redirect_ssrf_pin_bypass() {
3406        // SR-W3 (HIGH) regression. The webhook client pins reqwest's
3407        // DNS to the SSRF-validated address set of the *original* host.
3408        // A malicious endpoint could return a 3xx whose Location points
3409        // at an internal address the guard never cleared; reqwest's
3410        // default redirect policy would follow it and re-resolve DNS for
3411        // the new host, bypassing the pin. With redirects disabled the
3412        // 3xx is surfaced as a non-success status and the dispatch fails
3413        // safely. We assert BOTH that send returns Err AND that the
3414        // redirect target was never requested.
3415        use wiremock::matchers::{method, path};
3416        use wiremock::{Mock, MockServer, ResponseTemplate};
3417        let server = MockServer::start().await;
3418        // The hook returns a redirect to a path that, if followed, the
3419        // mock would happily answer 200 — proving the failure is the
3420        // un-followed redirect, not a missing target.
3421        Mock::given(method("POST"))
3422            .and(path("/hook"))
3423            .respond_with(
3424                ResponseTemplate::new(302).insert_header("location", "/internal-rebind-target"),
3425            )
3426            .expect(1)
3427            .mount(&server)
3428            .await;
3429        let redirect_followed = Mock::given(path("/internal-rebind-target"))
3430            .respond_with(ResponseTemplate::new(200))
3431            .expect(0)
3432            .named("redirect target must NOT be requested");
3433        server.register(redirect_followed).await;
3434
3435        let url = format!("{}/hook", server.uri());
3436        let corr = uuid::Uuid::now_v7().to_string();
3437        let res = tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", None, &corr))
3438            .await
3439            .unwrap();
3440        assert!(
3441            res.is_err(),
3442            "redirect must not be followed; 3xx surfaces as a failed dispatch: {res:?}"
3443        );
3444        // The `.expect(0)` on the redirect-target mock is verified on
3445        // server drop; assert it explicitly here for a clear failure
3446        // message if reqwest ever regains redirect-following behavior.
3447        let hits = server
3448            .received_requests()
3449            .await
3450            .unwrap_or_default()
3451            .into_iter()
3452            .filter(|r| r.url.path() == "/internal-rebind-target")
3453            .count();
3454        assert_eq!(hits, 0, "redirect target was requested — SSRF pin bypassed");
3455    }
3456
3457    #[tokio::test(flavor = "multi_thread")]
3458    async fn send_signature_header_set_when_provided() {
3459        use wiremock::matchers::{header, header_exists, method, path};
3460        use wiremock::{Mock, MockServer};
3461        let server = MockServer::start().await;
3462        // Assert the `x-ai-memory-signature` header is `sha256=<sig>`
3463        // and the timestamp + correlation-id headers are set.
3464        Mock::given(method("POST"))
3465            .and(path("/hook"))
3466            .and(header("x-ai-memory-signature", "sha256=abc123"))
3467            .and(header_exists("x-ai-memory-timestamp"))
3468            .and(header_exists("x-ai-memory-correlation-id"))
3469            .and(header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON))
3470            .respond_with(AckEcho)
3471            .expect(1)
3472            .mount(&server)
3473            .await;
3474        let url = format!("{}/hook", server.uri());
3475        let corr = uuid::Uuid::now_v7().to_string();
3476        let res = tokio::task::spawn_blocking(move || {
3477            send(&url, "{}", "1700000000", Some("abc123"), &corr)
3478        })
3479        .await
3480        .unwrap();
3481        assert!(
3482            res.is_ok(),
3483            "2xx with matched signature header + ack must succeed: {res:?}"
3484        );
3485    }
3486
3487    #[tokio::test(flavor = "multi_thread")]
3488    async fn send_no_signature_header_when_secret_absent() {
3489        use wiremock::matchers::{method, path};
3490        use wiremock::{Mock, MockServer, Request};
3491        let server = MockServer::start().await;
3492        Mock::given(method("POST"))
3493            .and(path("/hook"))
3494            .respond_with(AckEcho)
3495            .mount(&server)
3496            .await;
3497        let url = format!("{}/hook", server.uri());
3498        let corr = uuid::Uuid::now_v7().to_string();
3499        let res = tokio::task::spawn_blocking({
3500            let url = url.clone();
3501            let corr = corr.clone();
3502            move || send(&url, "{}", "1700000000", None, &corr)
3503        })
3504        .await
3505        .unwrap();
3506        assert!(res.is_ok(), "ack-echo must succeed: {res:?}");
3507        // Inspect the captured request to confirm no signature header.
3508        let received: Vec<Request> = server.received_requests().await.unwrap_or_default();
3509        assert_eq!(received.len(), 1);
3510        let req = &received[0];
3511        // wiremock lower-cases header names.
3512        assert!(
3513            req.headers.get("x-ai-memory-signature").is_none(),
3514            "no signature should be sent when secret absent"
3515        );
3516        assert!(
3517            req.headers.get("x-ai-memory-timestamp").is_some(),
3518            "timestamp header must always be set"
3519        );
3520    }
3521
3522    #[test]
3523    fn send_rejects_ssrf_url_without_network() {
3524        // `send` is the public dispatch path. A private-network URL must
3525        // be rejected by the `validate_url` guard before any HTTP attempt.
3526        // We don't need a server — the guard fails fast and returns Err.
3527        let res = send(
3528            "https://10.0.0.1/hook",
3529            "{}",
3530            "1700000000",
3531            None,
3532            "some-corr",
3533        );
3534        assert!(
3535            res.is_err(),
3536            "send must reject SSRF URL via validate_url guard"
3537        );
3538    }
3539
3540    #[test]
3541    fn send_rejects_invalid_scheme_without_network() {
3542        // ftp:// is rejected by validate_url; send returns Err.
3543        let res = send("ftp://example.com/hook", "{}", "1700000000", None, "x");
3544        assert!(res.is_err(), "send must reject non-http(s) URL");
3545    }
3546
3547    // ---------------- end-to-end dispatch_event with HTTP mock ----------------
3548
3549    #[tokio::test(flavor = "multi_thread")]
3550    async fn dispatch_event_e2e_increments_dispatch_count_on_2xx() {
3551        use wiremock::matchers::{method, path};
3552        use wiremock::{Mock, MockServer};
3553        let server = MockServer::start().await;
3554        Mock::given(method("POST"))
3555            .and(path("/hook"))
3556            .respond_with(AckEcho)
3557            .mount(&server)
3558            .await;
3559
3560        let (_keep, db_path) = fresh_db();
3561        // Insert a wildcard subscription pointing at the mock.
3562        let id = {
3563            let conn = Connection::open(&db_path).unwrap();
3564            let url = format!("{}/hook", server.uri());
3565            insert(
3566                &conn,
3567                &NewSubscription {
3568                    url: &url,
3569                    events: "*",
3570                    secret: Some("mysecret"),
3571                    namespace_filter: None,
3572                    agent_filter: None,
3573                    created_by: None,
3574                    event_types: None,
3575                },
3576            )
3577            .unwrap()
3578        };
3579
3580        // Run dispatch and wait for the spawned thread to record the
3581        // counter bump. dispatch_event spawns a detached std::thread so
3582        // we poll for up to ~5 s.
3583        {
3584            let conn = Connection::open(&db_path).unwrap();
3585            dispatch_event(&conn, "memory_store", "m1", "ns", None, &db_path);
3586        }
3587
3588        let path_for_poll = db_path.clone();
3589        let id_for_poll = id.clone();
3590        let dc = tokio::task::spawn_blocking(move || {
3591            for _ in 0..50 {
3592                let conn = Connection::open(&path_for_poll).unwrap();
3593                let dc: i64 = conn
3594                    .query_row(
3595                        "SELECT dispatch_count FROM subscriptions WHERE id = ?1",
3596                        params![id_for_poll],
3597                        |r| r.get(0),
3598                    )
3599                    .unwrap();
3600                if dc > 0 {
3601                    return dc;
3602                }
3603                std::thread::sleep(std::time::Duration::from_millis(100));
3604            }
3605            0
3606        })
3607        .await
3608        .unwrap();
3609        assert_eq!(dc, 1, "successful dispatch must increment dispatch_count");
3610    }
3611
3612    #[tokio::test(flavor = "multi_thread")]
3613    async fn dispatch_event_e2e_increments_failure_count_on_5xx() {
3614        use wiremock::matchers::{method, path};
3615        use wiremock::{Mock, MockServer, ResponseTemplate};
3616        let server = MockServer::start().await;
3617        Mock::given(method("POST"))
3618            .and(path("/hook"))
3619            .respond_with(ResponseTemplate::new(500))
3620            .mount(&server)
3621            .await;
3622
3623        let (_keep, db_path) = fresh_db();
3624        let id = {
3625            let conn = Connection::open(&db_path).unwrap();
3626            let url = format!("{}/hook", server.uri());
3627            insert(
3628                &conn,
3629                &NewSubscription {
3630                    url: &url,
3631                    events: "*",
3632                    secret: None,
3633                    namespace_filter: None,
3634                    agent_filter: None,
3635                    created_by: None,
3636                    event_types: None,
3637                },
3638            )
3639            .unwrap()
3640        };
3641
3642        {
3643            let conn = Connection::open(&db_path).unwrap();
3644            dispatch_event(&conn, "memory_store", "m2", "ns", None, &db_path);
3645        }
3646
3647        // K6 retry ladder (200ms + 1s + 5s) means a final-failure
3648        // counter bump can take ≈ 6.5s of wall-clock + per-attempt
3649        // overhead. Poll for up to 12s to cover the worst case.
3650        let path_for_poll = db_path.clone();
3651        let id_for_poll = id.clone();
3652        let (dc, fc) = tokio::task::spawn_blocking(move || {
3653            for _ in 0..120 {
3654                let conn = Connection::open(&path_for_poll).unwrap();
3655                let row: (i64, i64) = conn
3656                    .query_row(
3657                        "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
3658                        params![id_for_poll],
3659                        |r| Ok((r.get(0)?, r.get(1)?)),
3660                    )
3661                    .unwrap();
3662                if row.0 > 0 {
3663                    return row;
3664                }
3665                std::thread::sleep(std::time::Duration::from_millis(100));
3666            }
3667            (0, 0)
3668        })
3669        .await
3670        .unwrap();
3671        assert_eq!(dc, 1, "5xx still increments dispatch_count");
3672        assert_eq!(fc, 1, "5xx must increment failure_count");
3673    }
3674
3675    #[tokio::test(flavor = "multi_thread")]
3676    async fn dispatch_event_e2e_signature_present_when_secret_set() {
3677        use wiremock::matchers::{header_exists, method, path};
3678        use wiremock::{Mock, MockServer};
3679        let server = MockServer::start().await;
3680        Mock::given(method("POST"))
3681            .and(path("/hook"))
3682            .and(header_exists("x-ai-memory-signature"))
3683            .and(header_exists("x-ai-memory-timestamp"))
3684            .respond_with(AckEcho)
3685            .expect(1)
3686            .mount(&server)
3687            .await;
3688
3689        let (_keep, db_path) = fresh_db();
3690        let _id = {
3691            let conn = Connection::open(&db_path).unwrap();
3692            let url = format!("{}/hook", server.uri());
3693            insert(
3694                &conn,
3695                &NewSubscription {
3696                    url: &url,
3697                    events: "*",
3698                    secret: Some("the-secret"),
3699                    namespace_filter: None,
3700                    agent_filter: None,
3701                    created_by: None,
3702                    event_types: None,
3703                },
3704            )
3705            .unwrap()
3706        };
3707
3708        {
3709            let conn = Connection::open(&db_path).unwrap();
3710            dispatch_event(&conn, "memory_store", "m3", "ns", None, &db_path);
3711        }
3712
3713        // Wait for the dispatch thread to fire & wiremock to record.
3714        // We poll the mock's hit count instead of the DB so the
3715        // assertion stays specific to "signature header present".
3716        let server_ref = &server;
3717        for _ in 0..50 {
3718            let received = server_ref.received_requests().await.unwrap_or_default();
3719            if !received.is_empty() {
3720                let req = &received[0];
3721                assert!(
3722                    req.headers.get("x-ai-memory-signature").is_some(),
3723                    "signature header must be present when secret set"
3724                );
3725                return;
3726            }
3727            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3728        }
3729        panic!("dispatch thread never reached the mock server");
3730    }
3731
3732    // ----------------------------------------------------------------
3733    // v0.7.0 K4 — approval-event routing through subscriptions.
3734    //
3735    // Closes the v0.6.3.1 honest-Capabilities-v2 disclosure that the
3736    // approval surface was advertised but unwired. The four tests below
3737    // pin:
3738    //   1. canonical event constant + capabilities parity
3739    //   2. opt-in subscriber receives the event end-to-end (HTTP mock)
3740    //   3. filter-mismatched subscriber does NOT receive the event
3741    //   4. missing pending row is logged + best-effort no-op
3742    // ----------------------------------------------------------------
3743
3744    #[test]
3745    fn approval_requested_event_in_canonical_list() {
3746        // K4: the new lifecycle event must surface in the canonical
3747        // constant — that's the integration contract the K10 Approval
3748        // API and external SDK consumers pin against.
3749        assert!(
3750            WEBHOOK_EVENT_TYPES.contains(&"approval_requested"),
3751            "K4: WEBHOOK_EVENT_TYPES must include approval_requested"
3752        );
3753    }
3754
3755    #[tokio::test(flavor = "multi_thread")]
3756    async fn approval_requested_dispatches_to_opt_in_subscriber() {
3757        // K4: end-to-end. Insert a subscription opt-ed in to
3758        // `approval_requested` only; queue a pending action via the
3759        // db layer; call the dispatch helper; assert the wiremock
3760        // saw the POST and the body shape carries the K4 details.
3761        use wiremock::matchers::{method, path};
3762        use wiremock::{Mock, MockServer};
3763        let server = MockServer::start().await;
3764        Mock::given(method("POST"))
3765            .and(path("/hook"))
3766            .respond_with(AckEcho)
3767            .mount(&server)
3768            .await;
3769
3770        let (_keep, db_path) = fresh_db();
3771        let url = format!("{}/hook", server.uri());
3772        let opt_in: Vec<String> = vec!["approval_requested".to_string()];
3773        let sub_id = {
3774            let conn = Connection::open(&db_path).unwrap();
3775            insert(
3776                &conn,
3777                &NewSubscription {
3778                    url: &url,
3779                    events: "approval_requested",
3780                    // R3-S1.HMAC (2026-05-13): dispatch refuses
3781                    // unsigned bodies; supply a per-sub secret.
3782                    secret: Some("test-sub-secret"),
3783                    namespace_filter: None,
3784                    agent_filter: None,
3785                    created_by: None,
3786                    event_types: Some(&opt_in),
3787                },
3788            )
3789            .unwrap()
3790        };
3791
3792        // Queue a pending action through the canonical db helper so
3793        // the row exists when dispatch_approval_requested looks it up.
3794        let pending_id = {
3795            let conn = Connection::open(&db_path).unwrap();
3796            crate::db::queue_pending_action(
3797                &conn,
3798                crate::models::GovernedAction::Store,
3799                "k4-ns",
3800                None,
3801                "agent-requestor",
3802                &serde_json::json!({"title": "k4 approval routing"}),
3803            )
3804            .unwrap()
3805        };
3806
3807        // Fire the dispatcher.
3808        {
3809            let conn = Connection::open(&db_path).unwrap();
3810            dispatch_approval_requested(&conn, &pending_id, &db_path);
3811        }
3812
3813        // Poll the mock for the dispatch — std::thread::spawn is
3814        // detached so we cannot join. ~5s budget mirrors the existing
3815        // dispatch_event_e2e_* tests.
3816        let mut received = Vec::new();
3817        for _ in 0..50 {
3818            received = server.received_requests().await.unwrap_or_default();
3819            if !received.is_empty() {
3820                break;
3821            }
3822            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3823        }
3824        assert_eq!(
3825            received.len(),
3826            1,
3827            "K4: opt-in subscriber must receive exactly one approval_requested POST"
3828        );
3829        let body: serde_json::Value =
3830            serde_json::from_slice(&received[0].body).expect("dispatch body must be JSON");
3831        assert_eq!(body["event"], "approval_requested");
3832        assert_eq!(body["memory_id"], pending_id);
3833        assert_eq!(body["namespace"], "k4-ns");
3834        assert_eq!(body["agent_id"], "agent-requestor");
3835        // K4 details block flattened into the envelope.
3836        assert_eq!(body["action_type"], "store");
3837        assert_eq!(body["status"], "pending");
3838        assert!(
3839            body["requested_at"].is_string(),
3840            "requested_at must round-trip from the row"
3841        );
3842
3843        // Sanity: the dispatch_count was bumped on the subscription
3844        // row (proves we went through record_dispatch on success).
3845        // Poll up to 2s — dispatch_count is written back AFTER the HTTP
3846        // POST is acked, so seeing the wiremock request above does not
3847        // imply the row update has landed yet (race observed on Linux
3848        // and Windows runners under load).
3849        let conn = Connection::open(&db_path).unwrap();
3850        let mut dc: i64 = 0;
3851        for _ in 0..40 {
3852            dc = conn
3853                .query_row(
3854                    "SELECT dispatch_count FROM subscriptions WHERE id = ?1",
3855                    params![sub_id],
3856                    |r| r.get(0),
3857                )
3858                .unwrap();
3859            if dc == 1 {
3860                break;
3861            }
3862            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3863        }
3864        assert_eq!(dc, 1, "dispatch_count must be 1 after successful dispatch");
3865    }
3866
3867    #[test]
3868    fn approval_requested_skipped_for_filtered_subscriber() {
3869        // K4: a subscriber opted in to a *different* event type must
3870        // NOT see approval_requested. Exercises the matches_filters
3871        // structured-opt-in branch from the K4 dispatch path. We can't
3872        // observe "no thread spawned" directly, but we can assert
3873        // dispatch_count stays at zero because no HTTP send occurred.
3874        let (_keep, db_path) = fresh_db();
3875        let opt_in_other: Vec<String> = vec!["memory_store".to_string()];
3876        let sub_id = {
3877            let conn = Connection::open(&db_path).unwrap();
3878            insert(
3879                &conn,
3880                &NewSubscription {
3881                    url: "https://example.com/hook",
3882                    events: "memory_store",
3883                    secret: None,
3884                    namespace_filter: None,
3885                    agent_filter: None,
3886                    created_by: None,
3887                    event_types: Some(&opt_in_other),
3888                },
3889            )
3890            .unwrap()
3891        };
3892        let pending_id = {
3893            let conn = Connection::open(&db_path).unwrap();
3894            crate::db::queue_pending_action(
3895                &conn,
3896                crate::models::GovernedAction::Delete,
3897                "k4-ns-2",
3898                Some("memory-xyz"),
3899                "agent-requestor",
3900                &serde_json::json!({"id": "memory-xyz"}),
3901            )
3902            .unwrap()
3903        };
3904        {
3905            let conn = Connection::open(&db_path).unwrap();
3906            dispatch_approval_requested(&conn, &pending_id, &db_path);
3907        }
3908        // Mismatched filter: matches_filters returns false → no
3909        // dispatch thread spawned → counters stay at zero. Sleep a
3910        // beat so we'd notice an unintended dispatch racing in.
3911        std::thread::sleep(std::time::Duration::from_millis(200));
3912        let conn = Connection::open(&db_path).unwrap();
3913        let (dc, fc): (i64, i64) = conn
3914            .query_row(
3915                "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
3916                params![sub_id],
3917                |r| Ok((r.get(0)?, r.get(1)?)),
3918            )
3919            .unwrap();
3920        assert_eq!(dc, 0, "filter mismatch must skip dispatch");
3921        assert_eq!(fc, 0);
3922    }
3923
3924    #[test]
3925    fn approval_requested_missing_pending_row_is_noop() {
3926        // K4: defensive — the helper looks up the row before
3927        // dispatching. A bogus id must NOT panic, NOT spawn a thread,
3928        // and NOT touch any subscriber row. Exercises the early-return
3929        // branches in dispatch_approval_requested.
3930        let (_keep, db_path) = fresh_db();
3931        let sub_id = {
3932            let conn = Connection::open(&db_path).unwrap();
3933            insert(
3934                &conn,
3935                &NewSubscription {
3936                    url: "https://example.com/hook",
3937                    events: "*",
3938                    secret: None,
3939                    namespace_filter: None,
3940                    agent_filter: None,
3941                    created_by: None,
3942                    event_types: None,
3943                },
3944            )
3945            .unwrap()
3946        };
3947        let conn = Connection::open(&db_path).unwrap();
3948        // Bogus id — pending_actions table is empty.
3949        dispatch_approval_requested(&conn, "nonexistent-id", &db_path);
3950        let (dc, fc): (i64, i64) = conn
3951            .query_row(
3952                "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
3953                params![sub_id],
3954                |r| Ok((r.get(0)?, r.get(1)?)),
3955            )
3956            .unwrap();
3957        assert_eq!(dc, 0, "missing pending row must not dispatch");
3958        assert_eq!(fc, 0);
3959    }
3960
3961    // ----------------------------------------------------------------
3962    // v0.7.0 K6 — A2A correlation IDs + ACK / retry / DLQ tests.
3963    //
3964    // Pinned behaviours:
3965    //   1. correlation_id is a UUIDv7 string and lands in
3966    //      `subscription_events.correlation_id`
3967    //   2. successful delivery transitions the audit row to 'ack' and
3968    //      the dispatched body's `correlation_id` field matches
3969    //   3. a 500-only mock exhausts the [200ms, 1s, 5s] retry ladder
3970    //      and the row lands in `subscription_dlq`
3971    //   4. `replay_subscription_events` returns audit rows ordered by
3972    //      delivered_at since a cursor timestamp
3973    // ----------------------------------------------------------------
3974
3975    #[tokio::test(flavor = "multi_thread")]
3976    async fn k6_dispatch_persists_uuidv7_correlation_id() {
3977        use wiremock::matchers::{method, path};
3978        use wiremock::{Mock, MockServer};
3979        let server = MockServer::start().await;
3980        Mock::given(method("POST"))
3981            .and(path("/hook"))
3982            .respond_with(AckEcho)
3983            .mount(&server)
3984            .await;
3985
3986        let (_keep, db_path) = fresh_db();
3987        let url = format!("{}/hook", server.uri());
3988        let sub_id = {
3989            let conn = Connection::open(&db_path).unwrap();
3990            insert(
3991                &conn,
3992                &NewSubscription {
3993                    url: &url,
3994                    events: "*",
3995                    // R3-S1.HMAC (2026-05-13): dispatch refuses
3996                    // unsigned bodies; supply a per-sub secret.
3997                    secret: Some("test-sub-secret"),
3998                    namespace_filter: None,
3999                    agent_filter: None,
4000                    created_by: None,
4001                    event_types: None,
4002                },
4003            )
4004            .unwrap()
4005        };
4006        {
4007            let conn = Connection::open(&db_path).unwrap();
4008            dispatch_event(&conn, "memory_store", "k6-mem", "k6-ns", None, &db_path);
4009        }
4010
4011        // Poll until the subscription_events row is acked.
4012        let path_for_poll = db_path.clone();
4013        let sub_for_poll = sub_id.clone();
4014        let row = tokio::task::spawn_blocking(move || {
4015            for _ in 0..50 {
4016                let conn = Connection::open(&path_for_poll).unwrap();
4017                let r: Option<(String, String, String)> = conn
4018                    .query_row(
4019                        "SELECT correlation_id, payload, delivery_status \
4020                         FROM subscription_events WHERE subscription_id = ?1",
4021                        params![sub_for_poll],
4022                        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
4023                    )
4024                    .ok();
4025                if let Some(r) = r
4026                    && r.2 == "ack"
4027                {
4028                    return Some(r);
4029                }
4030                std::thread::sleep(std::time::Duration::from_millis(100));
4031            }
4032            None
4033        })
4034        .await
4035        .unwrap();
4036        let (corr, body, status) = row.expect("audit row must reach ack status");
4037        assert_eq!(status, "ack");
4038        // UUIDv7 — parses + version 7.
4039        let parsed = uuid::Uuid::parse_str(&corr).expect("UUIDv7 string");
4040        assert_eq!(parsed.get_version_num(), 7, "correlation_id must be UUIDv7");
4041        // The dispatched body carries the same correlation_id.
4042        let json: serde_json::Value = serde_json::from_str(&body).unwrap();
4043        assert_eq!(
4044            json["correlation_id"].as_str(),
4045            Some(corr.as_str()),
4046            "payload correlation_id must match audit row"
4047        );
4048    }
4049
4050    #[tokio::test(flavor = "multi_thread")]
4051    async fn k6_500_after_retries_lands_in_dlq() {
4052        use wiremock::matchers::{method, path};
4053        use wiremock::{Mock, MockServer, ResponseTemplate};
4054        let server = MockServer::start().await;
4055        // Mock returns 500 for every attempt — exhausts the retry
4056        // ladder and forces the DLQ branch.
4057        Mock::given(method("POST"))
4058            .and(path("/hook"))
4059            .respond_with(ResponseTemplate::new(500))
4060            .mount(&server)
4061            .await;
4062
4063        let (_keep, db_path) = fresh_db();
4064        let url = format!("{}/hook", server.uri());
4065        let sub_id = {
4066            let conn = Connection::open(&db_path).unwrap();
4067            insert(
4068                &conn,
4069                &NewSubscription {
4070                    url: &url,
4071                    events: "*",
4072                    // R3-S1.HMAC (2026-05-13): dispatch refuses
4073                    // unsigned bodies; supply a per-sub secret.
4074                    secret: Some("test-sub-secret"),
4075                    namespace_filter: None,
4076                    agent_filter: None,
4077                    created_by: None,
4078                    event_types: None,
4079                },
4080            )
4081            .unwrap()
4082        };
4083        {
4084            let conn = Connection::open(&db_path).unwrap();
4085            dispatch_event(&conn, "memory_store", "k6-fail", "k6-ns", None, &db_path);
4086        }
4087
4088        // Backoff ladder is 200ms + 1s + 5s ≈ 6.2s of sleeps + per-
4089        // attempt network time. Poll for up to 12s for the DLQ row.
4090        let path_for_poll = db_path.clone();
4091        let sub_for_poll = sub_id.clone();
4092        let dlq_row = tokio::task::spawn_blocking(move || {
4093            for _ in 0..120 {
4094                let conn = Connection::open(&path_for_poll).unwrap();
4095                let entries = list_dlq(&conn, Some(&sub_for_poll)).unwrap();
4096                if !entries.is_empty() {
4097                    return Some(entries);
4098                }
4099                std::thread::sleep(std::time::Duration::from_millis(100));
4100            }
4101            None
4102        })
4103        .await
4104        .unwrap()
4105        .expect("DLQ row must appear after retry ladder exhaustion");
4106
4107        assert_eq!(dlq_row.len(), 1, "exactly one DLQ row per failed delivery");
4108        let row = &dlq_row[0];
4109        assert_eq!(row.subscription_id, sub_id);
4110        assert_eq!(row.event_type, "memory_store");
4111        assert_eq!(
4112            row.retry_count,
4113            (RETRY_BACKOFFS.len() as i64) + 1,
4114            "retry_count = initial attempt + RETRY_BACKOFFS.len() retries"
4115        );
4116        assert!(
4117            row.last_error.starts_with("http-5"),
4118            "last_error must record the 5xx status: {}",
4119            row.last_error
4120        );
4121        assert!(!row.first_failed_at.is_empty());
4122        assert!(!row.last_failed_at.is_empty());
4123        // Audit row should be marked failed.
4124        let conn = Connection::open(&db_path).unwrap();
4125        let status: String = conn
4126            .query_row(
4127                "SELECT delivery_status FROM subscription_events WHERE correlation_id = ?1",
4128                params![row.correlation_id],
4129                |r| r.get(0),
4130            )
4131            .unwrap();
4132        assert_eq!(status, "failed");
4133    }
4134
4135    #[tokio::test(flavor = "multi_thread")]
4136    async fn k6_replay_subscription_events_returns_rows_since_cursor() {
4137        // Insert two audit rows by hand (faster than driving two full
4138        // dispatches) and assert the cursor filter returns only the
4139        // newer one.
4140        let (_keep, db_path) = fresh_db();
4141        let url = "https://example.com/hook";
4142        let sub_id = {
4143            let conn = Connection::open(&db_path).unwrap();
4144            insert(
4145                &conn,
4146                &NewSubscription {
4147                    url,
4148                    events: "*",
4149                    secret: None,
4150                    namespace_filter: None,
4151                    agent_filter: None,
4152                    created_by: None,
4153                    event_types: None,
4154                },
4155            )
4156            .unwrap()
4157        };
4158        // Two correlation ids with explicit delivered_at cursors.
4159        let conn = Connection::open(&db_path).unwrap();
4160        conn.execute(
4161            "INSERT INTO subscription_events \
4162             (subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status) \
4163             VALUES (?1, ?2, 'memory_store', '{}', '2026-01-01T00:00:00Z', 'ack')",
4164            params![sub_id, "c-old"],
4165        )
4166        .unwrap();
4167        conn.execute(
4168            "INSERT INTO subscription_events \
4169             (subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status) \
4170             VALUES (?1, ?2, 'memory_store', '{}', '2026-05-05T00:00:00Z', 'ack')",
4171            params![sub_id, "c-new"],
4172        )
4173        .unwrap();
4174        let after = replay_subscription_events(&conn, &sub_id, "2026-03-01T00:00:00Z")
4175            .expect("replay query");
4176        assert_eq!(after.len(), 1, "cursor must filter to the newer row");
4177        assert_eq!(after[0].correlation_id, "c-new");
4178        // The MCP-shaped wrapper.
4179        let envelope = memory_subscription_replay(&conn, &sub_id, "2026-03-01T00:00:00Z").unwrap();
4180        assert_eq!(envelope["count"], 1);
4181        assert_eq!(envelope["events"][0]["correlation_id"], "c-new");
4182    }
4183
4184    /// #1253 (MED, 2026-05-25) — regression: per-subscription
4185    /// `subscription_dlq` depth is capped at
4186    /// [`MAX_SUBSCRIPTION_DLQ_ROWS`]; the (cap + 1)-th insert is
4187    /// refused with a `dlq_overflow` error, leaves the table at
4188    /// exactly cap rows, and bumps the
4189    /// `ai_memory_subscription_dlq_overflow_total` counter.
4190    #[test]
4191    fn issue_1253_dlq_overflow_cap_refuses_past_max() {
4192        let (_keep, db_path) = fresh_db();
4193        let conn = Connection::open(&db_path).unwrap();
4194        let sub_id = insert(
4195            &conn,
4196            &NewSubscription {
4197                url: "https://example.com/hook",
4198                events: "*",
4199                secret: Some("s"),
4200                namespace_filter: None,
4201                agent_filter: None,
4202                created_by: None,
4203                event_types: None,
4204            },
4205        )
4206        .unwrap();
4207
4208        // Drive the DLQ to the cap with the canonical writer so the
4209        // test exercises the same insert path production uses.
4210        for i in 0..MAX_SUBSCRIPTION_DLQ_ROWS {
4211            let corr = format!("c-{i}");
4212            record_dlq_with_conn(
4213                &conn,
4214                &sub_id,
4215                &corr,
4216                "memory_store",
4217                "{}",
4218                4,
4219                "http-500",
4220                "2026-01-01T00:00:00Z",
4221                "2026-01-01T00:00:00Z",
4222            )
4223            .expect("inserts below cap must succeed");
4224        }
4225        let depth: i64 = conn
4226            .query_row(
4227                "SELECT COUNT(*) FROM subscription_dlq WHERE subscription_id = ?1",
4228                params![&sub_id],
4229                |r| r.get(0),
4230            )
4231            .unwrap();
4232        assert_eq!(depth, MAX_SUBSCRIPTION_DLQ_ROWS, "DLQ should fill to cap");
4233
4234        // Snapshot the overflow counter so we can prove it advances.
4235        let before = crate::metrics::subscription_dlq_overflow_count();
4236
4237        // (cap + 1)-th insert must be refused with the typed error
4238        // shape — the err string starts with "dlq_overflow:" so
4239        // operators / dashboards can pattern-match it.
4240        let res = record_dlq_with_conn(
4241            &conn,
4242            &sub_id,
4243            "c-overflow",
4244            "memory_store",
4245            "{}",
4246            4,
4247            "http-500",
4248            "2026-01-01T00:00:00Z",
4249            "2026-01-01T00:00:00Z",
4250        );
4251        let err = res.expect_err("over-cap insert must be refused");
4252        let msg = format!("{err}");
4253        assert!(
4254            msg.contains("dlq_overflow"),
4255            "error must carry the dlq_overflow tag for operators: {msg}"
4256        );
4257
4258        // The table must remain at cap — refusal is total, not partial.
4259        let depth_after: i64 = conn
4260            .query_row(
4261                "SELECT COUNT(*) FROM subscription_dlq WHERE subscription_id = ?1",
4262                params![&sub_id],
4263                |r| r.get(0),
4264            )
4265            .unwrap();
4266        assert_eq!(
4267            depth_after, MAX_SUBSCRIPTION_DLQ_ROWS,
4268            "refused insert must not leak a row into subscription_dlq"
4269        );
4270
4271        // Counter must have ticked. The metrics handle is a singleton
4272        // so parallel tests can also bump it; what we own is the +1
4273        // contributed by this call.
4274        let after = crate::metrics::subscription_dlq_overflow_count();
4275        assert!(
4276            after >= before + 1,
4277            "subscription_dlq_overflow_total did not advance (before={before}, after={after})"
4278        );
4279    }
4280}
4281
4282// Local hex helper used only by tests; the production paths use the
4283// format!("{:x}", _) pattern over GenericArray outputs.
#[cfg(test)]
mod hex {
    /// Lowercase hexadecimal alphabet, indexed by nibble value (0..=15).
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    /// Renders `bytes` as a lowercase hexadecimal string.
    ///
    /// Each input byte becomes exactly two digits (high nibble first), so
    /// the result is `2 * bytes.len()` characters long and an empty slice
    /// yields an empty string.
    pub fn encode_fallback(bytes: &[u8]) -> String {
        let mut rendered = String::with_capacity(bytes.len() * 2);
        for &byte in bytes {
            rendered.push(char::from(DIGITS[usize::from(byte >> 4)]));
            rendered.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
        }
        rendered
    }
}
4290
#[test]
fn webhook_signing_with_unicode_payload() {
    // Non-ASCII text appears both in the signed body (the "café"
    // namespace) and in the plaintext secret, so multi-byte UTF-8 input
    // reaches both the key-derivation hash and the HMAC.
    let payload = serde_json::json!({
        "event": "memory_store",
        "memory_id": "m1",
        "namespace": "café",
        "agent_id": null,
        "delivered_at": "2026-01-01T00:00:00Z"
    });
    let body = serde_json::to_string(&payload).unwrap();
    let key_hex = sha256_hex("secret-with-café");
    let sig = hmac_sha256_hex(&key_hex, &body);

    // A SHA-256 based MAC is 256 bits, i.e. 64 hex characters. The length
    // check also covers "non-empty".
    assert_eq!(sig.len(), 64, "unexpected signature length: {sig}");
    // Length alone does not prove the output is hex; check every character.
    assert!(
        sig.chars().all(|c| c.is_ascii_hexdigit()),
        "signature must consist solely of hex digits: {sig}"
    );

    // The non-ASCII bytes of the secret must actually feed into the
    // signature: folding "é" to "e" has to yield a different MAC.
    let ascii_key_hex = sha256_hex("secret-with-cafe");
    assert_ne!(
        sig,
        hmac_sha256_hex(&ascii_key_hex, &body),
        "Unicode and ASCII-folded secrets must not sign identically"
    );
}
4308
#[test]
fn webhook_retries_on_5xx_response() {
    // NOTE(review): placeholder. Nothing here calls send() or any other
    // delivery code; the assertion only checks that the two boolean
    // literals below differ, so it cannot fail.
    //
    // Intent recorded by the original author: send() reports success
    // (`true`) only for 2xx responses and failure (`false`) for 5xx.
    // TODO confirm against send() and replace the literals with a real
    // call against a stub endpoint that answers with a 5xx status.
    let (outcome_for_2xx, outcome_for_5xx) = (true, false);
    assert_ne!(outcome_for_2xx, outcome_for_5xx);
}
4318
#[test]
fn webhook_does_not_retry_on_4xx_response() {
    // NOTE(review): placeholder. No delivery code is invoked; the
    // assertion compares two boolean literals and therefore always
    // passes.
    //
    // Intent recorded by the original author: every non-2xx response
    // (4xx, 5xx, ...) makes send() return `false`. Whether a 4xx is
    // retried differently from a 5xx is not visible from this test —
    // TODO confirm against send() and exercise it with a stubbed 4xx.
    let (outcome_for_4xx, outcome_for_2xx) = (false, true);
    assert_ne!(outcome_for_4xx, outcome_for_2xx);
}
4328
#[test]
fn namespace_pattern_matches_glob_correctly() {
    // Every case passes the same `"*"` first argument, the `memory_store`
    // event and `None` for the remaining optional arguments, so only the
    // namespace filter and the event's namespace vary between cases.
    //
    // NOTE(review): despite the test name, no wildcard namespace pattern
    // is exercised here — only equal, unequal and empty filters.
    let admits = |ns_filter: &str, event_ns: &str| -> bool {
        matches_filters("*", None, Some(ns_filter), None, "memory_store", event_ns, None)
    };

    // A filter equal to the event's namespace lets the event through.
    assert!(admits("app", "app"));

    // A filter naming a different namespace rejects the event.
    assert!(!admits("app", "other"));

    // An empty filter string is expected to behave as "no filter" and
    // accept any namespace.
    assert!(admits("", "any_ns"));
}