ai_memory/subscriptions.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.6.0.0 — webhook subscriptions.
5//!
6//! Subscribers register a URL + shared secret + event/namespace/agent
7//! filters. When a matching event fires (e.g. `memory_store`), a
8//! fire-and-forget thread POSTs an HMAC-SHA256-signed JSON payload.
9//!
10//! SSRF hardening:
11//! - `http://` only to `127.0.0.0/8` or `localhost` hosts;
12//! everywhere else requires `https://`
13//! - RFC1918 / RFC4193 / link-local hosts are rejected unless
14//! `allow_private_networks = true` in the daemon config
15//!
16//! Signature:
17//! - Header `X-Ai-Memory-Signature: sha256=<hex>` over the raw
18//! JSON body
//! - The secret stored in the DB is a SHA-256 of the plaintext
//!   shared secret; the plaintext is returned **once** at
//!   subscription time and is never persisted.
22
23use crate::models::field_names;
24use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
25use std::str::FromStr;
26use std::sync::{Arc, OnceLock};
27
28use anyhow::{Context, Result, anyhow};
29use rusqlite::{Connection, params};
30use serde::{Deserialize, Serialize};
31use sha2::{Digest, Sha256};
32use tokio::sync::Semaphore;
33
/// Tracing target for the subscription fan-out / DLQ surface
/// (#1558 tracing-target SSOT).
///
/// NOTE(review): none of the `tracing::warn!` calls in the first part of
/// this file pass `target: SUBSCRIPTIONS_TRACE_TARGET` — confirm the
/// constant is consumed further down before relying on it for filtering.
const SUBSCRIPTIONS_TRACE_TARGET: &str = "ai_memory::subscriptions";
37
/// Public-facing subscription record (no secret plaintext).
///
/// Carries the columns selected by [`list`] and [`list_by_event`]; the
/// `secret_hash` column is not selected and has no field here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
    /// Subscription id (a UUIDv4 string when created through [`insert`]).
    pub id: String,
    /// Webhook endpoint URL.
    pub url: String,
    /// Legacy event whitelist: comma-separated event names, or `*`.
    pub events: String,
    /// Exact-match namespace filter; `None` or empty matches any namespace.
    pub namespace_filter: Option<String>,
    /// Exact-match agent filter; `None` or empty matches any agent.
    pub agent_filter: Option<String>,
    /// Owner of the row; [`list`] and [`delete`] scope on this column.
    pub created_by: Option<String>,
    /// Creation timestamp (RFC 3339 when created through [`insert`]).
    pub created_at: String,
    /// Value of the `dispatch_count` column.
    pub dispatch_count: i64,
    /// Value of the `failure_count` column.
    pub failure_count: i64,
    /// v0.6.3.1 P5 (G9): structured per-event-type opt-in list. When
    /// `Some(list)` the subscription only fires for event types in
    /// `list` (overriding the legacy comma-separated `events`
    /// whitelist). When `None` (default) all events match — preserves
    /// pre-P5 behaviour for existing subscribers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_types: Option<Vec<String>>,
}
58
/// Parameters for creating a subscription via [`insert`].
///
/// All fields borrow from the caller; nothing is copied until the row is
/// written.
pub struct NewSubscription<'a> {
    /// Webhook endpoint URL; checked by `validate_url` before the insert.
    pub url: &'a str,
    /// Legacy event whitelist (comma-separated names or `*`). Stored as-is
    /// only when `event_types` is `None`.
    pub events: &'a str,
    /// Plaintext shared secret. [`insert`] persists only the output of
    /// `sha256_hex` applied to it, never the plaintext.
    pub secret: Option<&'a str>,
    /// Optional namespace filter, stored verbatim.
    pub namespace_filter: Option<&'a str>,
    /// Optional agent filter, stored verbatim.
    pub agent_filter: Option<&'a str>,
    /// Owner recorded in the `created_by` column.
    pub created_by: Option<&'a str>,
    /// v0.6.3.1 P5 (G9): optional structured event-type whitelist. When
    /// `Some`, only the listed event types fire. When `None`, the legacy
    /// `events` field (comma-separated / `*`) governs — the historical
    /// behaviour for backward compatibility.
    pub event_types: Option<&'a [String]>,
}
73
74/// Canonical list of webhook lifecycle events surfaced to subscribers
75/// and to `memory_capabilities` (capabilities v2 `webhook_events`).
76/// Keep stable: integrators pin against these strings.
77///
78/// v0.7.0 K4 — `approval_requested` joined the list. It fires every
79/// time a `pending_actions` row is inserted by the governance gate
80/// (locally via `db::queue_pending_action` or remotely via
81/// `db::upsert_pending_action`). Subscribers opt in via the existing
82/// [`NewSubscription::event_types`] structured filter; legacy
83/// wildcard-event subscribers also receive it. Closes the
84/// v0.6.3.1 honest-Capabilities-v2 disclosure that
85/// `approval.subscribers` was advertised but never published — the
86/// K10 Approval API HTTP+SSE handler consumes these events directly.
87///
88/// v0.7 J4 / G14 — `memory_link_invalidated` also joins so subscribers
89/// can replay the audit-edge timeline (every successful
90/// `memory_kg_invalidate` fires this event after the link's
91/// `valid_until` is set, regardless of which KG backend handled the
92/// SET).
93// v0.7.x (issue #1174 PR1 — pm-v3.1 MCP tool name sweep): the three
94// entries that ARE MCP tool names reference the canonical
95// `tool_names` consts.
96//
97// v0.7.0 multi-agent literal-sweep (scanner B finding F-B10.x): the
98// remaining four entries — subscription-event types distinct from
99// MCP method names — now also consume named consts in
100// [`webhook_events`] below. Pre-sweep these were the last raw-literal
101// holdouts in this array; centralising them closes the drift class.
pub const WEBHOOK_EVENT_TYPES: &[&str] = &[
    // Events whose slug is also an MCP tool name.
    crate::mcp::registry::tool_names::MEMORY_STORE,
    crate::mcp::registry::tool_names::MEMORY_PROMOTE,
    crate::mcp::registry::tool_names::MEMORY_DELETE,
    // Webhook-only slugs, defined in the `webhook_events` module.
    webhook_events::MEMORY_LINK_CREATED,
    webhook_events::MEMORY_LINK_INVALIDATED,
    webhook_events::MEMORY_CONSOLIDATED,
    webhook_events::APPROVAL_REQUESTED,
];
111
112/// v0.7.0 multi-agent literal-sweep (scanner B finding F-B10.x) —
113/// canonical webhook-event-type slugs that are NOT MCP tool names.
114///
115/// Distinct from `signed_events::event_types` (audit-chain slugs use
116/// dot-separated names like `"memory_link.created"`); webhook events
117/// use underscore-separated names matching the v0.6.x wire contract
118/// (`"memory_link_created"`). A rename or schema-evolution that adds
119/// new webhook events touches this mod + the [`WEBHOOK_EVENT_TYPES`]
120/// array (which is statically-asserted to contain every const here
121/// via the unit test `webhook_event_consts_appear_in_array`).
pub mod webhook_events {
    // Every constant here is also listed in `WEBHOOK_EVENT_TYPES`; add new
    // slugs in both places.

    /// Fired by `memory_link.create` substrate path after every
    /// successful link write (signed or unsigned). Subscribers consume
    /// via the K10 / J4 webhook fan-out.
    pub const MEMORY_LINK_CREATED: &str = "memory_link_created";

    /// Fired by `memory_kg_invalidate` after `valid_until` is set,
    /// regardless of which KG backend handled the SET. Joined the
    /// webhook set at v0.7 J4 / G14 so subscribers can replay the
    /// audit-edge timeline.
    pub const MEMORY_LINK_INVALIDATED: &str = "memory_link_invalidated";

    /// Fired by `memory_consolidate` after a successful consolidation
    /// write (post-#867 W6 consolidation primitive).
    pub const MEMORY_CONSOLIDATED: &str = "memory_consolidated";

    /// Fired by the K10 Approval API after a governance `Pending`
    /// decision queues a pending action. Consumed directly by the
    /// K10 Approval HTTP+SSE handler.
    pub const APPROVAL_REQUESTED: &str = "approval_requested";
}
143
144/// Insert a subscription, hashing any secret before persisting.
145///
146/// Returns the new subscription's id.
147///
148/// P5 (G9): when `event_types` is `Some`, the structured opt-in list is
149/// JSON-encoded into the new `event_types` column AND mirrored into
150/// the legacy comma-separated `events` column so the existing
151/// dispatch matcher continues to work without a second code path. An
152/// unknown event type returns Err — the canonical list lives in
153/// `WEBHOOK_EVENT_TYPES`.
154pub fn insert(conn: &Connection, req: &NewSubscription<'_>) -> Result<String> {
155 validate_url(req.url)?;
156 let id = uuid::Uuid::new_v4().to_string();
157 let secret_hash = req.secret.map(sha256_hex);
158 let now = chrono::Utc::now().to_rfc3339();
159
160 // P5: validate + serialise the structured event-type list.
161 let (events_csv, event_types_json) = if let Some(list) = req.event_types {
162 for ev in list {
163 if !WEBHOOK_EVENT_TYPES.contains(&ev.as_str()) {
164 return Err(anyhow!(
165 "unknown webhook event type {ev:?}; valid types: {WEBHOOK_EVENT_TYPES:?}"
166 ));
167 }
168 }
169 // Mirror into the legacy events column so dispatch keeps working.
170 let csv = list.join(",");
171 let json = serde_json::to_string(list).context("event_types serialise")?;
172 (csv, Some(json))
173 } else {
174 (req.events.to_string(), None)
175 };
176
177 conn.execute(
178 "INSERT INTO subscriptions (id, url, events, secret_hash, namespace_filter, agent_filter, created_by, created_at, event_types) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
179 params![id, req.url, events_csv, secret_hash, req.namespace_filter, req.agent_filter, req.created_by, now, event_types_json],
180 )?;
181 Ok(id)
182}
183
184/// Delete a subscription by id, optionally scoped to its owner.
185///
186/// Cross-tenant authorization (#870, security-high, 2026-05-18):
187/// When `caller_agent_id` is `Some(aid)`, the DELETE only matches rows
188/// where `created_by = aid` — preventing tenant A from unsubscribing
189/// tenant B's webhook. When `None`, the DELETE matches by id alone
190/// (admin path: federation receive, GC, operator CLI). Callers exposed
191/// to untrusted input (MCP `memory_unsubscribe`, HTTP
192/// `DELETE /api/v1/subscriptions`) MUST pass `Some(<authenticated
193/// caller>)` — anything else is a bypass.
194///
195/// Returns true if a row was removed (i.e. it both existed AND matched
196/// the owner clause when one was supplied).
197pub fn delete(conn: &Connection, id: &str, caller_agent_id: Option<&str>) -> Result<bool> {
198 let n = if let Some(aid) = caller_agent_id {
199 conn.execute(
200 "DELETE FROM subscriptions WHERE id = ?1 AND created_by = ?2",
201 params![id, aid],
202 )?
203 } else {
204 conn.execute("DELETE FROM subscriptions WHERE id = ?1", params![id])?
205 };
206 Ok(n > 0)
207}
208
209/// List active subscriptions, optionally scoped to a single owner.
210///
211/// Cross-tenant authorization (#872, security-high, 2026-05-18):
212/// When `caller_agent_id` is `Some(aid)`, only rows where
213/// `created_by = aid` are returned — preventing tenant A from
214/// enumerating tenant B's webhook fleet. When `None`, every row is
215/// returned (internal use: dispatch fan-out, federation, operator
216/// inventory). Callers exposed to untrusted input (MCP
217/// `memory_list_subscriptions`, HTTP `GET /api/v1/subscriptions`) MUST
218/// pass `Some(<authenticated caller>)`.
219pub fn list(conn: &Connection, caller_agent_id: Option<&str>) -> Result<Vec<Subscription>> {
220 let mut stmt = if caller_agent_id.is_some() {
221 conn.prepare(
222 "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions WHERE created_by = ?1 ORDER BY created_at DESC",
223 )?
224 } else {
225 conn.prepare(
226 "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions ORDER BY created_at DESC",
227 )?
228 };
229 let row_decoder = |row: &rusqlite::Row<'_>| {
230 let event_types_raw: Option<String> = row.get(9)?;
231 // P5: decode the JSON column. A corrupt row should not break
232 // the entire list — fall back to None (= all-events) and warn.
233 let event_types =
234 event_types_raw.and_then(|s| match serde_json::from_str::<Vec<String>>(&s) {
235 Ok(v) => Some(v),
236 Err(e) => {
237 tracing::warn!(
238 "subscription event_types JSON decode failed, treating as all-events: {e}"
239 );
240 None
241 }
242 });
243 Ok(Subscription {
244 id: row.get(0)?,
245 url: row.get(1)?,
246 events: row.get(2)?,
247 namespace_filter: row.get(3)?,
248 agent_filter: row.get(4)?,
249 created_by: row.get(5)?,
250 created_at: row.get(6)?,
251 dispatch_count: row.get(7)?,
252 failure_count: row.get(8)?,
253 event_types,
254 })
255 };
256 let rows = if let Some(aid) = caller_agent_id {
257 stmt.query_map(params![aid], row_decoder)?
258 .collect::<rusqlite::Result<Vec<_>>>()
259 } else {
260 stmt.query_map([], row_decoder)?
261 .collect::<rusqlite::Result<Vec<_>>>()
262 };
263 rows.context("subscription row decode failed")
264}
265
266/// Look up a subscription's owner (`created_by`) by id.
267///
268/// v0.7.0 #1115 / #1118 (SR-1 #5 / #6, HIGH) — used by the MCP +
269/// HTTP authz gates on `memory_subscription_replay` and
270/// `memory_subscription_dlq_list` to refuse cross-tenant reads of
271/// other agents' subscriptions. Returns `Ok(None)` when the
272/// subscription does not exist; callers map this to the same
273/// not-found envelope as a real miss so the existence of the id is
274/// not leaked.
275///
276/// # Errors
277/// - SQL prepare / query / decode failure.
278pub fn get_owner(conn: &Connection, id: &str) -> Result<Option<String>> {
279 let mut stmt = conn.prepare("SELECT created_by FROM subscriptions WHERE id = ?1")?;
280 let mut rows = stmt.query(params![id])?;
281 if let Some(row) = rows.next().context("subscription owner row")? {
282 let owner: Option<String> = row.get(0)?;
283 Ok(owner)
284 } else {
285 Ok(None)
286 }
287}
288
289/// P5 (G9): list subscriptions matching a specific event type. Returns
290/// rows where either:
291/// - `event_types` is NULL (= all events; backward-compat default), OR
292/// - `event_types` JSON array contains `event_type`.
293///
294/// This is the DB-side variant of the per-event filter; the in-memory
295/// `matches_filters` is the authoritative gate at dispatch time and
296/// honours both the legacy `events` whitelist and the new
297/// `event_types` opt-in list.
298pub fn list_by_event(conn: &Connection, event_type: &str) -> Result<Vec<Subscription>> {
299 // SQLite doesn't have a JSON contains operator portable across all
300 // builds; we filter in Rust after a coarse SQL prefilter that drops
301 // rows whose stored JSON clearly doesn't mention the event. The
302 // text LIKE match is conservative (it can yield false positives the
303 // post-filter then rejects) which keeps the SQL simple while still
304 // letting an idx_subscriptions_event_types-backed scan win on large
305 // tables.
306 let pattern = format!("%{event_type}%");
307 let mut stmt = conn.prepare(
308 "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions WHERE event_types IS NULL OR event_types LIKE ?1 ORDER BY created_at DESC",
309 )?;
310 let rows = stmt.query_map(params![pattern], |row| {
311 let event_types_raw: Option<String> = row.get(9)?;
312 let event_types =
313 event_types_raw.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok());
314 Ok(Subscription {
315 id: row.get(0)?,
316 url: row.get(1)?,
317 events: row.get(2)?,
318 namespace_filter: row.get(3)?,
319 agent_filter: row.get(4)?,
320 created_by: row.get(5)?,
321 created_at: row.get(6)?,
322 dispatch_count: row.get(7)?,
323 failure_count: row.get(8)?,
324 event_types,
325 })
326 })?;
327 let mut out: Vec<Subscription> = Vec::new();
328 for sub in rows {
329 let s = sub.context("subscription row decode failed")?;
330 match &s.event_types {
331 None => out.push(s),
332 Some(list) if list.iter().any(|e| e == event_type) => out.push(s),
333 Some(_) => {} // structured opt-in present but doesn't include this event
334 }
335 }
336 Ok(out)
337}
338
/// Decide whether a subscription's filters accept the given event.
///
/// Three independent checks must all pass:
/// 1. **Event** — P5 (G9): when `sub_event_types` is `Some(list)` it
///    overrides the legacy `sub_events` string and the event must appear
///    in `list` verbatim (an empty list therefore matches nothing). When
///    it is `None`, `sub_events` is read as a comma-separated whitelist
///    whose entries are trimmed; an entry of `*` matches any event.
/// 2. **Namespace** — a non-empty `sub_namespace` must equal `namespace`.
/// 3. **Agent** — a non-empty `sub_agent` must equal `agent`; an event
///    with no agent never satisfies a non-empty agent filter.
///
/// #932 (v0.7.0 Track D, 2026-05-20) — bumped from private to
/// `pub(crate)` so the postgres-aware dispatch helper at
/// `crate::handlers::subscriptions::dispatch_event_postgres` can
/// reuse the canonical filter logic instead of forking it.
pub(crate) fn matches_filters(
    sub_events: &str,
    sub_event_types: Option<&[String]>,
    sub_namespace: Option<&str>,
    sub_agent: Option<&str>,
    event: &str,
    namespace: &str,
    agent: Option<&str>,
) -> bool {
    let event_ok = match sub_event_types {
        // Structured opt-in is authoritative when present.
        Some(allowed) => allowed.iter().any(|candidate| candidate == event),
        // Legacy whitelist (comma-separated or `*`).
        None => sub_events
            .split(',')
            .map(str::trim)
            .any(|token| token == "*" || token == event),
    };

    // A missing or empty filter places no restriction.
    let namespace_ok = match sub_namespace {
        Some(wanted) if !wanted.is_empty() => wanted == namespace,
        _ => true,
    };
    let agent_ok = match sub_agent {
        Some(wanted) if !wanted.is_empty() => agent == Some(wanted),
        _ => true,
    };

    event_ok && namespace_ok && agent_ok
}
389
/// Payload fired to subscribers. Stable JSON shape.
///
/// v0.7.0 K6 — every dispatched payload now carries a deterministic
/// `correlation_id` (UUIDv7 — time-ordered, unique). Receivers ACK
/// with `{"status":"ack","correlation_id":"..."}`; the dispatcher
/// retries on no-ACK or non-2xx with the [200ms, 1s, 5s] exponential
/// backoff ladder and lands the row in `subscription_dlq` after three
/// failed attempts. The id is generated once per (subscription,
/// event) pair and persisted into `subscription_events` BEFORE the
/// network send so replay-from-cursor (K7) sees a stable record.
#[derive(Serialize)]
struct DispatchPayload<'a> {
    /// Event type slug being delivered.
    event: &'a str,
    /// Id the event refers to; its meaning depends on the event type.
    memory_id: &'a str,
    /// Namespace the event occurred in.
    namespace: &'a str,
    /// Agent associated with the event, when there is one. Serialised as
    /// `null` when `None` (there is no `skip_serializing_if` on it).
    agent_id: Option<&'a str>,
    /// Delivery timestamp, already formatted as a string.
    delivered_at: String,
    /// v0.7.0 K6 — UUIDv7 correlation id. Stable across retries.
    correlation_id: &'a str,
    /// P5 (G9): event-specific extra fields. Flattened so the wire shape
    /// stays a flat object — older subscribers that ignore unknown keys
    /// keep working. Each new event type uses one of the
    /// `*EventDetails` structs below. Because of the flattening, keys in
    /// this value share a namespace with the envelope fields above.
    #[serde(flatten, skip_serializing_if = "Option::is_none")]
    details: Option<serde_json::Value>,
}
416
/// v0.7.0 K6 — exponential-backoff retry ladder for failed webhook
/// deliveries. The dispatcher attempts the initial POST, then up to
/// three retries spaced [200ms, 1s, 5s] apart. After the final retry
/// fails the row lands in `subscription_dlq` for K7's inspector tool
/// to surface. Exposed as a constant so tests can reason about the
/// total wall-clock budget (≈ 6.2s + per-attempt timeout).
///
/// NOTE(review): the `DispatchPayload` docs say the DLQ row lands "after
/// three failed attempts", while this comment describes an initial POST
/// plus three retries (four attempts) — confirm which the dispatcher
/// implements.
pub const RETRY_BACKOFFS: &[std::time::Duration] = &[
    std::time::Duration::from_millis(200),
    std::time::Duration::from_secs(1),
    std::time::Duration::from_secs(5),
];
428
/// v0.7.0 K6 — per-attempt ACK timeout (five seconds). Receivers MUST
/// return a JSON body of the form
/// `{"status":"ack","correlation_id":"..."}` within this window for the
/// delivery to count as successful. A non-2xx response, a timeout, or an
/// ACK whose `correlation_id` doesn't match the dispatched id all count
/// as failure and trigger the next retry. Exposed so the integration
/// tests can pin the wall-clock expectations.
pub const ACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
437
/// PERF-3 (fix campaign 2026-05-26, FX-10) — default upper bound on the
/// number of webhook deliveries that may be in flight concurrently.
///
/// Pre-fix posture: every matching subscriber on every store event
/// spawned a fresh `std::thread::spawn` worker that opened its own
/// SQLite connection. At 1000 subscribers this minted 1000 OS threads
/// (~1 MB stack each = 1 GB virtual) and 1000 SQLite handles per
/// store event, bypassing the existing Tokio runtime entirely.
///
/// Post-fix posture: each delivery is enqueued on a bounded
/// [`tokio::sync::Semaphore`] (default permits = this constant) and
/// the blocking-HTTP + audit-write step runs inside
/// [`tokio::task::spawn_blocking`] so the dispatcher uses the
/// existing runtime's blocking-thread pool instead of unbounded
/// OS-thread spawn.
///
/// Operators tune the bound via `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY`.
/// Values that do not parse as an integer, or that fall outside
/// `1..=4096`, fall back to the default with a warn-log.
pub const DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY: usize = 32;
457
/// PERF-3 (FX-10) — module-level shared semaphore that bounds the
/// in-flight webhook delivery count across every concurrent
/// `dispatch_event_to_subs` call. Built lazily on first dispatch from
/// `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY` (default
/// [`DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY`]). Held in `Arc` so worker
/// tasks can clone-and-move it into their async body.
///
/// Being a `OnceLock`, it is sized exactly once per process; later
/// changes to the environment variable have no effect.
static DISPATCH_SEMAPHORE: OnceLock<Arc<Semaphore>> = OnceLock::new();
465
466/// Resolve the bound from env (test seam: tests call
467/// [`override_dispatch_concurrency_for_tests`] before any dispatch
468/// runs to pin the bound to a known value).
469fn dispatch_concurrency_bound() -> usize {
470 if let Some(forced) = TEST_DISPATCH_CONCURRENCY_OVERRIDE.get() {
471 return *forced;
472 }
473 match std::env::var("AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY") {
474 Ok(raw) => match raw.parse::<usize>() {
475 Ok(n) if (1..=4096).contains(&n) => n,
476 _ => {
477 tracing::warn!(
478 "AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY={raw:?} not in 1..=4096; \
479 falling back to default {DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY}"
480 );
481 DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY
482 }
483 },
484 Err(_) => DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY,
485 }
486}
487
488/// Return a clone of the shared dispatch semaphore, building it on
489/// first call from the resolved concurrency bound.
490fn dispatch_semaphore() -> Arc<Semaphore> {
491 DISPATCH_SEMAPHORE
492 .get_or_init(|| Arc::new(Semaphore::new(dispatch_concurrency_bound())))
493 .clone()
494}
495
/// Test-only override for the dispatch concurrency bound. Tests set
/// this BEFORE the first dispatch runs in the process so the
/// semaphore is sized to the assertion's expectations. Subsequent
/// changes after the semaphore is built are ignored (the bound is
/// per-process). Read by `dispatch_concurrency_bound`, where it takes
/// precedence over the environment variable.
static TEST_DISPATCH_CONCURRENCY_OVERRIDE: OnceLock<usize> = OnceLock::new();
502
/// Test-only seam: pin the dispatch concurrency bound to `n`. Must be
/// called before any `dispatch_event*` runs in the current process —
/// once the shared semaphore has been built, a successful call here no
/// longer changes its size.
///
/// Returns `Err(n)` (the rejected value, per `OnceLock::set`) if an
/// override has already been set, in which case the prior value remains
/// authoritative.
///
/// `n` is not range-checked here; the `1..=4096` validation applies only
/// to the environment variable.
#[doc(hidden)]
pub fn override_dispatch_concurrency_for_tests(n: usize) -> Result<(), usize> {
    TEST_DISPATCH_CONCURRENCY_OVERRIDE.set(n)
}
511
/// PERF-3 (FX-10) — diagnostic accessor returning the number of
/// permits currently AVAILABLE on the shared dispatch semaphore. Used
/// by the regression test in
/// `tests/subscriptions_no_thread_spawn_per_subscriber.rs` to assert
/// the in-flight ceiling holds even when 50 subscribers fan out
/// concurrently.
///
/// Note that calling this builds the semaphore if no dispatch has run
/// yet, which fixes the concurrency bound for the rest of the process.
#[doc(hidden)]
pub fn dispatch_semaphore_available_permits() -> usize {
    dispatch_semaphore().available_permits()
}
522
523// v0.7.0 #1073 — `dispatch_http_client()` scaffolding REMOVED
524//
525// v0.7.x (issue #1174 follow-up #1196) — the dead-code shared-client
526// accessor + its `OnceLock<Option<reqwest::blocking::Client>>` static
527// were removed. Sub-agent code review (#1196 PR8) found zero
528// production callers — only the `tests/sr3_perf_regressions.rs`
529// scaffolding-pin test kept the symbol alive. The SR-2 #1082 SSRF
530// hardening (per-call `builder.resolve(host, addr)` DNS pinning)
531// keeps the per-call builder path live in `send()`; the future
532// custom `reqwest::dns::Resolve` refactor will re-introduce a shared
533// client at that point through `RuntimeContext`, not as a private
534// module-level OnceLock. Removed alongside the test pin in the same
535// commit so the scaffolding doesn't drift back via the existence
536// assertion.
537
/// One row of the `subscription_events` per-delivery audit log. K6
/// writes one row before each network send; K7's
/// `memory_subscription_replay` tool reads the rows back ordered by
/// `delivered_at` for replay-from-cursor support.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionEvent {
    /// Integer id of the audit row.
    pub id: i64,
    /// Id of the subscription the delivery was addressed to.
    pub subscription_id: String,
    /// Correlation id carried by the dispatched payload.
    pub correlation_id: String,
    /// Event type slug of the delivery.
    pub event_type: String,
    /// Stored payload text — presumably the JSON body that was sent;
    /// confirm against the writer.
    pub payload: String,
    /// Delivery timestamp; the replay tool orders rows by this column.
    pub delivered_at: String,
    /// Delivery status label; the set of values is defined by the writer,
    /// which is not part of this section of the file.
    pub delivery_status: String,
}
552
/// One row of the `subscription_dlq` table. Created when a delivery
/// exhausts the [200ms, 1s, 5s] retry ladder. K7's inspector tool
/// surfaces these rows; K6 only ships the writer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Integer id of the dead-letter row.
    pub id: i64,
    /// Id of the subscription whose delivery failed.
    pub subscription_id: String,
    /// Correlation id of the failed delivery.
    pub correlation_id: String,
    /// Event type slug of the failed delivery.
    pub event_type: String,
    /// Stored payload text — presumably the JSON body that could not be
    /// delivered; confirm against the writer.
    pub payload: String,
    /// Number of retries recorded for the delivery.
    pub retry_count: i64,
    /// Description of the most recent failure.
    pub last_error: String,
    /// Timestamp of the first failed attempt.
    pub first_failed_at: String,
    /// Timestamp of the most recent failed attempt.
    pub last_failed_at: String,
}
568
569// ---------------------------------------------------------------------
570// P5 (G9) — event payload structs for the four new lifecycle events.
571//
572// Each struct is the `details` block flattened into `DispatchPayload`
573// for its event type. They are intentionally small and JSON-stable —
574// the same shape ships on both the MCP and HTTP webhook surfaces.
575// Adding a new field is backward-compatible (subscribers ignore
576// unknowns); renaming or removing a field is breaking — bump the
577// payload schema version per AI_DEVELOPER_GOVERNANCE.md.
578// ---------------------------------------------------------------------
579
/// `memory_promote` event — fires after a tier or vertical promotion
/// commits. `to_namespace` is `Some` for vertical (`memory_promote`
/// with a `to_namespace` argument); for the default tier promotion it
/// is `None` and `tier` is set to the new tier (`"long"`).
///
/// Optional fields that are `None` are omitted from the serialised JSON
/// and default to `None` when absent on deserialisation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PromoteEventDetails {
    /// `"vertical"` for namespace promote-clone, `"tier"` for the
    /// default tier upgrade.
    pub mode: String,
    /// New tier after promotion (always `"long"` for `mode = "tier"`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tier: Option<String>,
    /// Target namespace (vertical promote only).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub to_namespace: Option<String>,
    /// Clone id (vertical promote only); the `memory_id` field on the
    /// outer payload carries the source memory id in vertical mode.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub clone_id: Option<String>,
}
600
/// `memory_delete` event — fires after the row is removed from
/// `memories`. `title` and `tier` come from the pre-delete snapshot so
/// subscribers can write meaningful audit entries without a
/// roundtrip.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteEventDetails {
    /// Title of the memory as it was before deletion.
    pub title: String,
    /// Tier of the memory as it was before deletion.
    pub tier: String,
}
610
/// `memory_link_created` event — fires after `db::create_link`
/// commits. The outer `memory_id` carries the source id (the
/// link-author side); `target_id` is the destination of the directed
/// link.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinkCreatedEventDetails {
    /// Destination memory id of the directed link.
    pub target_id: String,
    /// Relation label of the link.
    pub relation: String,
}
620
/// `memory_consolidated` event — fires after `db::consolidate`
/// commits. The outer `memory_id` carries the new consolidated
/// memory's id; `source_ids` is the array of memories that were
/// merged (and deleted by the consolidate op).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidatedEventDetails {
    /// Ids of the memories that were merged.
    pub source_ids: Vec<String>,
    /// Number of merged memories — presumably `source_ids.len()`; the
    /// struct itself does not enforce that, so confirm at the
    /// construction site.
    pub source_count: usize,
}
630
/// v0.7.0 K4 — `approval_requested` event details.
///
/// Fires after a `pending_actions` row has been inserted by the
/// governance gate (`db::queue_pending_action` from the local store /
/// promote / delete enforce paths, or `db::upsert_pending_action` from
/// peer-originated `sync_push` traffic). The K10 Approval API
/// (HTTP+SSE) consumes the same dispatcher path so the surface is
/// consistent across delivery transports.
///
/// The outer `memory_id` field on [`DispatchPayload`] carries the
/// pending-action **id** (the row PK of `pending_actions`) — that's the
/// only identifier subscribers need to round-trip back through
/// `memory_pending_*` MCP tools or the v0.7 Approval HTTP endpoints.
/// The `agent_id` field carries the row's `requested_by`.
///
/// Adding a new field here is backward-compatible (subscribers ignore
/// unknowns); renaming or removing a field is breaking — bump the
/// payload schema version per AI_DEVELOPER_GOVERNANCE.md.
///
/// NOTE(review): this struct has its own `memory_id` field, and
/// [`DispatchPayload`] flattens the details object next to an envelope
/// field of the same name. If the struct is serialised and passed as the
/// details value unchanged, a delete / promote approval would emit the
/// `memory_id` key twice — confirm how the caller builds the details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalRequestedEventDetails {
    /// Discriminator from `pending_actions.action_type` — `"store"`,
    /// `"delete"`, or `"promote"` (the three [`crate::models::GovernedAction`]
    /// variants today). Reserved for forward-compat with future gated
    /// actions; subscribers should treat unknown values as opaque.
    pub action_type: String,
    /// `pending_actions.requested_at` (RFC3339). Mirrored into the
    /// details block so SSE consumers downstream of K10 can render
    /// the queue-time without a second round-trip.
    pub requested_at: String,
    /// `pending_actions.memory_id` — `Some` for delete / promote
    /// (existing row), `None` for store (no row yet).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub memory_id: Option<String>,
    /// Always `"pending"` at insert time. Decided rows do NOT re-fire
    /// this event — the decision flows through the planned
    /// `approval_decided` event in K7.
    pub status: String,
}
669
/// `memory_link_invalidated` event (v0.7 J4 / G14) — fires after a
/// successful `memory_kg_invalidate` writes `valid_until` on the
/// `(source_id, target_id, relation)` link. The outer `memory_id`
/// carries the source id (the link-author side); `target_id`,
/// `relation`, and the freshly-written `valid_until` describe the
/// supersession edge so consumers can replay the invalidation log
/// without re-reading `memory_links`. `previous_valid_until`
/// distinguishes the first supersession (`None`) from an idempotent
/// retry (carries the prior stamp).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinkInvalidatedEventDetails {
    /// Destination memory id of the invalidated link.
    pub target_id: String,
    /// Relation label of the invalidated link.
    pub relation: String,
    /// The `valid_until` stamp that was just written.
    pub valid_until: String,
    /// Prior `valid_until` stamp, if the link already had one; omitted
    /// from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub previous_valid_until: Option<String>,
}
687
688/// Fire an event to all matching subscribers. Each dispatch is
689/// fire-and-forget — the caller is NOT blocked on delivery. Errors are
690/// logged and counted in the DB via `failure_count`.
691///
692/// PERF-3 (FX-10, 2026-05-26): pre-fix posture was
693/// `std::thread::spawn` + per-worker `Connection::open(...)` per
694/// matching subscriber — 1000 subscribers minted 1000 OS threads + 1000
695/// SQLite handles per store event. Post-fix posture uses a bounded
696/// [`tokio::sync::Semaphore`] (default 32 permits, tunable via
697/// `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY`) + `tokio::task::spawn_blocking`
698/// against the existing daemon Tokio runtime so the in-flight delivery
699/// count is capped. The pre-fix `std::thread::spawn` path is preserved
700/// as a fallback for legacy unit tests that run outside any runtime.
701///
702/// Caller owns the connection. Dispatch workers still re-open the
703/// connection per delivery to update counters (cheap — `SQLite`
704/// connections are process-shared via WAL — and the new concurrency
705/// bound keeps the simultaneous handle count bounded too).
706///
707/// P5 (G9): convenience wrapper for the historical no-details case
708/// (used by `memory_store`). New event types should call
709/// `dispatch_event_with_details` and pass the matching
710/// `*EventDetails` struct serialised to JSON.
711pub fn dispatch_event(
712 conn: &Connection,
713 event: &str,
714 memory_id: &str,
715 namespace: &str,
716 agent_id: Option<&str>,
717 db_path: &std::path::Path,
718) {
719 dispatch_event_with_details(conn, event, memory_id, namespace, agent_id, db_path, None);
720}
721
722/// P5 (G9): full lifecycle dispatch with optional event-specific
723/// details. The details JSON is FLATTENED into the dispatch payload —
724/// keys must not collide with the outer envelope (`event`,
725/// `memory_id`, `namespace`, `agent_id`, `delivered_at`). The four
726/// new event types (`memory_promote`, `memory_delete`,
727/// `memory_link_created`, `memory_consolidated`) supply their
728/// `*EventDetails` struct serialised via `serde_json::to_value`.
729pub fn dispatch_event_with_details(
730 conn: &Connection,
731 event: &str,
732 memory_id: &str,
733 namespace: &str,
734 agent_id: Option<&str>,
735 db_path: &std::path::Path,
736 details: Option<serde_json::Value>,
737) {
738 // Dispatch path needs the global view (every tenant's subscriptions
739 // for this event), so `None` here is correct — ownership scoping
740 // would silently drop matching subscribers belonging to OTHER
741 // tenants. The cross-tenant authorization gate lives at the wire
742 // surface (MCP/HTTP handlers), not here. See #870/#872/#874.
743 //
744 // v0.7.0 #1097 — pre-filter by event type at the SQL level using
745 // `list_by_event` instead of the legacy `list(conn, None)` full
746 // table scan. The Rust-side `matches_filters` below stays as the
747 // authoritative gate (it covers the namespace/agent filters which
748 // the SQL prefilter doesn't honour); only the coarse event-type
749 // filter moves into SQL. At 100 subscribers the write-event
750 // fanout cost drops from O(N) to whatever fraction actually
751 // subscribes to the fired event type.
752 let subs = match list_by_event(conn, event) {
753 Ok(s) => s,
754 Err(e) => {
755 tracing::warn!("subscription list failed during dispatch: {e}");
756 return;
757 }
758 };
759 // Resolve each matching sub's secret_hash from sqlite BEFORE
760 // dispatching to the per-sub worker pool. #932 (v0.7.0 Track D,
761 // 2026-05-20) extracted this into `dispatch_event_to_subs` so the
762 // postgres-backed `dispatch_event_postgres` path can resolve
763 // secret_hash from the subscription memory's metadata instead.
764 //
765 // v0.7.0 #1072 — secret_hash now reuses the dispatch caller's
766 // `&Connection` instead of opening a fresh `Connection::open(...)`
767 // per matching subscription. Saves one connection open per
768 // subscriber in the fanout.
769 let matching: Vec<(Subscription, Option<String>)> = subs
770 .into_iter()
771 .filter(|s| {
772 matches_filters(
773 &s.events,
774 s.event_types.as_deref(),
775 s.namespace_filter.as_deref(),
776 s.agent_filter.as_deref(),
777 event,
778 namespace,
779 agent_id,
780 )
781 })
782 .map(|s| {
783 let secret_hash = load_secret_hash_with_conn(conn, &s.id).unwrap_or(None);
784 (s, secret_hash)
785 })
786 .collect();
787 dispatch_event_to_subs(
788 matching, event, memory_id, namespace, agent_id, db_path, details,
789 );
790}
791
792/// #932 (v0.7.0 Track D, 2026-05-20) — backend-neutral per-sub
793/// dispatch worker pool. Takes an already-resolved
794/// `Vec<(Subscription, Option<secret_hash>)>` so the sqlite path
795/// (via `dispatch_event_with_details`) and the postgres path (via
796/// `dispatch_event_postgres`) share the same delivery + audit +
797/// DLQ + HMAC code while their subscription-source lookups remain
798/// adapter-specific.
799///
800/// The `db_path` argument is still the SQLite audit-mirror path —
801/// `record_subscription_event` / `record_dispatch` / `record_dlq`
802/// still write to sqlite even on postgres-backed daemons because
803/// every daemon currently keeps a sqlite scratch DB alongside the
804/// SAL store handle. A future SAL audit-log surface (#-tracking)
805/// will route this through the trait too.
806pub fn dispatch_event_to_subs(
807 matching: Vec<(Subscription, Option<String>)>,
808 event: &str,
809 memory_id: &str,
810 namespace: &str,
811 agent_id: Option<&str>,
812 db_path: &std::path::Path,
813 details: Option<serde_json::Value>,
814) {
815 if matching.is_empty() {
816 return;
817 }
818 // PERF — warm the reqwest::blocking TLS connector once per process,
819 // off the delivery critical path, so the first webhook does not pay
820 // the cold root-cert init inside its ACK_TIMEOUT retry budget. See
821 // `prewarm_dispatch_tls`.
822 {
823 static KICK: std::sync::Once = std::sync::Once::new();
824 KICK.call_once(|| {
825 std::thread::spawn(prewarm_dispatch_tls);
826 });
827 }
828 // Timestamp is part of the canonical string the signature is
829 // computed over. Receivers SHOULD reject requests whose timestamp
830 // differs from their clock by more than 5 minutes (replay window).
831 // (#301 item 1 — prior implementation had no replay protection.)
832 let timestamp = chrono::Utc::now().timestamp().to_string();
833
834 // PERF-3 (FX-10, 2026-05-26) — bridge the dispatch fan-out onto
835 // the existing Tokio runtime. We grab the current Handle once per
836 // batch; the per-subscriber worker is enqueued as
837 // `handle.spawn(async { semaphore.acquire().await;
838 // spawn_blocking(|| { … reqwest::blocking::send + audit
839 // writes … }).await })`. The semaphore caps in-flight deliveries
840 // at [`DEFAULT_WEBHOOK_DISPATCH_CONCURRENCY`] (operator-tunable
841 // via `AI_MEMORY_WEBHOOK_DISPATCH_CONCURRENCY`) so 1000 matching
842 // subscribers no longer mint 1000 OS threads + 1000 SQLite
843 // handles.
844 //
845 // Fallback: when no Tokio runtime is attached to the calling
846 // thread (e.g. legacy CLI tests that call `dispatch_event`
847 // outside a runtime), fall back to the pre-fix
848 // `std::thread::spawn` path. Production daemons (CLI/MCP/HTTP)
849 // ALL run under `#[tokio::main]`, so this fallback is the cold
850 // path.
851 let handle = tokio::runtime::Handle::try_current().ok();
852 for (sub, sub_secret_hash) in matching {
853 // v0.7.0 K6 — UUIDv7 correlation id is generated per
854 // (subscription, event) pair so receivers can correlate ACKs
855 // back to the dispatched payload across the retry ladder.
856 // Generated here (not inside the worker thread) so the
857 // ordering invariant — correlation_ids monotonic in
858 // dispatch-loop order — holds even when worker threads race.
859 let correlation_id = uuid::Uuid::now_v7().to_string();
860 let payload = DispatchPayload {
861 event,
862 memory_id,
863 namespace,
864 agent_id,
865 delivered_at: chrono::Utc::now().to_rfc3339(),
866 correlation_id: &correlation_id,
867 details: details.clone(),
868 };
869 let body = match serde_json::to_string(&payload) {
870 Ok(s) => s,
871 Err(e) => {
872 tracing::warn!("dispatch payload serialize failed: {e}");
873 continue;
874 }
875 };
876 let url = sub.url.clone();
877 let sub_id = sub.id.clone();
878 let event_owned = event.to_string();
879 let ts = timestamp.clone();
880 let db_path = db_path.to_path_buf();
881 let secret_hash_owned = sub_secret_hash.clone();
882
883 // PERF-3 (FX-10) — the entire per-subscriber delivery body
884 // is captured by a `FnOnce()` closure so it can run either
885 // on the Tokio blocking pool (production path) or on a
886 // freshly-spawned `std::thread` (no-runtime fallback). The
887 // body is otherwise unchanged from the pre-fix code.
888 let work = move || {
889 // v0.7.0 #1072 — open ONE sqlite connection for the
890 // whole delivery instead of 4-5 across the
891 // event-audit / status-update / dispatch-counter / DLQ
892 // sites. Saves the per-connection PRAGMA + WAL setup
893 // cost (~2-5 ms each) on every dispatched event. The
894 // single connection lives on the worker thread stack
895 // for the lifetime of the delivery and is dropped on
896 // thread exit. Same audit posture as the pre-#1072
897 // path — every write site is best-effort and logs on
898 // failure.
899 let worker_conn = match Connection::open(&db_path) {
900 Ok(c) => Some(c),
901 Err(e) => {
902 tracing::warn!(
903 "subscription dispatch: worker_conn open failed: {e}; \
904 falling back to per-write connections"
905 );
906 None
907 }
908 };
909 // Persist the per-delivery audit row BEFORE the network
910 // send so replay-from-cursor (K7) sees a stable record
911 // even if the dispatcher process crashes mid-retry.
912 let event_audit_result = if let Some(c) = worker_conn.as_ref() {
913 record_subscription_event_with_conn(
914 c,
915 &sub_id,
916 &correlation_id,
917 &event_owned,
918 &body,
919 )
920 } else {
921 record_subscription_event(&db_path, &sub_id, &correlation_id, &event_owned, &body)
922 };
923 if let Err(e) = event_audit_result {
924 tracing::warn!("subscription event audit write failed: {e}");
925 }
926 let secret_hash = secret_hash_owned;
927 // Canonical string: "<timestamp>.<body>". Keyed HMAC over
928 // the DB-stored secret hash. Receivers verify by computing
929 // SHA256(plaintext_secret) and then
930 // HMAC-SHA256(key, "<timestamp>.<body>").
931 //
932 // v0.7.0 K7 — when no per-subscription secret is set, fall
933 // back to the process-wide `[hooks.subscription] hmac_secret`
934 // override so operators can sign EVERY outgoing payload
935 // without round-tripping each receiver through `memory_subscribe`.
936 // The plaintext server-wide secret is itself SHA-256-hashed
937 // first so the keying material on the wire matches the
938 // per-subscription path (receivers compute the same
939 // `SHA256(plaintext_secret)` regardless of which path
940 // configured it).
941 let canonical = format!("{ts}.{body}");
942 let signature = match secret_hash.as_deref() {
943 Some(h) => Some(hmac_sha256_hex(h, &canonical)),
944 None => crate::config::active_hooks_hmac_secret().map(|plain| {
945 let key_hash = sha256_hex(&plain);
946 hmac_sha256_hex(&key_hash, &canonical)
947 }),
948 };
949 // R3-S1.HMAC (v0.7.0 fix campaign 2026-05-13): refuse to
950 // dispatch an unsigned payload. New subscriptions cannot
951 // register without a per-sub or server-wide secret (see
952 // `crate::handlers::subscribe` + MCP `handle_subscribe`), so
953 // hitting this branch means a legacy row was persisted before
954 // the gate landed, or the server-wide override was removed
955 // after registration. Either way, fail loudly to the DLQ
956 // instead of dispatching the body in clear so a receiver
957 // never has to guess whether a body is authentic.
958 if signature.is_none() {
959 tracing::error!(
960 "subscription {sub_id} dispatch refused: no per-sub secret AND no \
961 server-wide [hooks.subscription] hmac_secret configured. \
962 Configure one of the two and replay via memory_subscription_replay. \
963 (v0.7.0 fix campaign R3-S1.HMAC, 2026-05-13)"
964 );
965 let outcome = DeliveryOutcome::unsigned_refused();
966 let ok = outcome.success;
967 // v0.7.0 #1072 — reuse `worker_conn` for every sqlite
968 // write below.
969 if let Some(c) = worker_conn.as_ref() {
970 record_dispatch_with_conn(c, &sub_id, ok);
971 update_event_status_with_conn(c, &correlation_id, ok);
972 } else {
973 record_dispatch(&db_path, &sub_id, ok);
974 update_event_status(&db_path, &correlation_id, ok);
975 }
976 let dlq_result = if let Some(c) = worker_conn.as_ref() {
977 record_dlq_with_conn(
978 c,
979 &sub_id,
980 &correlation_id,
981 &event_owned,
982 &body,
983 outcome.attempts,
984 &outcome.last_error,
985 &outcome.first_failed_at,
986 &outcome.last_failed_at,
987 )
988 } else {
989 record_dlq(
990 &db_path,
991 &sub_id,
992 &correlation_id,
993 &event_owned,
994 &body,
995 outcome.attempts,
996 &outcome.last_error,
997 &outcome.first_failed_at,
998 &outcome.last_failed_at,
999 )
1000 };
1001 if let Err(e) = dlq_result {
1002 tracing::warn!("subscription DLQ write failed: {e}");
1003 }
1004 return;
1005 }
1006 let outcome =
1007 deliver_with_retry(&url, &body, &ts, signature.as_deref(), &correlation_id);
1008 let ok = outcome.success;
1009 // v0.7.0 #1072 — reuse `worker_conn` for every sqlite write
1010 // below.
1011 if let Some(c) = worker_conn.as_ref() {
1012 record_dispatch_with_conn(c, &sub_id, ok);
1013 update_event_status_with_conn(c, &correlation_id, ok);
1014 } else {
1015 record_dispatch(&db_path, &sub_id, ok);
1016 update_event_status(&db_path, &correlation_id, ok);
1017 }
1018 if !ok {
1019 let dlq_result = if let Some(c) = worker_conn.as_ref() {
1020 record_dlq_with_conn(
1021 c,
1022 &sub_id,
1023 &correlation_id,
1024 &event_owned,
1025 &body,
1026 outcome.attempts,
1027 &outcome.last_error,
1028 &outcome.first_failed_at,
1029 &outcome.last_failed_at,
1030 )
1031 } else {
1032 record_dlq(
1033 &db_path,
1034 &sub_id,
1035 &correlation_id,
1036 &event_owned,
1037 &body,
1038 outcome.attempts,
1039 &outcome.last_error,
1040 &outcome.first_failed_at,
1041 &outcome.last_failed_at,
1042 )
1043 };
1044 if let Err(e) = dlq_result {
1045 tracing::warn!("subscription DLQ write failed: {e}");
1046 }
1047 }
1048 };
1049
1050 // PERF-3 (FX-10) — production path: bounded semaphore + Tokio
1051 // blocking pool. The semaphore caps concurrent in-flight
1052 // deliveries at the operator-tunable bound; permits are
1053 // released when the `_permit` guard drops at the end of the
1054 // worker. Cold path (no runtime attached): legacy
1055 // `std::thread::spawn`. Tests use the production path because
1056 // `#[tokio::test]` attaches a runtime to the calling thread.
1057 if let Some(rt) = handle.as_ref() {
1058 let permit_sem = dispatch_semaphore();
1059 rt.spawn(async move {
1060 // Acquire-then-blocking-step. `acquire_owned` returns
1061 // an `OwnedSemaphorePermit` that gets moved into the
1062 // blocking task and is dropped when the worker
1063 // returns — releasing the slot for the next pending
1064 // dispatch.
1065 let permit = match permit_sem.acquire_owned().await {
1066 Ok(p) => p,
1067 Err(e) => {
1068 tracing::warn!(
1069 "subscription dispatch: semaphore acquire failed: {e}; \
1070 dropping delivery (semaphore closed)"
1071 );
1072 return;
1073 }
1074 };
1075 // The reqwest::blocking + rusqlite work is sync; run
1076 // it on the Tokio blocking pool so we don't pin a
1077 // runtime worker thread on the LLM-slow HTTP send.
1078 if let Err(e) = tokio::task::spawn_blocking(move || {
1079 work();
1080 drop(permit);
1081 })
1082 .await
1083 {
1084 tracing::warn!("subscription dispatch: spawn_blocking join failed: {e}");
1085 }
1086 });
1087 } else {
1088 // Fallback: no runtime in scope. This path is exercised
1089 // only by legacy unit tests that call `dispatch_event`
1090 // outside `#[tokio::test]`; production daemons always
1091 // run under `#[tokio::main]`.
1092 std::thread::spawn(work);
1093 }
1094 }
1095}
1096
1097/// v0.7.0 K4 — dispatch the `approval_requested` lifecycle event for a
1098/// freshly-inserted `pending_actions` row.
1099///
1100/// Thin convenience wrapper around [`dispatch_event_with_details`]:
1101/// - Resolves the canonical row via [`crate::db::get_pending_action`]
1102/// so the payload reflects what was actually committed (not what
1103/// the caller *intended* to commit).
1104/// - Synthesises an [`ApprovalRequestedEventDetails`] block from the
1105/// row.
1106/// - Routes the event through the existing subscription dispatch
1107/// path so opt-in subscribers (`event_types: ["approval_requested"]`)
1108/// and legacy wildcard subscribers both receive it.
1109///
1110/// Best-effort and fire-and-forget — same posture as the K2
1111/// `pending_action_expired` dispatch in
1112/// [`crate::daemon_runtime::spawn_pending_timeout_sweep_loop`]. A
1113/// dispatch failure must NOT roll back the pending-action row.
1114///
1115/// Caller passes the `pending_id` returned from
1116/// [`crate::db::queue_pending_action`] / [`crate::db::upsert_pending_action`].
1117/// A missing or unreadable row is logged and otherwise treated as a
1118/// no-op (lost-event semantics, never block the write path).
1119pub fn dispatch_approval_requested(conn: &Connection, pending_id: &str, db_path: &std::path::Path) {
1120 let pa = match crate::db::get_pending_action(conn, pending_id) {
1121 Ok(Some(pa)) => pa,
1122 Ok(None) => {
1123 tracing::warn!(
1124 "approval_requested dispatch skipped: pending_action {pending_id} not found"
1125 );
1126 return;
1127 }
1128 Err(e) => {
1129 tracing::warn!(
1130 "approval_requested dispatch skipped: pending_action {pending_id} read failed: {e}"
1131 );
1132 return;
1133 }
1134 };
1135 let details = ApprovalRequestedEventDetails {
1136 action_type: pa.action_type.clone(),
1137 requested_at: pa.requested_at.clone(),
1138 memory_id: pa.memory_id.clone(),
1139 status: pa.status.clone(),
1140 };
1141 let details_value = match serde_json::to_value(&details) {
1142 Ok(v) => Some(v),
1143 Err(e) => {
1144 tracing::warn!("approval_requested dispatch details serialise failed: {e}");
1145 None
1146 }
1147 };
1148 // v0.7.0 K10 — publish on the in-process approval bus so HTTP SSE
1149 // subscribers see the new pending row in real time. Best-effort
1150 // (no receivers → swallowed); never blocks the gate path.
1151 crate::approvals::publish(crate::approvals::ApprovalEvent::ApprovalRequested {
1152 pending_id: pa.id.clone(),
1153 action_type: pa.action_type.clone(),
1154 namespace: pa.namespace.clone(),
1155 requested_by: pa.requested_by.clone(),
1156 requested_at: pa.requested_at.clone(),
1157 });
1158 dispatch_event_with_details(
1159 conn,
1160 webhook_events::APPROVAL_REQUESTED,
1161 &pa.id,
1162 &pa.namespace,
1163 Some(&pa.requested_by),
1164 db_path,
1165 details_value,
1166 );
1167}
1168
1169/// v0.7.0 K6 — outcome of a single attempt or full retry ladder.
1170///
1171/// `success` is true once the receiver has returned 2xx AND a JSON
1172/// body of the form `{"status":"ack","correlation_id":"<id>"}` whose
1173/// id matches the one we dispatched. `attempts` is the number of
1174/// network requests issued (1..=4 — initial + 3 retries). `last_error`
1175/// is the short error string from the last failed attempt (empty on
1176/// success). The `first_failed_at` / `last_failed_at` pair brackets
1177/// the retry window for DLQ analytics.
1178struct DeliveryOutcome {
1179 success: bool,
1180 attempts: i64,
1181 last_error: String,
1182 first_failed_at: String,
1183 last_failed_at: String,
1184}
1185
1186impl DeliveryOutcome {
1187 /// R3-S1.HMAC (v0.7.0 fix campaign 2026-05-13): synthesise a failure
1188 /// outcome for a dispatch refused at the gate because neither a
1189 /// per-sub secret nor a server-wide override was configured. The
1190 /// DLQ row carries an explicit `last_error` so operators can tell a
1191 /// missing-secret refusal apart from a transport failure.
1192 fn unsigned_refused() -> Self {
1193 let now = chrono::Utc::now().to_rfc3339();
1194 Self {
1195 success: false,
1196 attempts: 0,
1197 last_error: "dispatch refused: no per-subscription secret AND no server-wide \
1198 [hooks.subscription] hmac_secret configured (v0.7.0 R3-S1.HMAC)"
1199 .to_string(),
1200 first_failed_at: now.clone(),
1201 last_failed_at: now,
1202 }
1203 }
1204}
1205
1206/// v0.7.0 K6 — dispatcher driver. Issues the initial POST plus up to
1207/// three retries spaced [200ms, 1s, 5s] apart. Each attempt validates
1208/// the receiver's ACK body — a 2xx response with no ACK or a
1209/// mismatched correlation_id counts as failure and triggers the next
1210/// retry. Returns the cumulative [`DeliveryOutcome`].
1211/// One-time process-global warm-up of the `reqwest::blocking` TLS
1212/// connector used by webhook delivery.
1213///
1214/// The first `reqwest::blocking::Client` constructed in a process
1215/// builds the TLS connector, which loads the system root-certificate
1216/// store. On some platforms (notably macOS, where the load goes
1217/// through Security.framework) that first load can take several
1218/// seconds and is process-cached thereafter. Because webhook delivery
1219/// rebuilds a fresh client per attempt (the #1082 per-call
1220/// `.resolve()` DNS pin precludes a fully process-shared client), a
1221/// cold first delivery would otherwise pay that init on attempt 1 — and
1222/// when it exceeds [`ACK_TIMEOUT`] the K6 retry budget is consumed by
1223/// the cold init rather than by real delivery failures, delaying (or,
1224/// under a short receiver poll window, failing) the first webhook of a
1225/// process by tens of seconds.
1226///
1227/// Calling this is idempotent via an internal [`std::sync::Once`].
1228/// [`dispatch_event_to_subs`] kicks it on a background thread the first
1229/// time it has work to deliver, so a long-lived daemon warms its
1230/// connector ahead of real load. Integration tests that assert delivery
1231/// latency call it synchronously during setup so the one-time cost
1232/// lands before the timed assertion window.
1233pub fn prewarm_dispatch_tls() {
1234 static WARM: std::sync::Once = std::sync::Once::new();
1235 WARM.call_once(|| {
1236 // Build on a dedicated OS thread, then join. `reqwest::blocking`'s
1237 // client constructor blocks waiting for its internal runtime
1238 // thread to spin up, which panics if invoked directly on a thread
1239 // that is currently driving a Tokio runtime (an async test, or a
1240 // daemon runtime worker). A freshly-spawned `std::thread` has no
1241 // ambient runtime, so blocking there is always allowed. Joining
1242 // makes the warm-up synchronous for the caller (tests rely on the
1243 // connector being warm once this returns).
1244 let _ = std::thread::spawn(|| {
1245 if let Err(e) = reqwest::blocking::Client::builder()
1246 .timeout(ACK_TIMEOUT)
1247 .build()
1248 {
1249 tracing::warn!("webhook dispatch TLS warm-up failed: {e}");
1250 }
1251 })
1252 .join();
1253 });
1254}
1255
1256fn deliver_with_retry(
1257 url: &str,
1258 body: &str,
1259 timestamp: &str,
1260 signature: Option<&str>,
1261 correlation_id: &str,
1262) -> DeliveryOutcome {
1263 let mut attempts: i64 = 0;
1264 let mut first_failed_at = String::new();
1265 let mut last_failed_at = String::new();
1266 let mut last_error = String::new();
1267 // Total attempts = 1 (initial) + RETRY_BACKOFFS.len() (retries).
1268 for attempt_idx in 0..=RETRY_BACKOFFS.len() {
1269 if attempt_idx > 0 {
1270 std::thread::sleep(RETRY_BACKOFFS[attempt_idx - 1]);
1271 }
1272 attempts += 1;
1273 match send(url, body, timestamp, signature, correlation_id) {
1274 Ok(()) => {
1275 return DeliveryOutcome {
1276 success: true,
1277 attempts,
1278 last_error: String::new(),
1279 first_failed_at,
1280 last_failed_at,
1281 };
1282 }
1283 Err(e) => {
1284 let now = chrono::Utc::now().to_rfc3339();
1285 if first_failed_at.is_empty() {
1286 first_failed_at = now.clone();
1287 }
1288 last_failed_at = now;
1289 last_error = e;
1290 }
1291 }
1292 }
1293 DeliveryOutcome {
1294 success: false,
1295 attempts,
1296 last_error,
1297 first_failed_at,
1298 last_failed_at,
1299 }
1300}
1301
1302/// Perform one HTTP POST with SSRF-hardened URL check + signature
1303/// + timestamp headers.
1304///
1305/// v0.7.0 K6 — return Ok(()) only when the receiver returns 2xx AND
1306/// a JSON ACK body (`{"status":"ack","correlation_id":"..."}`) whose
1307/// `correlation_id` matches the dispatched id within
1308/// [`ACK_TIMEOUT`]. Anything else (network error, non-2xx, ACK
1309/// timeout, mismatched correlation id) returns Err with a short
1310/// reason string the retry driver records.
1311fn send(
1312 url: &str,
1313 body: &str,
1314 timestamp: &str,
1315 signature: Option<&str>,
1316 correlation_id: &str,
1317) -> Result<(), String> {
1318 if let Err(e) = validate_url(url) {
1319 tracing::warn!("SSRF guard rejected webhook URL {url}: {e}");
1320 return Err(format!("ssrf-rejected: {e}"));
1321 }
1322 // v0.7.0 #1082 (SR-1 #2, HIGH) — DNS-rebind TOCTOU fix. Resolve
1323 // the host once via the SSRF guard AND capture the validated
1324 // addresses, then bind reqwest's resolver to exactly those
1325 // addresses via `Client::builder().resolve(host, addr)`. Pre-
1326 // #1082 reqwest did its own independent DNS query at send time,
1327 // letting an attacker-controlled DNS server (TTL=0 / split-
1328 // horizon) return a public IP on the daemon's first lookup
1329 // (passing the guard) and a private IP on reqwest's second
1330 // lookup (the webhook payload posted internally). The new
1331 // resolver override pins reqwest's connect to the daemon's
1332 // already-validated IP set; reqwest's own DNS query never
1333 // happens for this client.
1334 let (resolved_host, validated_addrs) =
1335 match validate_url_dns_resolved(url, crate::config::allow_loopback_webhooks()) {
1336 Ok(t) => t,
1337 Err(e) => {
1338 tracing::warn!("DNS SSRF guard rejected webhook URL {url}: {e}");
1339 return Err(format!("dns-ssrf-rejected: {e}"));
1340 }
1341 };
1342 // v0.7.0 SR-W3 (HIGH) — redirect SSRF-pin bypass. The
1343 // `builder.resolve(host, addr)` pins below shadow reqwest's DNS
1344 // for the *validated* host only. reqwest's default redirect
1345 // policy follows up to 10 hops and re-resolves DNS for the new
1346 // Location host — a host whose addresses were never SSRF-validated
1347 // — so a webhook endpoint that returns `302 Location:
1348 // http://169.254.169.254/...` would let reqwest connect to an
1349 // internal address the guard never cleared. Disabling redirects
1350 // closes that window: a 3xx is surfaced as a non-success status
1351 // (`http-{status}` below) and fails the dispatch safely, exactly
1352 // like any other non-2xx.
1353 let mut builder = reqwest::blocking::Client::builder()
1354 .timeout(ACK_TIMEOUT)
1355 .redirect(reqwest::redirect::Policy::none());
1356 for addr in &validated_addrs {
1357 // Pin reqwest's per-host override. The override SHADOWS
1358 // reqwest's own DNS query for this host on this client,
1359 // closing the rebind window.
1360 builder = builder.resolve(&resolved_host, *addr);
1361 }
1362 // v0.7.0 #1073 + #1082 (SR-3 perf + SR-2 SSRF) — the SSRF
1363 // hardening at #1082 requires per-call per-host DNS pinning via
1364 // `builder.resolve(host, addr)`; that pin lives on the client
1365 // itself, so a fully process-wide shared client can't hold pins
1366 // that vary per dispatched URL. We still amortize the bulk of
1367 // the work (TLS provider init, default connector) by sharing
1368 // the resolver+TLS state through this thread-local builder.
1369 // A future custom-resolver refactor will move the per-call pin
1370 // onto a trait-object `dns::Resolve` so the client itself can be
1371 // process-shared (matching the federation `peer.rs` pattern, and
1372 // re-introduced through `RuntimeContext` at that point). For now
1373 // correctness (SSRF closure) wins over the per-attempt client
1374 // rebuild cost. The prior `dispatch_http_client()` scaffolding
1375 // was removed in #1196 — it had zero production callers, only a
1376 // test pin kept it alive.
1377 let client = match builder.build() {
1378 Ok(c) => c,
1379 Err(e) => {
1380 tracing::warn!("webhook client build failed: {e}");
1381 return Err(format!("client-build: {e}"));
1382 }
1383 };
1384 let mut req = client
1385 .post(url)
1386 .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
1387 .header(
1388 "user-agent",
1389 format!("ai-memory/{}", env!("CARGO_PKG_VERSION")),
1390 )
1391 .header(crate::HEADER_AI_MEMORY_TIMESTAMP, timestamp)
1392 .header("x-ai-memory-correlation-id", correlation_id);
1393 if let Some(sig) = signature {
1394 req = req.header(crate::HEADER_AI_MEMORY_SIGNATURE, format!("sha256={sig}"));
1395 }
1396 let resp = match req.body(body.to_string()).send() {
1397 Ok(r) => r,
1398 Err(e) => {
1399 tracing::warn!("webhook POST to {url} failed: {e}");
1400 return Err(crate::errors::msg::network(e));
1401 }
1402 };
1403 if !resp.status().is_success() {
1404 let status = resp.status().as_u16();
1405 return Err(format!("http-{status}"));
1406 }
1407 // K6 ACK contract: receivers MUST return
1408 // {"status":"ack","correlation_id":"..."}. A 2xx with a missing /
1409 // mismatched body is treated as failure so retries kick in.
1410 let ack_body = match resp.text() {
1411 Ok(s) => s,
1412 Err(e) => return Err(format!("ack-read: {e}")),
1413 };
1414 let ack: serde_json::Value = match serde_json::from_str(&ack_body) {
1415 Ok(v) => v,
1416 Err(e) => return Err(format!("ack-decode: {e}")),
1417 };
1418 let status_field = ack.get("status").and_then(|v| v.as_str()).unwrap_or("");
1419 if status_field != "ack" {
1420 return Err(format!("ack-status: {status_field}"));
1421 }
1422 let ack_corr = ack
1423 .get("correlation_id")
1424 .and_then(|v| v.as_str())
1425 .unwrap_or("");
1426 if ack_corr != correlation_id {
1427 return Err(format!("ack-corr-mismatch: {ack_corr}"));
1428 }
1429 Ok(())
1430}
1431
1432/// Hash a plaintext secret (SHA-256 hex).
1433pub(crate) fn sha256_hex(s: &str) -> String {
1434 let mut hasher = Sha256::new();
1435 hasher.update(s.as_bytes());
1436 format!("{:x}", hasher.finalize())
1437}
1438
1439/// HMAC-SHA256 is expensive to implement from scratch; do the simple
1440/// construction manually using the hashed secret as key material.
1441/// Matches the RFC-2104 HMAC construction with SHA-256 as the
1442/// primitive.
1443pub(crate) fn hmac_sha256_hex(key_hex: &str, body: &str) -> String {
1444 const BLOCK: usize = 64;
1445 // v0.7.0 #1048 (Agent-5 #8) — decode the operator-supplied
1446 // `hmac_secret` as hex. The pre-#1048 behaviour silently fell
1447 // back to using the raw config bytes as HMAC key material when
1448 // the hex decode failed, which produced a stable-but-WEAK key
1449 // (`HMAC(b"not-a-hex-key!!", body)` instead of the intended
1450 // hex-decoded bytes). The fallback is preserved for wire
1451 // compatibility because both sender and receiver are typically
1452 // configured from the same secret string — flipping the
1453 // fallback would silently break running federations — but
1454 // every invalid-hex compute emits a WARN so an operator
1455 // running with a misconfigured secret sees the diagnostic on
1456 // every webhook dispatch. The boot-time validator
1457 // [`validate_hmac_secret_hex`] surfaces this BEFORE the daemon
1458 // accepts traffic; operators who want strict hex enforcement
1459 // call the validator at startup.
1460 let mut key = match hex_decode(key_hex) {
1461 Some(k) => k,
1462 None => {
1463 tracing::warn!(
1464 target: SUBSCRIPTIONS_TRACE_TARGET,
1465 "hmac_sha256_hex: hmac_secret is not valid hex (len={}); falling back to raw \
1466 bytes as key material — this produces a STABLE BUT WEAK key. Re-encode the \
1467 secret as hex (e.g. `openssl rand -hex 32`) and restart the daemon. \
1468 See #1048.",
1469 key_hex.len(),
1470 );
1471 key_hex.as_bytes().to_vec()
1472 }
1473 };
1474 if key.len() > BLOCK {
1475 let mut h = Sha256::new();
1476 h.update(&key);
1477 key = h.finalize().to_vec();
1478 }
1479 key.resize(BLOCK, 0);
1480 let mut opad = [0x5cu8; BLOCK];
1481 let mut ipad = [0x36u8; BLOCK];
1482 for i in 0..BLOCK {
1483 opad[i] ^= key[i];
1484 ipad[i] ^= key[i];
1485 }
1486 let mut inner = Sha256::new();
1487 inner.update(ipad);
1488 inner.update(body.as_bytes());
1489 let inner_digest = inner.finalize();
1490 let mut outer = Sha256::new();
1491 outer.update(opad);
1492 outer.update(inner_digest);
1493 format!("{:x}", outer.finalize())
1494}
1495
/// Decode a hex string into bytes.
///
/// Returns `None` unless `s` consists solely of an even number of ASCII
/// hex digits (`0-9`, `a-f`, `A-F`); the empty string decodes to an empty
/// vector.
///
/// Works on bytes rather than `&str` slices so that non-ASCII input is
/// rejected instead of panicking on a char-boundary slice, and parses
/// each nibble explicitly so that sign prefixes (which
/// `u8::from_str_radix` would accept, e.g. `"+f"`) are not mistaken for
/// hex.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A leftover byte means the input had odd length.
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
1505
1506/// v0.7.0 #1048 (Agent-5 #8) — boot-time hex validator for the
1507/// operator-supplied `hmac_secret`. Returns `Ok(())` when the value
1508/// is `None` (no secret configured) OR is valid hex; returns
1509/// `Err(String)` with a descriptive operator-facing message when
1510/// the value is non-hex. Callers MUST surface the error before the
1511/// daemon accepts traffic; the runtime `hmac_sha256_hex` will
1512/// otherwise silently degrade to a stable-but-WEAK key (raw bytes
1513/// of the misconfigured secret) on every dispatch.
1514///
1515/// # Errors
1516///
1517/// Returns the validation error string when the input is
1518/// `Some(secret)` and `secret` fails the hex parse. The message
1519/// includes recommended remediation (`openssl rand -hex 32`).
1520pub fn validate_hmac_secret_hex(secret: Option<&str>) -> Result<(), String> {
1521 let Some(secret) = secret else {
1522 return Ok(());
1523 };
1524 if hex_decode(secret).is_some() {
1525 return Ok(());
1526 }
1527 Err(format!(
1528 "[hooks.subscription] hmac_secret is not valid hex (len={}): expected an even-length \
1529 hex string. Generate a fresh secret with `openssl rand -hex 32` and update the \
1530 config. The runtime still computes a (weak) HMAC under the misconfigured value for \
1531 wire compatibility, but the boot validator refuses to start so the operator sees \
1532 the diagnostic immediately. See #1048.",
1533 secret.len(),
1534 ))
1535}
1536
1537/// SSRF guard with DNS resolution (#301 item 2). Resolves the host
1538/// via the stdlib resolver and rejects if ANY returned
1539/// `SocketAddr`'s IP is private / loopback / link-local. Guards
1540/// against DNS-rebind attacks where an attacker-controlled hostname
1541/// resolves to an internal IP at connect time.
1542///
1543/// Runs in the dispatch thread (blocking). Best-effort: if DNS fails
1544/// we let reqwest surface the error rather than fail closed, because
1545/// transient DNS outages should not silently drop webhook delivery.
1546pub fn validate_url_dns(url: &str) -> Result<()> {
1547 validate_url_dns_with(url, crate::config::allow_loopback_webhooks()).map(|_| ())
1548}
1549
1550/// v0.7.0 #1082 (SR-1 #2, HIGH) — DNS-rebind TOCTOU fix. Returns
1551/// the validated host + the pre-resolved `Vec<SocketAddr>` so the
1552/// caller can pin reqwest's resolver to the SAME addresses the
1553/// SSRF guard cleared. Pre-#1082 the guard resolved once,
1554/// validated, and discarded the addresses — reqwest then did its
1555/// OWN independent resolve at send time, opening a DNS-rebind
1556/// window where the second resolve could return a private IP that
1557/// the daemon never saw.
1558///
1559/// # Errors
1560/// - URL has no scheme.
1561/// - DNS resolution failed AND `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL`
1562/// is not set (post-#1053 fail-CLOSED).
1563/// - Any resolved address is private / link-local (or loopback when
1564/// `allow_loopback` is false).
1565pub(crate) fn validate_url_dns_resolved(
1566 url: &str,
1567 allow_loopback: bool,
1568) -> Result<(String, Vec<std::net::SocketAddr>)> {
1569 validate_url_dns_with(url, allow_loopback)
1570}
1571
/// Reports whether the operator opted into the legacy permissive SSRF
/// posture by setting `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL` to `1` or to
/// `true` (any ASCII case). An unset or unreadable variable counts as
/// "not opted in". Both the resolution-failure branch and the RFC 1035
/// hostname-shape branch call this so they interpret the override the
/// same way.
fn ssrf_dns_fail_open() -> bool {
    match std::env::var("AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
1581
/// RFC 1035 §2.3.4 hostname shape check. Returns `true` when `host`
/// VIOLATES the shape: after dropping one optional trailing dot, the name
/// must be 1..=253 octets long and every dot-separated label must be
/// 1..=63 octets.
///
/// FX-Fwin (#1053 follow-up) — `getaddrinfo(3)` is documented to reject an
/// oversized label, but Windows' resolver does not enforce the 63-octet
/// ceiling at the API boundary, so the guard cannot delegate this check to
/// libc and stay portable. IP literals never carry an oversized label and
/// are unaffected.
fn hostname_shape_invalid(host: &str) -> bool {
    let trimmed = match host.strip_suffix('.') {
        Some(without_root_dot) => without_root_dot,
        None => host,
    };
    let total_len_ok = (1..=253).contains(&trimmed.len());
    let labels_ok = trimmed
        .split('.')
        .all(|label| (1..=63).contains(&label.len()));
    !(total_len_ok && labels_ok)
}
1603
1604/// H11 inner helper: takes `allow_loopback` explicitly so tests can
1605/// assert both branches without poking the process-wide atomic
1606/// (which would race with parallel tests). Production callers go
1607/// through `validate_url_dns`.
1608///
1609/// v0.7.0 #1082 — returns the resolved `(host, addresses)` so the
1610/// `send` path can install a reqwest `Client::builder().resolve()`
1611/// override pinning the connect to exactly the IPs the guard
1612/// cleared. Closes the DNS-rebind TOCTOU window.
1613fn validate_url_dns_with(
1614 url: &str,
1615 allow_loopback: bool,
1616) -> Result<(String, Vec<std::net::SocketAddr>)> {
1617 let lower = url.to_ascii_lowercase();
1618 let (_scheme, rest) = lower
1619 .split_once("://")
1620 .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
1621 let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
1622 let host_port = &rest[..host_end];
1623 // v0.7.0 #1082 — extract the host (sans port + brackets) for the
1624 // reqwest `Client::builder().resolve(host, addr)` override the
1625 // caller installs. The override matches by host string the
1626 // reqwest URL parser produces, so we strip the brackets / port
1627 // here to match.
1628 let resolved_host = {
1629 let s = host_port;
1630 if let Some(close_idx) = s.strip_prefix('[').and(s.find(']')) {
1631 // [ipv6]:port or [ipv6] — return the inner ipv6 text.
1632 s[1..close_idx].to_string()
1633 } else if let Some(idx) = s.rfind(':') {
1634 // Hostname:port — strip the port. (IPv4-with-port. Bare
1635 // ipv4 has no `:` so falls through to the else branch.)
1636 s[..idx].to_string()
1637 } else {
1638 s.to_string()
1639 }
1640 };
1641 // Supply a default port so ToSocketAddrs resolves correctly.
1642 // SSRF fix (W11): bracketed IPv6 without an explicit port ("[fe80::1]"
1643 // with no trailing ":N") was previously passed to ToSocketAddrs as-is,
1644 // which errors with "invalid port value" — and the catch-all `Err(_) =>
1645 // return Ok(())` below treated that as a DNS hiccup, silently bypassing
1646 // the SSRF guard. Detect the no-trailing-port form and append `:80` so
1647 // resolution succeeds and the IP is checked.
1648 let resolv_target =
1649 if let Some(close_idx) = host_port.strip_prefix('[').and(host_port.find(']')) {
1650 let after_bracket = &host_port[close_idx + 1..];
1651 if after_bracket.starts_with(':') {
1652 // [ipv6]:port — already has a port
1653 host_port.to_string()
1654 } else {
1655 // [ipv6] without port — append default
1656 format!("{host_port}:80")
1657 }
1658 } else if host_port.contains(':') {
1659 // IPv4:port or hostname:port — use as-is
1660 host_port.to_string()
1661 } else {
1662 format!("{host_port}:80")
1663 };
1664 // v0.7.0 #1053 (Agent-2 #3) — fail-CLOSED on DNS resolution
1665 // failure. Pre-#1053 a SERVFAIL / timeout / hang at the daemon's
1666 // resolver returned Ok(()) here, but the subsequent
1667 // `reqwest::send` performs its OWN DNS resolution under
1668 // potentially-attacker-controlled DNS (TTL-zero / DNS rebind).
1669 // The attacker's first resolution at the daemon could SERVFAIL
1670 // (bypassing the SSRF guard), then their second resolution under
1671 // reqwest could return a private-range or link-local IP
1672 // (169.254.169.254 cloud metadata, 127.0.0.1:5432 Postgres,
1673 // 10.0.0.0/8 internal services). Closing the gap by treating
1674 // DNS failure as Err means an attacker can't smuggle internal
1675 // IPs through a DNS-rebind path even if the daemon's resolver
1676 // hiccups.
1677 //
1678 // Operators with environments where transient DNS pressure is
1679 // expected (containers with flaky CoreDNS, etc.) can opt back
1680 // into the legacy permissive posture via
1681 // `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1`. The unsafe override is
1682 // logged at WARN on every fire so an audit can detect the
1683 // legacy-permissive mode.
1684 // FX-Fwin (#1053 follow-up) — enforce RFC 1035 hostname shape
1685 // IN-GUARD before resolution. Windows' `getaddrinfo` does not
1686 // reject an oversized (>63-octet) DNS label at the API boundary,
1687 // so relying on the resolver to reject it (as the pre-FX-Fwin code
1688 // did) made the guard platform-dependent. A shape violation is
1689 // treated identically to a DNS resolution failure: fail-CLOSED
1690 // unless the operator opted into the legacy permissive posture.
1691 if hostname_shape_invalid(&resolved_host) {
1692 if ssrf_dns_fail_open() {
1693 tracing::warn!(
1694 target: SUBSCRIPTIONS_TRACE_TARGET,
1695 "SSRF guard: hostname {resolved_host} violates RFC 1035 label/length \
1696 limits for {url}; AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 — degrading to \
1697 ALLOW (UNSAFE, legacy posture)"
1698 );
1699 return Ok((resolved_host, Vec::new()));
1700 }
1701 return Err(anyhow!(
1702 "SSRF guard: DNS resolution failed for {url}: hostname violates RFC 1035 \
1703 label/length limits; failing CLOSED (post-#1053 secure default — set \
1704 AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 to revert)"
1705 ));
1706 }
1707 let addrs: Vec<std::net::SocketAddr> = match resolv_target.to_socket_addrs() {
1708 Ok(iter) => iter.collect(),
1709 Err(e) => {
1710 let fail_open = ssrf_dns_fail_open();
1711 if fail_open {
1712 tracing::warn!(
1713 target: SUBSCRIPTIONS_TRACE_TARGET,
1714 "SSRF guard: DNS resolution failed for {url}: {e}; \
1715 AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 — degrading to ALLOW \
1716 (UNSAFE, legacy posture) — reqwest's resolver may bind to \
1717 private/loopback IPs the daemon could not pre-check"
1718 );
1719 // Empty address list — caller's resolver-override
1720 // loop is a no-op and reqwest falls back to its
1721 // own DNS query (the explicit UNSAFE legacy posture).
1722 return Ok((resolved_host, Vec::new()));
1723 }
1724 return Err(anyhow!(
1725 "SSRF guard: DNS resolution failed for {url}: {e}; failing CLOSED \
1726 (post-#1053 secure default — set AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 to revert)"
1727 ));
1728 }
1729 };
1730 for addr in &addrs {
1731 let ip = addr.ip();
1732 if is_private(ip) && !is_loopback_normalized(ip) {
1733 return Err(anyhow!(
1734 "host resolves to private/link-local IP {ip}: {url}"
1735 ));
1736 }
1737 // H11 (#628 blocker) — DNS-rebind protection for loopback.
1738 // Default-OFF; operators with `[subscriptions]
1739 // allow_loopback_webhooks = true` accept loopback-resolving
1740 // hostnames.
1741 if is_loopback_normalized(ip) && !allow_loopback {
1742 return Err(anyhow!(
1743 "host resolves to loopback IP {ip}: {url} — rejected by default \
1744 (SSRF guard); set `[subscriptions] allow_loopback_webhooks = true` \
1745 to opt in"
1746 ));
1747 }
1748 }
1749 // v0.7.0 #1082 — return the resolved host + addr list so the
1750 // caller can pin reqwest's connect to exactly these addresses.
1751 Ok((resolved_host, addrs))
1752}
1753
1754/// SSRF guard. Rejects URLs that would cause the daemon to connect
1755/// to private-range addresses, link-local, loopback (except
1756/// explicitly), or non-HTTPS remote hosts.
1757pub fn validate_url(url: &str) -> Result<()> {
1758 validate_url_with(url, crate::config::allow_loopback_webhooks())
1759}
1760
1761/// H11 inner helper: takes `allow_loopback` explicitly so tests can
1762/// assert both branches without poking the process-wide atomic
1763/// (which would race with parallel tests). Production callers go
1764/// through `validate_url`.
1765fn validate_url_with(url: &str, allow_loopback: bool) -> Result<()> {
1766 // Cheap scheme check without pulling the `url` crate.
1767 let lower = url.to_ascii_lowercase();
1768 let (scheme, rest) = lower
1769 .split_once("://")
1770 .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
1771 if scheme != "https" && scheme != "http" {
1772 return Err(anyhow!("webhook URL scheme must be http(s): {url}"));
1773 }
1774 // Extract host (portion before '/' or ':' or '?'). IPv6 URLs use
1775 // `[ipv6]:port` syntax — the brackets must be stripped and the
1776 // colon-split must skip the colons inside the v6 literal.
1777 let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
1778 let host_port = &rest[..host_end];
1779 let host: String = if let Some(stripped) = host_port.strip_prefix('[') {
1780 // IPv6: host is everything before the closing bracket.
1781 match stripped.find(']') {
1782 Some(i) => stripped[..i].to_string(),
1783 None => return Err(anyhow!("malformed IPv6 URL host: {url}")),
1784 }
1785 } else {
1786 // IPv4 / hostname.
1787 host_port
1788 .rsplit_once(':')
1789 .map_or(host_port.to_string(), |(h, _)| h.to_string())
1790 };
1791 let host = host.as_str();
1792 // H11 (#628 blocker): loopback hostnames + IPs are rejected by
1793 // default. Operators who need to point a webhook at a local
1794 // listener (CI, dev) opt in via `[subscriptions]
1795 // allow_loopback_webhooks = true`. Default-OFF closes an
1796 // authenticated SSRF gadget against local services (Postgres on
1797 // 5432, the hooks daemon, etc.).
1798 let is_loopback_hostname = matches!(host, "localhost" | "localhost.localdomain" | "");
1799 let parsed_ip = IpAddr::from_str(host).ok();
1800 let is_loopback_ip = parsed_ip.is_some_and(is_loopback_normalized);
1801 let is_loopback = is_loopback_hostname || is_loopback_ip;
1802 if is_loopback && !allow_loopback {
1803 return Err(anyhow!(
1804 "webhook URL targets loopback address {url} — rejected by default \
1805 (SSRF guard); set `[subscriptions] allow_loopback_webhooks = true` \
1806 to opt in (testing / dev only)"
1807 ));
1808 }
1809 if scheme == "http" && !is_loopback {
1810 // Accept http only to parsed-loopback IPs; everything else
1811 // requires https.
1812 if let Some(ip) = parsed_ip {
1813 if !is_loopback_normalized(ip) {
1814 return Err(anyhow!(
1815 "webhook URL must be https for non-loopback host: {url}"
1816 ));
1817 }
1818 } else {
1819 return Err(anyhow!(
1820 "webhook URL must be https for non-loopback host: {url}"
1821 ));
1822 }
1823 }
1824 // Reject private-range IPs regardless of scheme (RFC1918 / RFC4193 /
1825 // link-local). Hostnames that resolve to private ranges are not
1826 // caught here — the dispatch thread will still be able to reach
1827 // them; operators who want to reach internal services should set
1828 // up reverse proxies or allow explicitly in config.
1829 if let Some(ip) = parsed_ip
1830 && is_private(ip)
1831 && !is_loopback_normalized(ip)
1832 {
1833 return Err(anyhow!(
1834 "webhook URL targets private / link-local address: {url}"
1835 ));
1836 }
1837 Ok(())
1838}
1839
// SSRF fix (#628 H3 follow-up): collapse IPv6 addresses that merely wrap
// an IPv4 address into that IPv4 address before the SSRF predicates run.
// Three encodings are unwrapped:
//   - IPv4-mapped      `::ffff:a.b.c.d`  (via `Ipv6Addr::to_canonical`)
//   - NAT64 well-known `64:ff9b::/96`
//   - IPv4-compatible  `::a.b.c.d`       (deprecated, RFC 4291 §2.5.5.1)
// `to_canonical` only handles the mapped form, so the other two are
// matched on the raw segments. `::` and `::1` also have six leading zero
// segments but are genuine IPv6 addresses (unspecified / loopback) and
// are deliberately left alone. Without this, `::ffff:127.0.0.1`,
// `::ffff:10.0.0.1`, `::10.0.0.1` and similar spellings would bypass the
// v4 range checks.
fn normalize_ip(ip: IpAddr) -> IpAddr {
    let IpAddr::V6(v6) = ip else {
        // Already IPv4 — nothing to unwrap.
        return ip;
    };
    if let canonical @ IpAddr::V4(_) = v6.to_canonical() {
        return canonical;
    }
    let segs = v6.segments();
    // The wrapped IPv4 address always lives in the low 32 bits.
    let embedded_v4 =
        || IpAddr::V4(Ipv4Addr::from((u32::from(segs[6]) << 16) | u32::from(segs[7])));
    let is_nat64 = matches!(segs, [0x0064, 0xff9b, 0, 0, 0, 0, _, _]);
    // Low 32 bits of 0 / 1 are `::` / `::1`, not IPv4-compatible addresses.
    let is_v4_compatible = matches!(segs, [0, 0, 0, 0, 0, 0, hi, lo] if hi != 0 || lo > 1);
    if is_nat64 || is_v4_compatible {
        return embedded_v4();
    }
    IpAddr::V6(v6)
}
1869
1870fn is_loopback_normalized(ip: IpAddr) -> bool {
1871 normalize_ip(ip).is_loopback()
1872}
1873
1874fn is_private(ip: IpAddr) -> bool {
1875 match normalize_ip(ip) {
1876 IpAddr::V4(v4) => {
1877 // SSRF fix (W11): include `is_unspecified` (0.0.0.0). On most
1878 // OSes the kernel routes 0.0.0.0 to a local listener, so an
1879 // attacker-controlled hostname resolving to 0.0.0.0 hits the
1880 // local box.
1881 v4.is_private()
1882 || v4.is_link_local()
1883 || v4.is_multicast()
1884 || v4.is_broadcast()
1885 || v4.is_unspecified()
1886 }
1887 IpAddr::V6(v6) => {
1888 // Conservative: reject unique-local (fc00::/7), link-local
1889 // (fe80::/10), multicast, and the unspecified address `::`.
1890 // SSRF fix (W11): `is_unspecified` covers `[::]`, which most
1891 // kernels route to local services.
1892 let segs = v6.segments();
1893 v6.is_multicast()
1894 || v6.is_unspecified()
1895 || (segs[0] & 0xfe00) == 0xfc00 // ULA
1896 || (segs[0] & 0xffc0) == 0xfe80 // link-local
1897 }
1898 }
1899}
1900
1901/// v0.7.0 #1072 — kept for the existing test harness and the
1902/// public-API contract. Production dispatch routes through
1903/// [`load_secret_hash_with_conn`] to reuse the caller's connection.
1904#[allow(dead_code)]
1905fn load_secret_hash(db_path: &std::path::Path, sub_id: &str) -> Result<Option<String>> {
1906 let conn = Connection::open(db_path).context("load_secret_hash open")?;
1907 load_secret_hash_with_conn(&conn, sub_id)
1908}
1909
1910/// v0.7.0 #1072 — `Connection`-reuse variant of [`load_secret_hash`].
1911/// Lets the per-subscription dispatch worker open ONE
1912/// `Connection::open` per delivery (instead of 4-5 across the
1913/// secret-load / event-audit / status-update / dispatch-counter /
1914/// DLQ sites). Same pattern as the #1017 hook-sink fix.
1915fn load_secret_hash_with_conn(conn: &Connection, sub_id: &str) -> Result<Option<String>> {
1916 conn.query_row(
1917 "SELECT secret_hash FROM subscriptions WHERE id = ?1",
1918 params![sub_id],
1919 |r| r.get::<_, Option<String>>(0),
1920 )
1921 .context("load_secret_hash query")
1922}
1923
1924/// v0.7.0 K6 — append a `subscription_events` audit row for one
1925/// outgoing delivery. Called from the dispatch worker BEFORE the
1926/// network send so replay-from-cursor (K7) sees a stable record even
1927/// if the dispatcher process crashes mid-retry. The row is created
1928/// with `delivery_status = 'pending'`; [`update_event_status`]
1929/// transitions it to `'ack'` / `'failed'` once the retry ladder
1930/// settles.
1931pub fn record_subscription_event(
1932 db_path: &std::path::Path,
1933 sub_id: &str,
1934 correlation_id: &str,
1935 event_type: &str,
1936 payload: &str,
1937) -> Result<()> {
1938 let conn = Connection::open(db_path).context("subscription_events open")?;
1939 record_subscription_event_with_conn(&conn, sub_id, correlation_id, event_type, payload)
1940}
1941
1942/// v0.7.0 #1072 — `Connection`-reuse variant. Used by the per-
1943/// subscription dispatch worker so a single thread-local connection
1944/// covers every sqlite write in the delivery path.
1945pub fn record_subscription_event_with_conn(
1946 conn: &Connection,
1947 sub_id: &str,
1948 correlation_id: &str,
1949 event_type: &str,
1950 payload: &str,
1951) -> Result<()> {
1952 let now = chrono::Utc::now().to_rfc3339();
1953 conn.execute(
1954 "INSERT INTO subscription_events \
1955 (subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status) \
1956 VALUES (?1, ?2, ?3, ?4, ?5, 'pending')",
1957 params![sub_id, correlation_id, event_type, payload, now],
1958 )
1959 .context("subscription_events insert")?;
1960 Ok(())
1961}
1962
1963/// v0.7.0 K6 — transition the audit row's `delivery_status` after the
1964/// retry ladder settles. Best-effort: a failure here is logged and
1965/// otherwise ignored so the dispatcher loop never blocks on the
1966/// audit table.
1967fn update_event_status(db_path: &std::path::Path, correlation_id: &str, ok: bool) {
1968 let Ok(conn) = Connection::open(db_path) else {
1969 return;
1970 };
1971 update_event_status_with_conn(&conn, correlation_id, ok);
1972}
1973
1974/// v0.7.0 #1072 — `Connection`-reuse variant of
1975/// [`update_event_status`].
1976fn update_event_status_with_conn(conn: &Connection, correlation_id: &str, ok: bool) {
1977 let status = if ok { "ack" } else { "failed" };
1978 let _ = conn.execute(
1979 "UPDATE subscription_events SET delivery_status = ?1 WHERE correlation_id = ?2",
1980 params![status, correlation_id],
1981 );
1982}
1983
1984/// v0.7.0 K6 — append a `subscription_dlq` row for a delivery that
1985/// exhausted the [200ms, 1s, 5s] retry ladder. K7's inspector tool
1986/// surfaces these rows to operators; K6 only ships the writer.
1987#[allow(clippy::too_many_arguments)]
1988pub fn record_dlq(
1989 db_path: &std::path::Path,
1990 sub_id: &str,
1991 correlation_id: &str,
1992 event_type: &str,
1993 payload: &str,
1994 retry_count: i64,
1995 last_error: &str,
1996 first_failed_at: &str,
1997 last_failed_at: &str,
1998) -> Result<()> {
1999 let conn = Connection::open(db_path).context("subscription_dlq open")?;
2000 record_dlq_with_conn(
2001 &conn,
2002 sub_id,
2003 correlation_id,
2004 event_type,
2005 payload,
2006 retry_count,
2007 last_error,
2008 first_failed_at,
2009 last_failed_at,
2010 )
2011}
2012
/// #1253 (MED, 2026-05-25) — operator-disk-fill DoS guard. Hard cap
/// on the per-subscription `subscription_dlq` depth. Pre-#1253 the
/// DLQ grew unboundedly when a hostile (or simply broken) webhook
/// target failed every delivery — a single bad subscriber could
/// exhaust the operator's disk by sustaining a high failure rate.
/// Once a subscription already holds this many rows,
/// [`record_dlq_with_conn`] refuses the insert with the typed
/// `dlq_overflow` error, increments the
/// `ai_memory_subscription_dlq_overflow_total` counter, and emits a
/// `tracing::warn!`. Operators drain the queue via
/// `memory_subscription_dlq_list` + the planned `... dlq drain`
/// admin tool before resetting. 10_000 rows is well above realistic
/// transient-failure depths (a healthy peer recovers in minutes, not
/// thousands of events) while still bounded enough that a hostile
/// peer can write at most ~10 MB before being capped (the payload
/// column is itself bounded by the webhook JSON-body cap upstream).
/// NOTE(review): the ~10 MB figure presumes roughly 1 KB per row —
/// confirm against the actual upstream body cap.
pub const MAX_SUBSCRIPTION_DLQ_ROWS: i64 = 10_000;
2029
2030/// v0.7.0 #1072 — `Connection`-reuse variant of [`record_dlq`].
2031#[allow(clippy::too_many_arguments)]
2032pub fn record_dlq_with_conn(
2033 conn: &Connection,
2034 sub_id: &str,
2035 correlation_id: &str,
2036 event_type: &str,
2037 payload: &str,
2038 retry_count: i64,
2039 last_error: &str,
2040 first_failed_at: &str,
2041 last_failed_at: &str,
2042) -> Result<()> {
2043 // #1253 — refuse the insert if the per-subscription DLQ is full.
2044 // Counting before the INSERT keeps the cap honest under
2045 // concurrent dispatch threads (the SQLite single-writer lock
2046 // serialises this read+write pair against any sibling DLQ
2047 // insert against the same subscription).
2048 let depth: i64 = conn
2049 .query_row(
2050 "SELECT COUNT(*) FROM subscription_dlq WHERE subscription_id = ?1",
2051 params![sub_id],
2052 |row| row.get(0),
2053 )
2054 .context("subscription_dlq depth probe")?;
2055 if depth >= MAX_SUBSCRIPTION_DLQ_ROWS {
2056 crate::metrics::record_subscription_dlq_overflow();
2057 tracing::warn!(
2058 subscription_id = %sub_id,
2059 correlation_id = %correlation_id,
2060 event_type = %event_type,
2061 depth = depth,
2062 cap = MAX_SUBSCRIPTION_DLQ_ROWS,
2063 "dlq_overflow: refusing subscription_dlq insert — per-subscription cap reached",
2064 );
2065 return Err(anyhow!(
2066 "dlq_overflow: subscription {sub_id} dlq at cap ({MAX_SUBSCRIPTION_DLQ_ROWS}); drain before further inserts"
2067 ));
2068 }
2069 conn.execute(
2070 "INSERT INTO subscription_dlq \
2071 (subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at) \
2072 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
2073 params![
2074 sub_id,
2075 correlation_id,
2076 event_type,
2077 payload,
2078 retry_count,
2079 last_error,
2080 first_failed_at,
2081 last_failed_at,
2082 ],
2083 )
2084 .context("subscription_dlq insert")?;
2085 Ok(())
2086}
2087
2088/// v0.7.0 K6 — list `subscription_dlq` rows. Used by the K7
2089/// inspector tool (not registered in MCP yet) and by the K6
2090/// integration test suite.
2091pub fn list_dlq(conn: &Connection, subscription_id: Option<&str>) -> Result<Vec<DlqEntry>> {
2092 let mut out = Vec::new();
2093 if let Some(sub_id) = subscription_id {
2094 let mut stmt = conn.prepare(
2095 "SELECT id, subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at \
2096 FROM subscription_dlq WHERE subscription_id = ?1 ORDER BY id ASC",
2097 )?;
2098 let rows = stmt.query_map(params![sub_id], dlq_row_to_entry)?;
2099 for r in rows {
2100 out.push(r?);
2101 }
2102 } else {
2103 let mut stmt = conn.prepare(
2104 "SELECT id, subscription_id, correlation_id, event_type, payload, retry_count, last_error, first_failed_at, last_failed_at \
2105 FROM subscription_dlq ORDER BY id ASC",
2106 )?;
2107 let rows = stmt.query_map([], dlq_row_to_entry)?;
2108 for r in rows {
2109 out.push(r?);
2110 }
2111 }
2112 Ok(out)
2113}
2114
2115fn dlq_row_to_entry(row: &rusqlite::Row) -> rusqlite::Result<DlqEntry> {
2116 Ok(DlqEntry {
2117 id: row.get(0)?,
2118 subscription_id: row.get(1)?,
2119 correlation_id: row.get(2)?,
2120 event_type: row.get(3)?,
2121 payload: row.get(4)?,
2122 retry_count: row.get(5)?,
2123 last_error: row.get(6)?,
2124 first_failed_at: row.get(7)?,
2125 last_failed_at: row.get(8)?,
2126 })
2127}
2128
2129/// v0.7.0 K6 — replay subscription events for a single subscription
2130/// since `since_rfc3339`. Returns the audit rows ordered by
2131/// `delivered_at` ascending (so cursor-by-time scans are stable).
2132///
2133/// **MCP wiring (v0.7 K7, landed):** the companion
2134/// `memory_subscription_replay` MCP tool IS registered in the
2135/// dispatch table — see `MEMORY_SUBSCRIPTION_REPLAY` in
2136/// `src/mcp/registry.rs` (entry in `tool_names::ALL`) and the
2137/// `handle_subscription_replay` handler in
2138/// `src/mcp/tools/subscribe.rs`. The "K6 deferred to K7" gating
2139/// noted in the original comment is closed; this docstring was
2140/// stale per the v0.7.0 multi-agent literal-sweep (scanner B
2141/// finding F-B6.x).
2142pub fn replay_subscription_events(
2143 conn: &Connection,
2144 subscription_id: &str,
2145 since_rfc3339: &str,
2146) -> Result<Vec<SubscriptionEvent>> {
2147 let mut stmt = conn.prepare(
2148 "SELECT id, subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status \
2149 FROM subscription_events \
2150 WHERE subscription_id = ?1 AND delivered_at >= ?2 \
2151 ORDER BY delivered_at ASC, id ASC",
2152 )?;
2153 let rows = stmt.query_map(params![subscription_id, since_rfc3339], |row| {
2154 Ok(SubscriptionEvent {
2155 id: row.get(0)?,
2156 subscription_id: row.get(1)?,
2157 correlation_id: row.get(2)?,
2158 event_type: row.get(3)?,
2159 payload: row.get(4)?,
2160 delivered_at: row.get(5)?,
2161 delivery_status: row.get(6)?,
2162 })
2163 })?;
2164 let mut out = Vec::new();
2165 for r in rows {
2166 out.push(r.context("subscription_events row decode")?);
2167 }
2168 Ok(out)
2169}
2170
2171/// v0.7.0 K6 — handler for `memory_subscription_replay`. K7 wired
2172/// this into the MCP dispatch table behind the existing
2173/// `memory_subscription_*` family — see
2174/// `src/mcp/tools/subscribe.rs::handle_subscription_replay` for the
2175/// thin MCP wrapper that delegates here. The K6-vintage "DO NOT add
2176/// to MCP dispatch table" warning in the prior docstring was stale
2177/// per the v0.7.0 multi-agent literal-sweep (scanner B finding
2178/// F-B6.x); registration has been in tree since K7.
2179pub fn memory_subscription_replay(
2180 conn: &Connection,
2181 subscription_id: &str,
2182 since_rfc3339: &str,
2183) -> Result<serde_json::Value> {
2184 let events = replay_subscription_events(conn, subscription_id, since_rfc3339)?;
2185 Ok(serde_json::json!({
2186 (field_names::SUBSCRIPTION_ID): subscription_id,
2187 "since": since_rfc3339,
2188 "count": events.len(),
2189 "events": events,
2190 }))
2191}
2192
2193fn record_dispatch(db_path: &std::path::Path, sub_id: &str, ok: bool) {
2194 let Ok(conn) = Connection::open(db_path) else {
2195 return;
2196 };
2197 record_dispatch_with_conn(&conn, sub_id, ok);
2198}
2199
2200/// v0.7.0 #1072 — `Connection`-reuse variant of [`record_dispatch`].
2201fn record_dispatch_with_conn(conn: &Connection, sub_id: &str, ok: bool) {
2202 let now = chrono::Utc::now().to_rfc3339();
2203 let sql = if ok {
2204 "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
2205 } else {
2206 "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, failure_count = failure_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
2207 };
2208 let _ = conn.execute(sql, params![now, sub_id]);
2209}
2210
2211#[cfg(test)]
2212mod tests {
2213 use super::*;
2214
    /// Serializes the two #1053 tests that mutate the process-global
    /// `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL` env var. Without it they
    /// race under the default parallel test runner: the fail-open
    /// test's `set_var` can be observed by the fail-closed test
    /// mid-flight, flipping its expected `Err` (fail-CLOSED) into an
    /// `Ok` (legacy permissive) and panicking. Poison-tolerant via
    /// `into_inner` so one panicking test doesn't cascade-fail the
    /// other. Any further test that sets or depends on this variable
    /// should hold the lock for its whole body as well.
    static SSRF_ENV_GUARD: std::sync::Mutex<()> = std::sync::Mutex::new(());
2224
2225 #[test]
2226 fn https_allowed() {
2227 assert!(validate_url("https://example.com/hook").is_ok());
2228 assert!(validate_url("https://api.example.com:8443/hook?x=1").is_ok());
2229 }
2230
    /// Regression for #1265 — webhook dispatch User-Agent must track
    /// `CARGO_PKG_VERSION` rather than the v0.6.0.0 string the original
    /// site hardcoded. The `format!` here mirrors the dispatch site in
    /// `fn send()` so a bit-rot of one side is caught by the test.
    ///
    /// NOTE(review): `ua` and `expected` are built from the same
    /// expression inside this test, so the `assert_eq!` below cannot
    /// fail, and nothing here observes the string `send()` actually
    /// emits. Only the `assert_ne!` / shape checks carry signal. Consider
    /// asserting against a shared helper or constant used by `send()`.
    #[test]
    fn webhook_user_agent_tracks_cargo_pkg_version() {
        let ua = format!("ai-memory/{}", env!("CARGO_PKG_VERSION"));
        let expected = format!("ai-memory/{}", env!("CARGO_PKG_VERSION"));
        assert_eq!(ua, expected);
        // The legacy hardcoded value MUST NOT be the current expected
        // value — guards against a future revert to a literal string.
        assert_ne!(ua, "ai-memory/0.6.0.0");
        // Shape sanity: matches the documented `ai-memory/<semver>` form.
        assert!(ua.starts_with("ai-memory/"));
        assert!(ua.len() > "ai-memory/".len());
    }
2247
2248 #[test]
2249 fn http_only_to_loopback() {
2250 // H11 inner helper — assert with allow_loopback=true so the
2251 // test does not depend on the test-build default and does not
2252 // race with parallel tests poking the global atomic.
2253 assert!(validate_url_with("http://localhost/hook", true).is_ok());
2254 assert!(validate_url_with("http://127.0.0.1:8080/hook", true).is_ok());
2255 // IPv6 in URLs must be bracketed per RFC 3986 §3.2.2.
2256 assert!(validate_url_with("http://[::1]/hook", true).is_ok());
2257 assert!(validate_url_with("http://example.com/hook", true).is_err());
2258 assert!(validate_url_with("http://8.8.8.8/hook", true).is_err());
2259 }
2260
2261 #[test]
2262 fn loopback_rejected_by_default_h11() {
2263 // H11 (#628 blocker) — loopback URLs are rejected without an
2264 // explicit opt-in. This closes an authenticated SSRF gadget
2265 // against local services (Postgres on 5432, hooks daemon, …).
2266 // Uses the inner helper so the assertion does not race with
2267 // parallel tests that touch `crate::config` or use real
2268 // loopback URLs through `validate_url`.
2269 for url in [
2270 "http://127.0.0.1:5432/hook",
2271 "http://localhost/hook",
2272 "http://[::1]/hook",
2273 "https://127.0.0.1/hook",
2274 "https://localhost/hook",
2275 ] {
2276 let res = validate_url_with(url, false);
2277 assert!(
2278 res.is_err(),
2279 "loopback URL {url} must be rejected when allow_loopback=false (H11), got {res:?}"
2280 );
2281 let msg = res.unwrap_err().to_string();
2282 assert!(
2283 msg.contains("loopback") || msg.contains("SSRF"),
2284 "rejection message should explain loopback policy, got: {msg}"
2285 );
2286 }
2287 }
2288
#[test]
fn loopback_accepted_when_opted_in_h11() {
    // H11 — `[subscriptions] allow_loopback_webhooks = true` is the
    // operator opt-in for CI/testing against loopback. Passing `true`
    // to the inner helper stands in for that setting and keeps this
    // test isolated from the global atomic.
    for url in [
        "http://127.0.0.1:9999/hook",
        "http://localhost/hook",
        "http://[::1]/hook",
    ] {
        assert!(validate_url_with(url, true).is_ok(), "{url}");
    }
}
2298
#[test]
fn private_ranges_blocked() {
    // RFC1918 v4, v4 link-local, and the v6 unique-local / link-local
    // literals must all be refused even over https.
    for url in [
        "https://10.0.0.1/hook",
        "https://192.168.1.1/hook",
        "https://172.16.0.1/hook",
        "https://169.254.1.1/hook",
        "https://[fc00::1]/hook",
        "https://[fe80::1]/hook",
    ] {
        assert!(validate_url(url).is_err(), "{url}");
    }
}
2308
#[test]
fn nonsense_rejected() {
    // Unsupported scheme, scheme-less text, and the empty string.
    for url in ["ftp://example.com", "notaurl", ""] {
        assert!(validate_url(url).is_err(), "{url:?}");
    }
}
2315
#[test]
fn rejects_v4_mapped_ipv6_loopback() {
    // SSRF (#628 H3 follow-up): `Ipv6Addr::is_loopback()` is false for
    // `::ffff:127.0.0.1`, yet dual-stack kernels route such addresses
    // to the v4 loopback service. `normalize_ip` collapses them to v4
    // before the check runs.
    // `validate_url_with(.., false)` pins loopback-disabled, so this
    // does not race with parallel tests that flip the process-wide
    // allow_loopback flag (same pattern as
    // `loopback_rejected_by_default_h11`).
    // Both spellings denote the same v4-mapped address.
    for url in [
        "https://[::ffff:127.0.0.1]/hook",
        "https://[::ffff:7f00:1]/hook",
    ] {
        assert!(validate_url_with(url, false).is_err(), "{url}");
    }
}
2329
#[test]
fn rejects_v4_mapped_ipv6_private() {
    // v4-mapped forms of RFC1918, link-local and unspecified v4
    // addresses must be rejected like their plain-v4 counterparts.
    const MAPPED_PRIVATE: [&str; 5] = [
        "https://[::ffff:10.0.0.1]/hook",
        "https://[::ffff:192.168.1.1]/hook",
        "https://[::ffff:172.16.0.1]/hook",
        "https://[::ffff:169.254.1.1]/hook",
        "https://[::ffff:0.0.0.0]/hook",
    ];
    for url in MAPPED_PRIVATE {
        assert!(validate_url_with(url, false).is_err(), "{url}");
    }
}
2338
#[test]
fn rejects_nat64_well_known_prefix() {
    // 64:ff9b::/96 is the RFC 6052 well-known NAT64 prefix. Where
    // NAT64 is deployed, a packet to `64:ff9b::a.b.c.d` is translated
    // and forwarded to `a.b.c.d` — an SSRF gadget whenever the embedded
    // v4 address is loopback or private.
    for url in [
        "https://[64:ff9b::127.0.0.1]/hook",
        "https://[64:ff9b::10.0.0.1]/hook",
        "https://[64:ff9b::169.254.1.1]/hook",
    ] {
        assert!(validate_url_with(url, false).is_err(), "{url}");
    }
}
2349
#[test]
fn allows_v4_mapped_loopback_when_opted_in() {
    // Mirror of the plain-loopback opt-in: with allow_loopback=true,
    // `::ffff:127.0.0.1` is accepted exactly like `127.0.0.1`.
    let verdict = validate_url_with("http://[::ffff:127.0.0.1]/hook", true);
    assert!(verdict.is_ok());
}
2357
#[test]
fn hmac_sha256_stable() {
    // Known vector:
    //   HMAC-SHA256("key", "The quick brown fox jumps over the lazy dog")
    //   = f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8
    // The key is handed over hex-encoded.
    let message = "The quick brown fox jumps over the lazy dog";
    let expected_sig = "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8";
    let key_hex = hex::encode_fallback("key".as_bytes());
    assert_eq!(hmac_sha256_hex(&key_hex, message), expected_sig);
}
2369
#[test]
fn filter_wildcards() {
    // Table-driven check of the legacy filter path (structured
    // `event_types` is always `None` here; the fired event is always
    // `memory_store`).
    //
    // Columns: legacy `events` list, namespace filter, agent filter,
    // event namespace, event agent, expected verdict.
    type Case = (
        &'static str,
        Option<&'static str>,
        Option<&'static str>,
        &'static str,
        Option<&'static str>,
        bool,
    );
    let cases: [Case; 7] = [
        // `*` matches every event.
        ("*", None, None, "ns", None, true),
        // Comma list containing the event matches.
        ("memory_store,memory_delete", None, None, "ns", None, true),
        // Comma list without the event does not.
        ("memory_delete", None, None, "ns", None, false),
        // Namespace filter: equal namespace passes, different fails.
        ("*", Some("foo"), None, "foo", None, true),
        ("*", Some("foo"), None, "bar", None, false),
        // Agent filter: equal agent passes, different fails.
        ("*", None, Some("alice"), "ns", Some("alice"), true),
        ("*", None, Some("alice"), "ns", Some("bob"), false),
    ];
    for (events, ns_filter, agent_filter, ns, agent, expected) in cases {
        let verdict = matches_filters(
            events,
            None,
            ns_filter,
            agent_filter,
            "memory_store",
            ns,
            agent,
        );
        assert_eq!(
            verdict, expected,
            "events={events:?} ns_filter={ns_filter:?} agent_filter={agent_filter:?} ns={ns:?} agent={agent:?}"
        );
    }
}
2436
#[test]
fn filter_event_types_overrides_legacy_events() {
    // P5 (G9): once the structured `event_types` opt-in is `Some`, the
    // legacy `events` whitelist no longer decides the match.
    //
    // `fires(legacy, opt_in, event)` evaluates one event against a
    // subscription with no namespace/agent filter.
    let fires = |legacy: &str, opt_in: &Vec<String>, event: &str| {
        matches_filters(legacy, Some(opt_in), None, None, event, "ns", None)
    };

    // Legacy says "all events", structured says "store only": store
    // matches, delete does not.
    let store_only: Vec<String> = vec!["memory_store".to_string()];
    assert!(fires("*", &store_only, "memory_store"));
    assert!(!fires("*", &store_only, "memory_delete"));

    // A multi-entry structured list matches its members, and a legacy
    // entry that is absent from the structured list is ignored.
    let promote_and_link: Vec<String> = vec![
        "memory_promote".to_string(),
        "memory_link_created".to_string(),
    ];
    assert!(fires("memory_store", &promote_and_link, "memory_promote"));
    assert!(!fires("memory_store", &promote_and_link, "memory_store"));

    // Empty structured list = no events match (defensive).
    let nothing: Vec<String> = vec![];
    assert!(!fires("*", &nothing, "memory_store"));
}
2497
2498 // ----------------------------------------------------------------
2499 // Wave 10 (L10b) — SSRF coverage for `validate_url_dns`.
2500 //
// `validate_url_dns` is the DNS-resolving SSRF guard. It performs
// `to_socket_addrs()` and inspects the resolved IPs. Since H11,
// loopback targets are rejected unless loopback is explicitly opted
// in, so the loopback cases below call
// `validate_url_dns_with(url, allow_loopback)` directly and pin the
// flag per test instead of depending on the process-wide setting.
// Dev/CI webhooks pointed at localhost keep working through that
// opt-in.
//
// Tests below are split into:
// - loopback acceptance when opted in (`allow_loopback = true`)
// - loopback rejection by default (`allow_loopback = false`)
// - non-loopback private targets that are rejected (link-local v6,
// AWS metadata IP, RFC1918 ranges, unspecified addresses)
// - public-IP / hostname handling, including the fail-closed
// behaviour on DNS resolution failure
//
2516 //
2517 // The function signature is `validate_url_dns(&str) -> Result<()>`.
2518 // ----------------------------------------------------------------
2519
#[test]
fn test_validate_url_dns_accepts_loopback_v4() {
    // H11 inner helper with allow_loopback=true: no dependence on the
    // global atomic, hence no race with parallel tests. At runtime,
    // dev/CI workflows get this behaviour by opting in via config.
    let opted_in_cases = [
        (
            "http://127.0.0.1/foo",
            "127.0.0.1 should be accepted by validate_url_dns when opted in",
        ),
        (
            "http://127.0.0.1:8080/",
            "127.0.0.1:8080 should be accepted by validate_url_dns when opted in",
        ),
        (
            "http://localhost/",
            "localhost should be accepted by validate_url_dns when opted in",
        ),
    ];
    for (url, why) in opted_in_cases {
        assert!(validate_url_dns_with(url, true).is_ok(), "{why}");
    }
}
2539
#[test]
fn test_validate_url_dns_accepts_loopback_v6() {
    // v6 counterpart of the v4 loopback test — opt-in is passed to the
    // inner helper. Both the compressed and the fully expanded
    // spelling of `::1` are exercised.
    let opted_in_cases = [
        (
            "http://[::1]/",
            "[::1] should be accepted by validate_url_dns when opted in",
        ),
        (
            "http://[0:0:0:0:0:0:0:1]/",
            "[::1] expanded form should be accepted when opted in",
        ),
    ];
    for (url, why) in opted_in_cases {
        assert!(validate_url_dns_with(url, true).is_ok(), "{why}");
    }
}
2552
#[test]
fn test_validate_url_dns_rejects_loopback_by_default_h11() {
    // H11 — by default a target that resolves to loopback is refused,
    // closing DNS-rebind SSRF against local services. The inner helper
    // pins allow_loopback=false without touching the global.
    let default_cases = [
        (
            "http://127.0.0.1/foo",
            "127.0.0.1 must be rejected by validate_url_dns when allow_loopback=false (H11)",
        ),
        (
            "http://[::1]/",
            "[::1] must be rejected by validate_url_dns when allow_loopback=false (H11)",
        ),
    ];
    for (url, why) in default_cases {
        assert!(validate_url_dns_with(url, false).is_err(), "{why}");
    }
}
2567
#[test]
fn test_validate_url_dns_rejects_link_local_ipv6() {
    // fe80::/10 is link-local: flagged by is_private() and not
    // loopback, so validate_url_dns must refuse it.
    // SSRF fix (W11): a bracketed IPv6 host with no explicit port gets
    // ":80" appended before to_socket_addrs(), so resolution succeeds
    // and the IP check actually fires.
    let link_local = "http://[fe80::1]/";
    let res = validate_url_dns(link_local);
    let rejected = res.is_err();
    assert!(
        rejected,
        "fe80::1 must be rejected as link-local IPv6, got {res:?}"
    );
}
2581
#[test]
fn test_validate_url_dns_rejects_aws_metadata() {
    // 169.254.169.254 hosts the AWS / GCP / Azure instance metadata
    // service. It is RFC3927 link-local, and `Ipv4Addr::is_link_local`
    // covers all of 169.254.0.0/16, so validate_url_dns must reject.
    let metadata_endpoint = "http://169.254.169.254/latest/meta-data/";
    let res = validate_url_dns(metadata_endpoint);
    let rejected = res.is_err();
    assert!(rejected, "AWS metadata IP must be rejected, got {res:?}");
}
2593
#[test]
fn test_validate_url_dns_rejects_rfc1918_private_ranges() {
    // RFC1918 = 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, all flagged
    // by `Ipv4Addr::is_private`. The 172.16/12 range is probed at both
    // its first and last address.
    const RFC1918_URLS: [&str; 4] = [
        "http://10.0.0.1/",
        "http://172.16.0.1/",
        "http://172.31.255.255/",
        "http://192.168.1.1/",
    ];
    for url in RFC1918_URLS {
        let res = validate_url_dns(url);
        let rejected = res.is_err();
        assert!(rejected, "{url} must be rejected as RFC1918, got {res:?}");
    }
}
2612
#[test]
fn test_validate_url_dns_accepts_public_ip_or_dns() {
    // 1.1.1.1 (Cloudflare's public resolver) is never private. Using
    // an IP literal keeps the asserted part of this test hermetic: no
    // DNS lookup is needed to reach a verdict.
    let literal_verdict = validate_url_dns("https://1.1.1.1/");
    assert!(literal_verdict.is_ok(), "public IP literal must be accepted");

    // v0.7.0 #1053 — `example.com` either resolves to a public IP
    // (accepted) or fails to resolve in a hermetic sandbox (post-#1053:
    // rejected as fail-closed; pre-#1053: accepted with
    // let-reqwest-surface-it). Both are legitimate outcomes of the
    // SSRF guard, so the result is deliberately ignored here; the call
    // only has to complete without panicking. The fail-closed path is
    // pinned by
    // `test_validate_url_dns_fails_closed_on_dns_failure_1053`.
    let _ = validate_url_dns("https://example.com/");
}
2635
#[test]
fn test_validate_url_dns_fails_closed_on_dns_failure_1053() {
    // v0.7.0 #1053 (Agent-2 #3) — a DNS resolution failure yields Err,
    // so an attacker cannot slip a private-range IP through a
    // DNS-rebind path in which the daemon's resolver hiccups and
    // reqwest's later resolution lands on an internal target.
    //
    // FX-F1 (2026-05-27) — hermetic hostname construction.
    // Before FX-F1 this test used `"https://nonexistent-host.invalid./"`
    // and trusted RFC 6761's NXDOMAIN guarantee for `.invalid`. Three
    // environments break that guarantee in practice:
    // - Windows' built-in resolver consults NetBIOS / WINS / DNS
    //   search-suffix lists and synthesizes a "hit" for bare
    //   `.invalid` labels.
    // - macOS' mDNSResponder + captive-portal DNS in some CI runner
    //   network configs (observed on `release/v0.7.0` SHA 5bfbb109c →
    //   job 78098087774) also synthesizes a "hit", so the test
    //   panicked with `got Ok(())` instead of the expected `Err`.
    // - Some corporate DNS resolvers wildcard-redirect any
    //   unresolvable name to a "did-you-mean" landing page.
    //
    // The hostname built here is instead rejected by the SSRF guard's
    // RFC 1035 shape check: a DNS label is capped at 63 octets
    // (RFC 1035 §2.3.4), so a single 70-character label is refused
    // in-guard (FX-Fwin #1053 follow-up) whatever resolver is
    // configured — including Windows, whose `getaddrinfo` does not
    // enforce the ceiling at the API boundary. No DNS query is even
    // attempted.
    let _env_guard = SSRF_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());
    let too_long_label = "a".repeat(70);
    let target = format!("https://{too_long_label}.fxf1-test./");
    let outcome = validate_url_dns(&target);
    assert!(
        outcome.is_err(),
        "#1053: SSRF guard MUST fail-closed on DNS resolution failure (oversized label rejected at getaddrinfo shape check); got {outcome:?}"
    );
    // The error text must name both the posture and the escape hatch.
    let err_msg = outcome.unwrap_err().to_string();
    let names_posture = err_msg.contains("failing CLOSED");
    let names_escape_hatch = err_msg.contains("AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL");
    assert!(
        names_posture && names_escape_hatch,
        "#1053: failure message MUST reference fail-closed posture + env-var escape hatch; got {err_msg:?}"
    );
}
2684
#[test]
fn test_validate_url_dns_fail_open_env_overrides_1053() {
    // v0.7.0 #1053 — operators with flaky DNS environments can opt
    // back into the legacy permissive posture via
    // `AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1`. This pins the env-var
    // contract so a future rename of the variable fails loudly here.
    const ALLOW_DNS_FAIL: &str = "AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL";

    /// Removes the named environment variable when dropped, so the
    /// override is cleared even if the code under test panics and the
    /// stack unwinds. Without this, a panic between set and remove
    /// would leave the variable set for later tests in the process.
    struct RemoveVarOnDrop(&'static str);
    impl Drop for RemoveVarOnDrop {
        fn drop(&mut self) {
            // SAFETY: only constructed below while SSRF_ENV_GUARD is
            // held; locals drop in reverse declaration order, so this
            // runs before the mutex guard is released.
            unsafe {
                std::env::remove_var(self.0);
            }
        }
    }

    // Hold SSRF_ENV_GUARD across the whole set→validate→remove window
    // so the fail-CLOSED test cannot observe the var mid-flight. A
    // poisoned lock is recovered on purpose: the mutex guards no data,
    // only ordering.
    let _env_guard = SSRF_ENV_GUARD.lock().unwrap_or_else(|e| e.into_inner());
    // SAFETY: env mutation inside a unit test, serialised against the
    // other test that takes SSRF_ENV_GUARD. NOTE(review): threads that
    // read the environment without taking that lock are not excluded.
    unsafe {
        std::env::set_var(ALLOW_DNS_FAIL, "1");
    }
    let reset = RemoveVarOnDrop(ALLOW_DNS_FAIL);

    // FX-F1: same shape-rejected hostname construction as
    // `test_validate_url_dns_fails_closed_on_dns_failure_1053`
    // (a single 70-character label), for platform-independent results.
    let oversized_label = "a".repeat(70);
    let url = format!("https://{oversized_label}.fxf1-test./");
    let res = validate_url_dns(&url);

    // Clear the override before asserting, as the original did; the
    // guard covers the panic path above.
    drop(reset);
    assert!(
        res.is_ok(),
        "#1053: AI_MEMORY_SSRF_GUARD_ALLOW_DNS_FAIL=1 MUST restore the legacy permissive posture; got {res:?}"
    );
}
2721
#[test]
fn test_validate_url_dns_rejects_unspecified_addresses() {
    // 0.0.0.0 and [::] are the "unspecified" addresses. On most OSes a
    // connect to 0.0.0.0 is routed to localhost, which makes it an
    // SSRF / loopback bypass.
    // SSRF fix (W11): `is_private` flags `is_unspecified` for both
    // address families.
    for (url, shown) in [("http://0.0.0.0/", "0.0.0.0"), ("http://[::]/", "[::]")] {
        let res = validate_url_dns(url);
        assert!(
            res.is_err(),
            "{shown} should be rejected as unspecified, got {res:?}"
        );
    }
}
2740
#[test]
fn test_validate_url_dns_missing_scheme() {
    // Input without a `://` separator must come back as an explicit
    // Err rather than a panic.
    let schemeless = "not-a-url";
    let res = validate_url_dns(schemeless);
    let rejected = res.is_err();
    assert!(rejected, "missing scheme must Err, got {res:?}");
}
2747
2748 // ----------------------------------------------------------------
2749 // Wave 12 (W12-C) — deep coverage on dispatch / send / persistence.
2750 //
2751 // The pre-W12 tests covered URL validation thoroughly but left the
2752 // DB-touching paths (`insert`, `delete`, `list`, `dispatch_event`,
2753 // `record_dispatch`, `load_secret_hash`) and the HTTP send path
2754 // (`send`) at 0 % coverage. These tests use a `tempfile::NamedTempFile`
2755 // to back a real on-disk SQLite (so dispatch threads can re-open the
2756 // connection via `Connection::open(db_path)`) and `wiremock` for HTTP
2757 // (already a dev-dep from W3 / W10).
2758 //
2759 // Style:
2760 // - DB-only tests are `#[test]` (sync) and use a tempfile path.
2761 // - Tests that drive `wiremock` are `#[tokio::test(flavor =
2762 // "multi_thread")]` and run the blocking `send` via
2763 // `tokio::task::spawn_blocking`, mirroring the pattern already in
2764 // `llm.rs::wiremock_tests`.
2765 // ----------------------------------------------------------------
2766
2767 use tempfile::NamedTempFile;
2768
2769 /// Stand up a fresh on-disk SQLite at a tempfile path with the
2770 /// production schema applied. Returns the path and keeps the file
2771 /// alive via the returned `NamedTempFile` (drop deletes it).
2772 fn fresh_db() -> (NamedTempFile, std::path::PathBuf) {
2773 let f = NamedTempFile::new().expect("tempfile");
2774 let p = f.path().to_path_buf();
2775 // Apply schema via the production opener so migrations run.
2776 let _ = crate::db::open(&p).expect("db::open");
2777 (f, p)
2778 }
2779
2780 /// v0.7.0 K6 test helper — wiremock responder that builds a 2xx
2781 /// JSON ACK body whose `correlation_id` field echoes the
2782 /// dispatched id from the `x-ai-memory-correlation-id` request
2783 /// header. Lets the legacy dispatch tests (which previously only
2784 /// asserted "2xx → success") satisfy K6's strict ACK contract
2785 /// without coupling each test to the exact UUID value.
2786 struct AckEcho;
2787 impl wiremock::Respond for AckEcho {
2788 fn respond(&self, request: &wiremock::Request) -> wiremock::ResponseTemplate {
2789 let corr = request
2790 .headers
2791 .get("x-ai-memory-correlation-id")
2792 .map(|v| v.to_str().unwrap_or("").to_string())
2793 .unwrap_or_default();
2794 let body = serde_json::json!({
2795 "status": "ack",
2796 "correlation_id": corr,
2797 });
2798 wiremock::ResponseTemplate::new(200).set_body_json(body)
2799 }
2800 }
2801
2802 // ---------------- insert / delete / list ----------------
2803
#[test]
fn insert_persists_and_list_returns_row() {
    // A subscription with every optional filter populated must come
    // back from `list` field-for-field, with both counters at zero.
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    let request = NewSubscription {
        url: "https://example.com/hook",
        events: "memory_store",
        secret: Some("s3cret"),
        namespace_filter: Some("ns1"),
        agent_filter: Some("alice"),
        created_by: Some("op"),
        event_types: None,
    };
    let id = insert(&conn, &request).unwrap();
    assert!(!id.is_empty());

    let rows = list(&conn, None).unwrap();
    assert_eq!(rows.len(), 1);
    let row = &rows[0];
    assert_eq!(row.id, id);
    assert_eq!(row.url, "https://example.com/hook");
    assert_eq!(row.events, "memory_store");
    assert_eq!(row.namespace_filter.as_deref(), Some("ns1"));
    assert_eq!(row.agent_filter.as_deref(), Some("alice"));
    assert_eq!(row.created_by.as_deref(), Some("op"));
    assert_eq!(row.dispatch_count, 0);
    assert_eq!(row.failure_count, 0);
}
2835
#[test]
fn insert_rejects_invalid_url() {
    // `insert` must refuse a URL that is not a URL at all.
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    let bogus = NewSubscription {
        url: "not-a-url",
        events: "*",
        secret: None,
        namespace_filter: None,
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    let outcome = insert(&conn, &bogus);
    assert!(outcome.is_err(), "insert must reject invalid URL");
}
2854
#[test]
fn insert_hashes_secret_before_persisting() {
    // The `secret_hash` column must hold SHA-256(plaintext), never the
    // plaintext itself.
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    let plaintext = "super-shared-secret";
    let request = NewSubscription {
        url: "https://example.com/h",
        events: "*",
        secret: Some(plaintext),
        namespace_filter: None,
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    let id = insert(&conn, &request).unwrap();

    // Read the raw column straight from SQLite, bypassing `list`.
    let column: Option<String> = conn
        .query_row(
            "SELECT secret_hash FROM subscriptions WHERE id = ?1",
            params![id],
            |row| row.get(0),
        )
        .unwrap();
    let persisted = column.expect("secret_hash should be set");
    assert_ne!(persisted, plaintext, "plaintext secret must not be stored");
    assert_eq!(persisted, sha256_hex(plaintext));
}
2884
#[test]
fn insert_no_secret_stores_null() {
    // A subscription created without a secret must leave `secret_hash`
    // as SQL NULL (read back as `None`).
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    let request = NewSubscription {
        url: "https://example.com/h",
        events: "*",
        secret: None,
        namespace_filter: None,
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    let id = insert(&conn, &request).unwrap();

    let column: Option<String> = conn
        .query_row(
            "SELECT secret_hash FROM subscriptions WHERE id = ?1",
            params![id],
            |row| row.get(0),
        )
        .unwrap();
    assert!(column.is_none(), "missing secret must persist as NULL");
}
2911
#[test]
fn delete_returns_true_when_row_removed() {
    // Deleting an existing subscription reports `true` and leaves the
    // table empty.
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    let request = NewSubscription {
        url: "https://example.com/h",
        events: "*",
        secret: None,
        namespace_filter: None,
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    let id = insert(&conn, &request).unwrap();

    let removed = delete(&conn, &id, None).unwrap();
    assert!(removed);
    let remaining = list(&conn, None).unwrap();
    assert!(remaining.is_empty());
}
2932
#[test]
fn delete_returns_false_when_row_missing() {
    // Deleting an id that was never inserted is Ok(false), not an
    // error.
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    let removed = delete(&conn, "nope", None).unwrap();
    assert!(!removed);
}
2939
#[test]
fn list_orders_by_created_at_desc() {
    let (_keep, path) = fresh_db();
    let conn = Connection::open(&path).unwrap();
    // Helper: register an unfiltered, secret-less subscription for
    // `url` and return its id.
    let register = |url: &'static str| {
        insert(
            &conn,
            &NewSubscription {
                url,
                events: "*",
                secret: None,
                namespace_filter: None,
                agent_filter: None,
                created_by: None,
                event_types: None,
            },
        )
        .unwrap()
    };

    // Insert two subs with a sleep in between so created_at is
    // strictly increasing (rfc3339 to second-or-better resolution).
    let older = register("https://a.example.com/");
    std::thread::sleep(std::time::Duration::from_millis(1100));
    let newer = register("https://b.example.com/");

    let rows = list(&conn, None).unwrap();
    assert_eq!(rows.len(), 2);
    // Most recent first.
    assert_eq!(rows[0].id, newer);
    assert_eq!(rows[1].id, older);
}
2979
2980 // ---------------- HMAC / sha256 helpers ----------------
2981
#[test]
fn sha256_hex_known_vector() {
    // Published SHA-256 digests for "" and "abc".
    let vectors = [
        (
            "",
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
        ),
        (
            "abc",
            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad",
        ),
    ];
    for (input, digest) in vectors {
        assert_eq!(sha256_hex(input), digest);
    }
}
2995
#[test]
fn hex_decode_round_trip_and_invalid() {
    // An even-length, all-hex string decodes to its bytes.
    let decoded = hex_decode("deadbeef").expect("valid hex");
    assert_eq!(decoded, vec![0xde, 0xad, 0xbe, 0xef]);
    // Odd length ("abc") and non-hex characters ("zz") both yield
    // None.
    for malformed in ["abc", "zz"] {
        assert!(hex_decode(malformed).is_none(), "{malformed}");
    }
}
3007
#[test]
fn validate_hmac_secret_hex_accepts_none_1048() {
    // v0.7.0 #1048 — with no secret configured there is nothing to
    // validate, so the check is a no-op that returns Ok.
    let verdict = validate_hmac_secret_hex(None);
    assert!(verdict.is_ok());
}
3013
#[test]
fn validate_hmac_secret_hex_accepts_valid_hex_1048() {
    // v0.7.0 #1048 — any even-length all-hex string passes. The last
    // case is 64 chars (32 bytes), the recommended
    // `openssl rand -hex 32` shape.
    let recommended_shape = "a".repeat(64);
    for secret in ["deadbeef", "0123456789abcdef", recommended_shape.as_str()] {
        assert!(validate_hmac_secret_hex(Some(secret)).is_ok(), "{secret}");
    }
}
3023
#[test]
fn validate_hmac_secret_hex_rejects_non_hex_1048() {
    // v0.7.0 #1048 — a non-hex secret (operator typo, or a passphrase
    // supplied where hex was expected) is rejected by the boot
    // validator; the production `main.rs` boot path treats that as
    // fatal.
    let err = validate_hmac_secret_hex(Some("not-a-hex-key!!"))
        .expect_err("non-hex MUST fail validation");
    // The message must say what is wrong and how to fix it.
    let names_problem = err.contains("not valid hex");
    let names_remedy = err.contains("openssl rand -hex 32");
    assert!(
        names_problem && names_remedy,
        "#1048: failure msg MUST reference invalid hex + remediation; got: {err}"
    );
}
3036
#[test]
fn validate_hmac_secret_hex_rejects_odd_length_1048() {
    // v0.7.0 #1048 — an odd number of hex digits cannot be parsed
    // into whole bytes, so it is rejected.
    let verdict = validate_hmac_secret_hex(Some("abc"));
    let err = verdict.expect_err("odd-length MUST fail");
    assert!(err.contains("not valid hex"));
}
3043
#[test]
fn hmac_sha256_hex_output_is_fixed_64_chars_1039() {
    // v0.7.0 #1039 (Agent-5 #6) — HMAC-SHA256 output is ALWAYS
    // 32 bytes = 64 hex chars, whatever the input. The
    // `constant_time_eq` early-return on length mismatch in
    // `src/handlers/transport.rs` (used by /api/v1/approvals signature
    // verification) leaks the length of the EXPECTED sig — but that is
    // this function's fixed 64-char output, so the leak carries zero
    // attacker-useful entropy. Pinning the fixed-length contract here
    // stops a future hash-agility refactor from producing
    // variable-length output and re-opening the timing oracle.
    //
    // Bodies: empty, one byte, 1 KiB, and a multi-byte UTF-8 char.
    let kib_of_x = "x".repeat(1024);
    for body in ["", "x", kib_of_x.as_str(), "🦀"] {
        let sig = hmac_sha256_hex("deadbeef", body);
        assert_eq!(
            sig.len(),
            64,
            "#1039: HMAC-SHA256 hex output MUST be fixed 64 chars; got len={} for body={:?}",
            sig.len(),
            body
        );
        let all_hex = sig.chars().all(|c| c.is_ascii_hexdigit());
        assert!(all_hex, "#1039: HMAC hex output MUST be all-hex; got {sig}");
    }
}
3072
#[test]
fn hmac_long_key_is_hashed_to_fit_block() {
    // A 200-hex-char key decodes to 100 bytes, longer than the 64-byte
    // SHA-256 block size. That drives the HMAC pre-step which hashes
    // an overlong key down to fit.
    let overlong_key_hex = "a".repeat(200);
    let sig = hmac_sha256_hex(&overlong_key_hex, "hello");
    // 32-byte SHA-256 digest rendered as hex.
    assert_eq!(sig.len(), 64);
}
3083
#[test]
fn hmac_invalid_hex_key_falls_back_to_raw_bytes() {
    // A key containing non-hex characters takes the fallback branch
    // (`key_hex.as_bytes()` is used directly). The output must still
    // be a well-formed 64-char SHA-256 hex string.
    let not_hex = "not-a-hex-key!!";
    let sig = hmac_sha256_hex(not_hex, "hello");
    assert_eq!(sig.len(), 64);
    let all_hex = sig.chars().all(|c| c.is_ascii_hexdigit());
    assert!(all_hex);
}
3093
3094 // ---------------- matches_filters edge cases ----------------
3095
#[test]
fn matches_filters_event_with_whitespace_and_star() {
    // A `*` token inside a comma list still matches any event name.
    let star_in_list = matches_filters("memory_store, *", None, None, None, "anything", "ns", None);
    assert!(star_in_list);

    // Whitespace around the comma-separated tokens is trimmed before
    // comparison.
    let padded_tokens = matches_filters(
        " memory_delete , memory_store ",
        None,
        None,
        None,
        "memory_store",
        "ns",
        None,
    );
    assert!(padded_tokens);
}
3119
#[test]
fn matches_filters_agent_filter_requires_some() {
    // The subscription filters on agent "alice", but the event
    // carries no agent at all → no match.
    let matched = matches_filters("*", None, None, Some("alice"), "memory_store", "ns", None);
    assert!(!matched);
}
3133
3134 // ---------------- record_dispatch / load_secret_hash ----------------
3135
#[test]
fn record_dispatch_increments_counts_on_success() {
    let (_keep, path) = fresh_db();
    // Seed one subscription, then close the seeding connection before
    // `record_dispatch` is called with the path.
    let seed_conn = Connection::open(&path).unwrap();
    let id = insert(
        &seed_conn,
        &NewSubscription {
            url: "https://example.com/h",
            events: "*",
            secret: None,
            namespace_filter: None,
            agent_filter: None,
            created_by: None,
            event_types: None,
        },
    )
    .unwrap();
    drop(seed_conn);

    // Two successful dispatches.
    for _ in 0..2 {
        record_dispatch(&path, &id, true);
    }

    let conn = Connection::open(&path).unwrap();
    let counters: (i64, i64) = conn
        .query_row(
            "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
            params![id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    let (dispatched, failed) = counters;
    assert_eq!(
        dispatched, 2,
        "two successful dispatches must bump dispatch_count"
    );
    assert_eq!(failed, 0, "successes must not bump failure_count");
}
3168
#[test]
fn record_dispatch_increments_failure_on_err() {
    let (_keep, path) = fresh_db();
    // Seed one subscription, then close the seeding connection before
    // `record_dispatch` is called with the path.
    let seed_conn = Connection::open(&path).unwrap();
    let id = insert(
        &seed_conn,
        &NewSubscription {
            url: "https://example.com/h",
            events: "*",
            secret: None,
            namespace_filter: None,
            agent_filter: None,
            created_by: None,
            event_types: None,
        },
    )
    .unwrap();
    drop(seed_conn);

    // One failed dispatch.
    record_dispatch(&path, &id, false);

    let conn = Connection::open(&path).unwrap();
    let counters: (i64, i64) = conn
        .query_row(
            "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
            params![id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    let (dispatched, failed) = counters;
    assert_eq!(dispatched, 1, "failed dispatch still bumps dispatch_count");
    assert_eq!(failed, 1, "failure must bump failure_count");
}
3200
#[test]
fn record_dispatch_nonexistent_id_does_not_panic() {
    let (_keep, path) = fresh_db();
    // No subscription carries this id, so the UPDATE matches zero
    // rows. Neither the success nor the failure flavour may panic or
    // leave the DB unusable.
    for success in [true, false] {
        record_dispatch(&path, "no-such-id", success);
    }
    // Sanity: the subscriptions table is still queryable and empty.
    let conn = Connection::open(&path).unwrap();
    let row_count: i64 = conn
        .query_row("SELECT COUNT(*) FROM subscriptions", [], |row| row.get(0))
        .unwrap();
    assert_eq!(row_count, 0);
}
3215
#[test]
fn record_dispatch_unopenable_db_path_is_noop() {
    // A path under a directory that does not exist drives the
    // `Connection::open` early-return branch (let-Err shortcut). The
    // only requirement is that the call returns without panicking.
    let unopenable = std::path::PathBuf::from("/nonexistent-dir-w12c/does-not-exist.db");
    record_dispatch(&unopenable, "x", true);
}
3224
#[test]
fn load_secret_hash_returns_stored_hash() {
    let (_keep, path) = fresh_db();
    // Seed a subscription with a secret, then close the seeding
    // connection before `load_secret_hash` is called with the path.
    let seed_conn = Connection::open(&path).unwrap();
    let id = insert(
        &seed_conn,
        &NewSubscription {
            url: "https://example.com/h",
            events: "*",
            secret: Some("topsecret"),
            namespace_filter: None,
            agent_filter: None,
            created_by: None,
            event_types: None,
        },
    )
    .unwrap();
    drop(seed_conn);

    // What comes back is the SHA-256 of the plaintext.
    let loaded = load_secret_hash(&path, &id).unwrap();
    assert_eq!(loaded, Some(sha256_hex("topsecret")));
}
3247
/// Looking up the secret hash of an id that has no subscription row must
/// be reported as `Err`.
#[test]
fn load_secret_hash_missing_id_errs() {
    let (_tmp_guard, db_file) = fresh_db();
    // The freshly created database holds no subscriptions at all.
    let lookup = load_secret_hash(&db_file, "missing-id");
    assert!(
        lookup.is_err(),
        "missing subscription id must surface as Err"
    );
}
3256
3257 // ---------------- dispatch_event thread plumbing ----------------
3258
/// With an empty `subscriptions` table, `dispatch_event` must simply
/// return without panicking.
#[test]
fn dispatch_event_no_subs_is_noop() {
    let (_tmp_guard, db_file) = fresh_db();
    let db = Connection::open(&db_file).unwrap();
    // Nothing is registered, so there is nobody to notify.
    dispatch_event(&db, "memory_store", "m1", "ns", None, &db_file);
}
3267
/// A subscriber whose `events` whitelist names only `memory_delete` must
/// be skipped when a `memory_store` event fires.
///
/// The absence of a delivery thread cannot be observed directly here;
/// the test instead checks that the call returns without panicking and
/// that both counters on the row are still zero straight afterwards.
#[test]
fn dispatch_event_filter_mismatch_skips_send() {
    let (_tmp_guard, db_file) = fresh_db();
    let db = Connection::open(&db_file).unwrap();

    let delete_only = NewSubscription {
        url: "https://example.com/h",
        events: "memory_delete",
        secret: None,
        namespace_filter: None,
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    insert(&db, &delete_only).unwrap();

    dispatch_event(&db, "memory_store", "m1", "ns", None, &db_file);

    // No dispatch happened, so neither counter may have moved.
    let (dispatched, failed) = db
        .query_row(
            "SELECT dispatch_count, failure_count FROM subscriptions",
            [],
            |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
        )
        .unwrap();
    assert_eq!(dispatched, 0);
    assert_eq!(failed, 0);
}
3303
/// A subscriber pinned to namespace `only-this-ns` must be skipped when
/// the event is raised in a different namespace.
#[test]
fn dispatch_event_namespace_filter_mismatch_skips() {
    let (_tmp_guard, db_file) = fresh_db();
    let db = Connection::open(&db_file).unwrap();

    let namespaced = NewSubscription {
        url: "https://example.com/h",
        events: "*",
        secret: None,
        namespace_filter: Some("only-this-ns"),
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    insert(&db, &namespaced).unwrap();

    // The event namespace differs from the filter, so nothing is sent.
    dispatch_event(&db, "memory_store", "m1", "other-ns", None, &db_file);

    let (dispatched, failed) = db
        .query_row(
            "SELECT dispatch_count, failure_count FROM subscriptions",
            [],
            |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
        )
        .unwrap();
    assert_eq!(dispatched, 0);
    assert_eq!(failed, 0);
}
3333
3334 // ---------------- send() — wiremock-driven HTTP tests ----------------
3335
3336 #[tokio::test(flavor = "multi_thread")]
3337 async fn send_returns_true_on_2xx() {
3338 use wiremock::matchers::{method, path};
3339 use wiremock::{Mock, MockServer};
3340 let server = MockServer::start().await;
3341 // K6: receivers MUST return a JSON ack body — the AckEcho
3342 // helper echoes the request's correlation_id header so the
3343 // ack-correlation-id check in `send` passes.
3344 Mock::given(method("POST"))
3345 .and(path("/hook"))
3346 .respond_with(AckEcho)
3347 .expect(1)
3348 .mount(&server)
3349 .await;
3350 let url = format!("{}/hook", server.uri());
3351 let corr = uuid::Uuid::now_v7().to_string();
3352 let res = tokio::task::spawn_blocking(move || {
3353 send(
3354 &url,
3355 "{\"event\":\"x\"}",
3356 "1700000000",
3357 Some("deadbeef"),
3358 &corr,
3359 )
3360 })
3361 .await
3362 .unwrap();
3363 assert!(res.is_ok(), "2xx + matching ack must succeed: {res:?}");
3364 }
3365
3366 #[tokio::test(flavor = "multi_thread")]
3367 async fn send_returns_false_on_5xx() {
3368 use wiremock::matchers::{method, path};
3369 use wiremock::{Mock, MockServer, ResponseTemplate};
3370 let server = MockServer::start().await;
3371 Mock::given(method("POST"))
3372 .and(path("/hook"))
3373 .respond_with(ResponseTemplate::new(500))
3374 .mount(&server)
3375 .await;
3376 let url = format!("{}/hook", server.uri());
3377 let corr = uuid::Uuid::now_v7().to_string();
3378 let res = tokio::task::spawn_blocking(move || {
3379 send(&url, "{\"event\":\"x\"}", "1700000000", None, &corr)
3380 })
3381 .await
3382 .unwrap();
3383 assert!(res.is_err(), "5xx must return Err (no retry inside send)");
3384 }
3385
3386 #[tokio::test(flavor = "multi_thread")]
3387 async fn send_returns_false_on_4xx() {
3388 use wiremock::matchers::{method, path};
3389 use wiremock::{Mock, MockServer, ResponseTemplate};
3390 let server = MockServer::start().await;
3391 Mock::given(method("POST"))
3392 .and(path("/hook"))
3393 .respond_with(ResponseTemplate::new(404))
3394 .mount(&server)
3395 .await;
3396 let url = format!("{}/hook", server.uri());
3397 let corr = uuid::Uuid::now_v7().to_string();
3398 let res = tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", None, &corr))
3399 .await
3400 .unwrap();
3401 assert!(res.is_err(), "4xx must return Err");
3402 }
3403
3404 #[tokio::test(flavor = "multi_thread")]
3405 async fn send_does_not_follow_redirect_ssrf_pin_bypass() {
3406 // SR-W3 (HIGH) regression. The webhook client pins reqwest's
3407 // DNS to the SSRF-validated address set of the *original* host.
3408 // A malicious endpoint could return a 3xx whose Location points
3409 // at an internal address the guard never cleared; reqwest's
3410 // default redirect policy would follow it and re-resolve DNS for
3411 // the new host, bypassing the pin. With redirects disabled the
3412 // 3xx is surfaced as a non-success status and the dispatch fails
3413 // safely. We assert BOTH that send returns Err AND that the
3414 // redirect target was never requested.
3415 use wiremock::matchers::{method, path};
3416 use wiremock::{Mock, MockServer, ResponseTemplate};
3417 let server = MockServer::start().await;
3418 // The hook returns a redirect to a path that, if followed, the
3419 // mock would happily answer 200 — proving the failure is the
3420 // un-followed redirect, not a missing target.
3421 Mock::given(method("POST"))
3422 .and(path("/hook"))
3423 .respond_with(
3424 ResponseTemplate::new(302).insert_header("location", "/internal-rebind-target"),
3425 )
3426 .expect(1)
3427 .mount(&server)
3428 .await;
3429 let redirect_followed = Mock::given(path("/internal-rebind-target"))
3430 .respond_with(ResponseTemplate::new(200))
3431 .expect(0)
3432 .named("redirect target must NOT be requested");
3433 server.register(redirect_followed).await;
3434
3435 let url = format!("{}/hook", server.uri());
3436 let corr = uuid::Uuid::now_v7().to_string();
3437 let res = tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", None, &corr))
3438 .await
3439 .unwrap();
3440 assert!(
3441 res.is_err(),
3442 "redirect must not be followed; 3xx surfaces as a failed dispatch: {res:?}"
3443 );
3444 // The `.expect(0)` on the redirect-target mock is verified on
3445 // server drop; assert it explicitly here for a clear failure
3446 // message if reqwest ever regains redirect-following behavior.
3447 let hits = server
3448 .received_requests()
3449 .await
3450 .unwrap_or_default()
3451 .into_iter()
3452 .filter(|r| r.url.path() == "/internal-rebind-target")
3453 .count();
3454 assert_eq!(hits, 0, "redirect target was requested — SSRF pin bypassed");
3455 }
3456
3457 #[tokio::test(flavor = "multi_thread")]
3458 async fn send_signature_header_set_when_provided() {
3459 use wiremock::matchers::{header, header_exists, method, path};
3460 use wiremock::{Mock, MockServer};
3461 let server = MockServer::start().await;
3462 // Assert the `x-ai-memory-signature` header is `sha256=<sig>`
3463 // and the timestamp + correlation-id headers are set.
3464 Mock::given(method("POST"))
3465 .and(path("/hook"))
3466 .and(header("x-ai-memory-signature", "sha256=abc123"))
3467 .and(header_exists("x-ai-memory-timestamp"))
3468 .and(header_exists("x-ai-memory-correlation-id"))
3469 .and(header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON))
3470 .respond_with(AckEcho)
3471 .expect(1)
3472 .mount(&server)
3473 .await;
3474 let url = format!("{}/hook", server.uri());
3475 let corr = uuid::Uuid::now_v7().to_string();
3476 let res = tokio::task::spawn_blocking(move || {
3477 send(&url, "{}", "1700000000", Some("abc123"), &corr)
3478 })
3479 .await
3480 .unwrap();
3481 assert!(
3482 res.is_ok(),
3483 "2xx with matched signature header + ack must succeed: {res:?}"
3484 );
3485 }
3486
3487 #[tokio::test(flavor = "multi_thread")]
3488 async fn send_no_signature_header_when_secret_absent() {
3489 use wiremock::matchers::{method, path};
3490 use wiremock::{Mock, MockServer, Request};
3491 let server = MockServer::start().await;
3492 Mock::given(method("POST"))
3493 .and(path("/hook"))
3494 .respond_with(AckEcho)
3495 .mount(&server)
3496 .await;
3497 let url = format!("{}/hook", server.uri());
3498 let corr = uuid::Uuid::now_v7().to_string();
3499 let res = tokio::task::spawn_blocking({
3500 let url = url.clone();
3501 let corr = corr.clone();
3502 move || send(&url, "{}", "1700000000", None, &corr)
3503 })
3504 .await
3505 .unwrap();
3506 assert!(res.is_ok(), "ack-echo must succeed: {res:?}");
3507 // Inspect the captured request to confirm no signature header.
3508 let received: Vec<Request> = server.received_requests().await.unwrap_or_default();
3509 assert_eq!(received.len(), 1);
3510 let req = &received[0];
3511 // wiremock lower-cases header names.
3512 assert!(
3513 req.headers.get("x-ai-memory-signature").is_none(),
3514 "no signature should be sent when secret absent"
3515 );
3516 assert!(
3517 req.headers.get("x-ai-memory-timestamp").is_some(),
3518 "timestamp header must always be set"
3519 );
3520 }
3521
/// `send` must return `Err` for a private-network (RFC1918) target. No
/// server is started for this test; the call is expected to be refused
/// by the URL guard before any HTTP attempt.
#[test]
fn send_rejects_ssrf_url_without_network() {
    let private_target = "https://10.0.0.1/hook";
    let outcome = send(private_target, "{}", "1700000000", None, "some-corr");
    assert!(
        outcome.is_err(),
        "send must reject SSRF URL via validate_url guard"
    );
}
3539
/// `send` must return `Err` for a URL whose scheme is neither `http` nor
/// `https`; `ftp://` is the representative case.
#[test]
fn send_rejects_invalid_scheme_without_network() {
    let ftp_target = "ftp://example.com/hook";
    let outcome = send(ftp_target, "{}", "1700000000", None, "x");
    assert!(outcome.is_err(), "send must reject non-http(s) URL");
}
3546
3547 // ---------------- end-to-end dispatch_event with HTTP mock ----------------
3548
3549 #[tokio::test(flavor = "multi_thread")]
3550 async fn dispatch_event_e2e_increments_dispatch_count_on_2xx() {
3551 use wiremock::matchers::{method, path};
3552 use wiremock::{Mock, MockServer};
3553 let server = MockServer::start().await;
3554 Mock::given(method("POST"))
3555 .and(path("/hook"))
3556 .respond_with(AckEcho)
3557 .mount(&server)
3558 .await;
3559
3560 let (_keep, db_path) = fresh_db();
3561 // Insert a wildcard subscription pointing at the mock.
3562 let id = {
3563 let conn = Connection::open(&db_path).unwrap();
3564 let url = format!("{}/hook", server.uri());
3565 insert(
3566 &conn,
3567 &NewSubscription {
3568 url: &url,
3569 events: "*",
3570 secret: Some("mysecret"),
3571 namespace_filter: None,
3572 agent_filter: None,
3573 created_by: None,
3574 event_types: None,
3575 },
3576 )
3577 .unwrap()
3578 };
3579
3580 // Run dispatch and wait for the spawned thread to record the
3581 // counter bump. dispatch_event spawns a detached std::thread so
3582 // we poll for up to ~5 s.
3583 {
3584 let conn = Connection::open(&db_path).unwrap();
3585 dispatch_event(&conn, "memory_store", "m1", "ns", None, &db_path);
3586 }
3587
3588 let path_for_poll = db_path.clone();
3589 let id_for_poll = id.clone();
3590 let dc = tokio::task::spawn_blocking(move || {
3591 for _ in 0..50 {
3592 let conn = Connection::open(&path_for_poll).unwrap();
3593 let dc: i64 = conn
3594 .query_row(
3595 "SELECT dispatch_count FROM subscriptions WHERE id = ?1",
3596 params![id_for_poll],
3597 |r| r.get(0),
3598 )
3599 .unwrap();
3600 if dc > 0 {
3601 return dc;
3602 }
3603 std::thread::sleep(std::time::Duration::from_millis(100));
3604 }
3605 0
3606 })
3607 .await
3608 .unwrap();
3609 assert_eq!(dc, 1, "successful dispatch must increment dispatch_count");
3610 }
3611
3612 #[tokio::test(flavor = "multi_thread")]
3613 async fn dispatch_event_e2e_increments_failure_count_on_5xx() {
3614 use wiremock::matchers::{method, path};
3615 use wiremock::{Mock, MockServer, ResponseTemplate};
3616 let server = MockServer::start().await;
3617 Mock::given(method("POST"))
3618 .and(path("/hook"))
3619 .respond_with(ResponseTemplate::new(500))
3620 .mount(&server)
3621 .await;
3622
3623 let (_keep, db_path) = fresh_db();
3624 let id = {
3625 let conn = Connection::open(&db_path).unwrap();
3626 let url = format!("{}/hook", server.uri());
3627 insert(
3628 &conn,
3629 &NewSubscription {
3630 url: &url,
3631 events: "*",
3632 secret: None,
3633 namespace_filter: None,
3634 agent_filter: None,
3635 created_by: None,
3636 event_types: None,
3637 },
3638 )
3639 .unwrap()
3640 };
3641
3642 {
3643 let conn = Connection::open(&db_path).unwrap();
3644 dispatch_event(&conn, "memory_store", "m2", "ns", None, &db_path);
3645 }
3646
3647 // K6 retry ladder (200ms + 1s + 5s) means a final-failure
3648 // counter bump can take ≈ 6.5s of wall-clock + per-attempt
3649 // overhead. Poll for up to 12s to cover the worst case.
3650 let path_for_poll = db_path.clone();
3651 let id_for_poll = id.clone();
3652 let (dc, fc) = tokio::task::spawn_blocking(move || {
3653 for _ in 0..120 {
3654 let conn = Connection::open(&path_for_poll).unwrap();
3655 let row: (i64, i64) = conn
3656 .query_row(
3657 "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
3658 params![id_for_poll],
3659 |r| Ok((r.get(0)?, r.get(1)?)),
3660 )
3661 .unwrap();
3662 if row.0 > 0 {
3663 return row;
3664 }
3665 std::thread::sleep(std::time::Duration::from_millis(100));
3666 }
3667 (0, 0)
3668 })
3669 .await
3670 .unwrap();
3671 assert_eq!(dc, 1, "5xx still increments dispatch_count");
3672 assert_eq!(fc, 1, "5xx must increment failure_count");
3673 }
3674
3675 #[tokio::test(flavor = "multi_thread")]
3676 async fn dispatch_event_e2e_signature_present_when_secret_set() {
3677 use wiremock::matchers::{header_exists, method, path};
3678 use wiremock::{Mock, MockServer};
3679 let server = MockServer::start().await;
3680 Mock::given(method("POST"))
3681 .and(path("/hook"))
3682 .and(header_exists("x-ai-memory-signature"))
3683 .and(header_exists("x-ai-memory-timestamp"))
3684 .respond_with(AckEcho)
3685 .expect(1)
3686 .mount(&server)
3687 .await;
3688
3689 let (_keep, db_path) = fresh_db();
3690 let _id = {
3691 let conn = Connection::open(&db_path).unwrap();
3692 let url = format!("{}/hook", server.uri());
3693 insert(
3694 &conn,
3695 &NewSubscription {
3696 url: &url,
3697 events: "*",
3698 secret: Some("the-secret"),
3699 namespace_filter: None,
3700 agent_filter: None,
3701 created_by: None,
3702 event_types: None,
3703 },
3704 )
3705 .unwrap()
3706 };
3707
3708 {
3709 let conn = Connection::open(&db_path).unwrap();
3710 dispatch_event(&conn, "memory_store", "m3", "ns", None, &db_path);
3711 }
3712
3713 // Wait for the dispatch thread to fire & wiremock to record.
3714 // We poll the mock's hit count instead of the DB so the
3715 // assertion stays specific to "signature header present".
3716 let server_ref = &server;
3717 for _ in 0..50 {
3718 let received = server_ref.received_requests().await.unwrap_or_default();
3719 if !received.is_empty() {
3720 let req = &received[0];
3721 assert!(
3722 req.headers.get("x-ai-memory-signature").is_some(),
3723 "signature header must be present when secret set"
3724 );
3725 return;
3726 }
3727 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3728 }
3729 panic!("dispatch thread never reached the mock server");
3730 }
3731
3732 // ----------------------------------------------------------------
3733 // v0.7.0 K4 — approval-event routing through subscriptions.
3734 //
3735 // Closes the v0.6.3.1 honest-Capabilities-v2 disclosure that the
3736 // approval surface was advertised but unwired. The four tests below
3737 // pin:
3738 // 1. canonical event constant + capabilities parity
3739 // 2. opt-in subscriber receives the event end-to-end (HTTP mock)
3740 // 3. filter-mismatched subscriber does NOT receive the event
3741 // 4. missing pending row is logged + best-effort no-op
3742 // ----------------------------------------------------------------
3743
/// K4: the `approval_requested` lifecycle event must be listed in the
/// canonical `WEBHOOK_EVENT_TYPES` constant, the integration contract
/// the K10 Approval API and external SDK consumers pin against.
#[test]
fn approval_requested_event_in_canonical_list() {
    let is_listed = WEBHOOK_EVENT_TYPES
        .iter()
        .any(|event_name| *event_name == "approval_requested");
    assert!(
        is_listed,
        "K4: WEBHOOK_EVENT_TYPES must include approval_requested"
    );
}
3754
3755 #[tokio::test(flavor = "multi_thread")]
3756 async fn approval_requested_dispatches_to_opt_in_subscriber() {
3757 // K4: end-to-end. Insert a subscription opt-ed in to
3758 // `approval_requested` only; queue a pending action via the
3759 // db layer; call the dispatch helper; assert the wiremock
3760 // saw the POST and the body shape carries the K4 details.
3761 use wiremock::matchers::{method, path};
3762 use wiremock::{Mock, MockServer};
3763 let server = MockServer::start().await;
3764 Mock::given(method("POST"))
3765 .and(path("/hook"))
3766 .respond_with(AckEcho)
3767 .mount(&server)
3768 .await;
3769
3770 let (_keep, db_path) = fresh_db();
3771 let url = format!("{}/hook", server.uri());
3772 let opt_in: Vec<String> = vec!["approval_requested".to_string()];
3773 let sub_id = {
3774 let conn = Connection::open(&db_path).unwrap();
3775 insert(
3776 &conn,
3777 &NewSubscription {
3778 url: &url,
3779 events: "approval_requested",
3780 // R3-S1.HMAC (2026-05-13): dispatch refuses
3781 // unsigned bodies; supply a per-sub secret.
3782 secret: Some("test-sub-secret"),
3783 namespace_filter: None,
3784 agent_filter: None,
3785 created_by: None,
3786 event_types: Some(&opt_in),
3787 },
3788 )
3789 .unwrap()
3790 };
3791
3792 // Queue a pending action through the canonical db helper so
3793 // the row exists when dispatch_approval_requested looks it up.
3794 let pending_id = {
3795 let conn = Connection::open(&db_path).unwrap();
3796 crate::db::queue_pending_action(
3797 &conn,
3798 crate::models::GovernedAction::Store,
3799 "k4-ns",
3800 None,
3801 "agent-requestor",
3802 &serde_json::json!({"title": "k4 approval routing"}),
3803 )
3804 .unwrap()
3805 };
3806
3807 // Fire the dispatcher.
3808 {
3809 let conn = Connection::open(&db_path).unwrap();
3810 dispatch_approval_requested(&conn, &pending_id, &db_path);
3811 }
3812
3813 // Poll the mock for the dispatch — std::thread::spawn is
3814 // detached so we cannot join. ~5s budget mirrors the existing
3815 // dispatch_event_e2e_* tests.
3816 let mut received = Vec::new();
3817 for _ in 0..50 {
3818 received = server.received_requests().await.unwrap_or_default();
3819 if !received.is_empty() {
3820 break;
3821 }
3822 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3823 }
3824 assert_eq!(
3825 received.len(),
3826 1,
3827 "K4: opt-in subscriber must receive exactly one approval_requested POST"
3828 );
3829 let body: serde_json::Value =
3830 serde_json::from_slice(&received[0].body).expect("dispatch body must be JSON");
3831 assert_eq!(body["event"], "approval_requested");
3832 assert_eq!(body["memory_id"], pending_id);
3833 assert_eq!(body["namespace"], "k4-ns");
3834 assert_eq!(body["agent_id"], "agent-requestor");
3835 // K4 details block flattened into the envelope.
3836 assert_eq!(body["action_type"], "store");
3837 assert_eq!(body["status"], "pending");
3838 assert!(
3839 body["requested_at"].is_string(),
3840 "requested_at must round-trip from the row"
3841 );
3842
3843 // Sanity: the dispatch_count was bumped on the subscription
3844 // row (proves we went through record_dispatch on success).
3845 // Poll up to 2s — dispatch_count is written back AFTER the HTTP
3846 // POST is acked, so seeing the wiremock request above does not
3847 // imply the row update has landed yet (race observed on Linux
3848 // and Windows runners under load).
3849 let conn = Connection::open(&db_path).unwrap();
3850 let mut dc: i64 = 0;
3851 for _ in 0..40 {
3852 dc = conn
3853 .query_row(
3854 "SELECT dispatch_count FROM subscriptions WHERE id = ?1",
3855 params![sub_id],
3856 |r| r.get(0),
3857 )
3858 .unwrap();
3859 if dc == 1 {
3860 break;
3861 }
3862 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3863 }
3864 assert_eq!(dc, 1, "dispatch_count must be 1 after successful dispatch");
3865 }
3866
/// K4: a subscriber whose structured opt-in list names a *different*
/// event (`memory_store`) must not be notified of `approval_requested`.
///
/// "No delivery attempted" is not directly observable, so the test
/// asserts that both counters on the subscription row are still zero
/// after a short grace period.
#[test]
fn approval_requested_skipped_for_filtered_subscriber() {
    let (_tmp_guard, db_file) = fresh_db();
    let other_events: Vec<String> = vec!["memory_store".to_string()];

    let sub_id = {
        let setup = Connection::open(&db_file).unwrap();
        let subscription = NewSubscription {
            url: "https://example.com/hook",
            events: "memory_store",
            secret: None,
            namespace_filter: None,
            agent_filter: None,
            created_by: None,
            event_types: Some(&other_events),
        };
        insert(&setup, &subscription).unwrap()
    };

    let pending_id = {
        let queue_conn = Connection::open(&db_file).unwrap();
        let request_payload = serde_json::json!({"id": "memory-xyz"});
        crate::db::queue_pending_action(
            &queue_conn,
            crate::models::GovernedAction::Delete,
            "k4-ns-2",
            Some("memory-xyz"),
            "agent-requestor",
            &request_payload,
        )
        .unwrap()
    };

    {
        let dispatcher = Connection::open(&db_file).unwrap();
        dispatch_approval_requested(&dispatcher, &pending_id, &db_file);
    }

    // Wait a beat so an unintended dispatch racing in would be noticed.
    std::thread::sleep(std::time::Duration::from_millis(200));

    let reader = Connection::open(&db_file).unwrap();
    let counters: (i64, i64) = reader
        .query_row(
            "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
            params![sub_id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    assert_eq!(counters.0, 0, "filter mismatch must skip dispatch");
    assert_eq!(counters.1, 0);
}
3923
/// K4 defensive case: dispatching for a pending-action id that does not
/// exist must not panic and must leave the counters of an otherwise
/// matching wildcard subscriber at zero.
#[test]
fn approval_requested_missing_pending_row_is_noop() {
    let (_tmp_guard, db_file) = fresh_db();

    let sub_id = {
        let setup = Connection::open(&db_file).unwrap();
        let wildcard = NewSubscription {
            url: "https://example.com/hook",
            events: "*",
            secret: None,
            namespace_filter: None,
            agent_filter: None,
            created_by: None,
            event_types: None,
        };
        insert(&setup, &wildcard).unwrap()
    };

    let db = Connection::open(&db_file).unwrap();
    // No pending action was ever queued, so this id cannot be found.
    dispatch_approval_requested(&db, "nonexistent-id", &db_file);

    let counters: (i64, i64) = db
        .query_row(
            "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
            params![sub_id],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    assert_eq!(counters.0, 0, "missing pending row must not dispatch");
    assert_eq!(counters.1, 0);
}
3960
3961 // ----------------------------------------------------------------
3962 // v0.7.0 K6 — A2A correlation IDs + ACK / retry / DLQ tests.
3963 //
3964 // Pinned behaviours:
3965 // 1. correlation_id is a UUIDv7 string and lands in
3966 // `subscription_events.correlation_id`
3967 // 2. successful delivery transitions the audit row to 'ack' and
3968 // the dispatched body's `correlation_id` field matches
3969 // 3. a 500-only mock exhausts the [200ms, 1s, 5s] retry ladder
3970 // and the row lands in `subscription_dlq`
3971 // 4. `replay_subscription_events` returns audit rows ordered by
3972 // delivered_at since a cursor timestamp
3973 // ----------------------------------------------------------------
3974
3975 #[tokio::test(flavor = "multi_thread")]
3976 async fn k6_dispatch_persists_uuidv7_correlation_id() {
3977 use wiremock::matchers::{method, path};
3978 use wiremock::{Mock, MockServer};
3979 let server = MockServer::start().await;
3980 Mock::given(method("POST"))
3981 .and(path("/hook"))
3982 .respond_with(AckEcho)
3983 .mount(&server)
3984 .await;
3985
3986 let (_keep, db_path) = fresh_db();
3987 let url = format!("{}/hook", server.uri());
3988 let sub_id = {
3989 let conn = Connection::open(&db_path).unwrap();
3990 insert(
3991 &conn,
3992 &NewSubscription {
3993 url: &url,
3994 events: "*",
3995 // R3-S1.HMAC (2026-05-13): dispatch refuses
3996 // unsigned bodies; supply a per-sub secret.
3997 secret: Some("test-sub-secret"),
3998 namespace_filter: None,
3999 agent_filter: None,
4000 created_by: None,
4001 event_types: None,
4002 },
4003 )
4004 .unwrap()
4005 };
4006 {
4007 let conn = Connection::open(&db_path).unwrap();
4008 dispatch_event(&conn, "memory_store", "k6-mem", "k6-ns", None, &db_path);
4009 }
4010
4011 // Poll until the subscription_events row is acked.
4012 let path_for_poll = db_path.clone();
4013 let sub_for_poll = sub_id.clone();
4014 let row = tokio::task::spawn_blocking(move || {
4015 for _ in 0..50 {
4016 let conn = Connection::open(&path_for_poll).unwrap();
4017 let r: Option<(String, String, String)> = conn
4018 .query_row(
4019 "SELECT correlation_id, payload, delivery_status \
4020 FROM subscription_events WHERE subscription_id = ?1",
4021 params![sub_for_poll],
4022 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
4023 )
4024 .ok();
4025 if let Some(r) = r
4026 && r.2 == "ack"
4027 {
4028 return Some(r);
4029 }
4030 std::thread::sleep(std::time::Duration::from_millis(100));
4031 }
4032 None
4033 })
4034 .await
4035 .unwrap();
4036 let (corr, body, status) = row.expect("audit row must reach ack status");
4037 assert_eq!(status, "ack");
4038 // UUIDv7 — parses + version 7.
4039 let parsed = uuid::Uuid::parse_str(&corr).expect("UUIDv7 string");
4040 assert_eq!(parsed.get_version_num(), 7, "correlation_id must be UUIDv7");
4041 // The dispatched body carries the same correlation_id.
4042 let json: serde_json::Value = serde_json::from_str(&body).unwrap();
4043 assert_eq!(
4044 json["correlation_id"].as_str(),
4045 Some(corr.as_str()),
4046 "payload correlation_id must match audit row"
4047 );
4048 }
4049
4050 #[tokio::test(flavor = "multi_thread")]
4051 async fn k6_500_after_retries_lands_in_dlq() {
4052 use wiremock::matchers::{method, path};
4053 use wiremock::{Mock, MockServer, ResponseTemplate};
4054 let server = MockServer::start().await;
4055 // Mock returns 500 for every attempt — exhausts the retry
4056 // ladder and forces the DLQ branch.
4057 Mock::given(method("POST"))
4058 .and(path("/hook"))
4059 .respond_with(ResponseTemplate::new(500))
4060 .mount(&server)
4061 .await;
4062
4063 let (_keep, db_path) = fresh_db();
4064 let url = format!("{}/hook", server.uri());
4065 let sub_id = {
4066 let conn = Connection::open(&db_path).unwrap();
4067 insert(
4068 &conn,
4069 &NewSubscription {
4070 url: &url,
4071 events: "*",
4072 // R3-S1.HMAC (2026-05-13): dispatch refuses
4073 // unsigned bodies; supply a per-sub secret.
4074 secret: Some("test-sub-secret"),
4075 namespace_filter: None,
4076 agent_filter: None,
4077 created_by: None,
4078 event_types: None,
4079 },
4080 )
4081 .unwrap()
4082 };
4083 {
4084 let conn = Connection::open(&db_path).unwrap();
4085 dispatch_event(&conn, "memory_store", "k6-fail", "k6-ns", None, &db_path);
4086 }
4087
4088 // Backoff ladder is 200ms + 1s + 5s ≈ 6.2s of sleeps + per-
4089 // attempt network time. Poll for up to 12s for the DLQ row.
4090 let path_for_poll = db_path.clone();
4091 let sub_for_poll = sub_id.clone();
4092 let dlq_row = tokio::task::spawn_blocking(move || {
4093 for _ in 0..120 {
4094 let conn = Connection::open(&path_for_poll).unwrap();
4095 let entries = list_dlq(&conn, Some(&sub_for_poll)).unwrap();
4096 if !entries.is_empty() {
4097 return Some(entries);
4098 }
4099 std::thread::sleep(std::time::Duration::from_millis(100));
4100 }
4101 None
4102 })
4103 .await
4104 .unwrap()
4105 .expect("DLQ row must appear after retry ladder exhaustion");
4106
4107 assert_eq!(dlq_row.len(), 1, "exactly one DLQ row per failed delivery");
4108 let row = &dlq_row[0];
4109 assert_eq!(row.subscription_id, sub_id);
4110 assert_eq!(row.event_type, "memory_store");
4111 assert_eq!(
4112 row.retry_count,
4113 (RETRY_BACKOFFS.len() as i64) + 1,
4114 "retry_count = initial attempt + RETRY_BACKOFFS.len() retries"
4115 );
4116 assert!(
4117 row.last_error.starts_with("http-5"),
4118 "last_error must record the 5xx status: {}",
4119 row.last_error
4120 );
4121 assert!(!row.first_failed_at.is_empty());
4122 assert!(!row.last_failed_at.is_empty());
4123 // Audit row should be marked failed.
4124 let conn = Connection::open(&db_path).unwrap();
4125 let status: String = conn
4126 .query_row(
4127 "SELECT delivery_status FROM subscription_events WHERE correlation_id = ?1",
4128 params![row.correlation_id],
4129 |r| r.get(0),
4130 )
4131 .unwrap();
4132 assert_eq!(status, "failed");
4133 }
4134
/// K6: `replay_subscription_events` (and the MCP-shaped
/// `memory_subscription_replay` wrapper) must return only the audit rows
/// delivered after the cursor timestamp.
///
/// The test performs no asynchronous work, so it is a plain synchronous
/// `#[test]` and does not start a tokio runtime.
#[test]
fn k6_replay_subscription_events_returns_rows_since_cursor() {
    // Two audit rows are inserted by hand (faster than driving two full
    // dispatches); the cursor must then filter out the older one.
    let (_keep, db_path) = fresh_db();
    let url = "https://example.com/hook";
    let sub_id = {
        let conn = Connection::open(&db_path).unwrap();
        insert(
            &conn,
            &NewSubscription {
                url,
                events: "*",
                secret: None,
                namespace_filter: None,
                agent_filter: None,
                created_by: None,
                event_types: None,
            },
        )
        .unwrap()
    };

    // Two correlation ids with explicit `delivered_at` values, one on
    // each side of the cursor used below.
    let conn = Connection::open(&db_path).unwrap();
    conn.execute(
        "INSERT INTO subscription_events \
         (subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status) \
         VALUES (?1, ?2, 'memory_store', '{}', '2026-01-01T00:00:00Z', 'ack')",
        params![sub_id, "c-old"],
    )
    .unwrap();
    conn.execute(
        "INSERT INTO subscription_events \
         (subscription_id, correlation_id, event_type, payload, delivered_at, delivery_status) \
         VALUES (?1, ?2, 'memory_store', '{}', '2026-05-05T00:00:00Z', 'ack')",
        params![sub_id, "c-new"],
    )
    .unwrap();

    let after = replay_subscription_events(&conn, &sub_id, "2026-03-01T00:00:00Z")
        .expect("replay query");
    assert_eq!(after.len(), 1, "cursor must filter to the newer row");
    assert_eq!(after[0].correlation_id, "c-new");

    // The MCP-shaped wrapper must agree with the raw query.
    let envelope = memory_subscription_replay(&conn, &sub_id, "2026-03-01T00:00:00Z").unwrap();
    assert_eq!(envelope["count"], 1);
    assert_eq!(envelope["events"][0]["correlation_id"], "c-new");
}
4183
/// #1253 (MED, 2026-05-25) — regression guard for the per-subscription
/// `subscription_dlq` depth cap.
///
/// Fills the DLQ up to [`MAX_SUBSCRIPTION_DLQ_ROWS`] through
/// `record_dlq_with_conn`, then checks that the (cap + 1)-th insert:
/// - is refused with an error whose text contains `dlq_overflow`,
/// - leaves the table at exactly cap rows, and
/// - advances the value reported by
///   `crate::metrics::subscription_dlq_overflow_count()`.
#[test]
fn issue_1253_dlq_overflow_cap_refuses_past_max() {
    let (_keep, db_path) = fresh_db();
    let conn = Connection::open(&db_path).unwrap();
    let spec = NewSubscription {
        url: "https://example.com/hook",
        events: "*",
        secret: Some("s"),
        namespace_filter: None,
        agent_filter: None,
        created_by: None,
        event_types: None,
    };
    let sub_id = insert(&conn, &spec).unwrap();

    // Every DLQ write in this test differs only by correlation id; go
    // through the canonical writer so the same insert path is exercised.
    let push_dlq = |correlation_id: &str| {
        record_dlq_with_conn(
            &conn,
            &sub_id,
            correlation_id,
            "memory_store",
            "{}",
            4,
            "http-500",
            "2026-01-01T00:00:00Z",
            "2026-01-01T00:00:00Z",
        )
    };
    // Current number of DLQ rows owned by this subscription.
    let dlq_depth = || -> i64 {
        conn.query_row(
            "SELECT COUNT(*) FROM subscription_dlq WHERE subscription_id = ?1",
            params![&sub_id],
            |row| row.get(0),
        )
        .unwrap()
    };

    // Fill to the cap.
    for i in 0..MAX_SUBSCRIPTION_DLQ_ROWS {
        push_dlq(&format!("c-{i}")).expect("inserts below cap must succeed");
    }
    assert_eq!(dlq_depth(), MAX_SUBSCRIPTION_DLQ_ROWS, "DLQ should fill to cap");

    // Snapshot the overflow counter so its advance can be proven below.
    let before = crate::metrics::subscription_dlq_overflow_count();

    // The (cap + 1)-th insert must be refused, and the error text must
    // carry the "dlq_overflow" tag so operators / dashboards can
    // pattern-match it.
    let err = push_dlq("c-overflow").expect_err("over-cap insert must be refused");
    let msg = err.to_string();
    assert!(
        msg.contains("dlq_overflow"),
        "error must carry the dlq_overflow tag for operators: {msg}"
    );

    // Refusal is total, not partial: the table stays at the cap.
    assert_eq!(
        dlq_depth(),
        MAX_SUBSCRIPTION_DLQ_ROWS,
        "refused insert must not leak a row into subscription_dlq"
    );

    // The metrics handle is a singleton, so parallel tests may bump it as
    // well; all this test owns is at least one increment of its own.
    let after = crate::metrics::subscription_dlq_overflow_count();
    assert!(
        after > before,
        "subscription_dlq_overflow_total did not advance (before={before}, after={after})"
    );
}
4280}
4281
// Test-only hex encoder. Per the original author's note, production paths
// use the format!("{:x}", _) pattern over GenericArray outputs instead.
#[cfg(test)]
mod hex {
    /// Lowercase hex digits, indexed by nibble value.
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    /// Renders `bytes` as lowercase hex, two characters per input byte.
    pub fn encode_fallback(bytes: &[u8]) -> String {
        let mut out = String::with_capacity(bytes.len() * 2);
        for &byte in bytes {
            // High nibble first, then low nibble — same order as `{:02x}`.
            out.push(char::from(DIGITS[usize::from(byte >> 4)]));
            out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
        }
        out
    }
}
4290
/// HMAC signing must cope with non-ASCII characters in both the shared
/// secret and the JSON body, and must yield a 64-character hex digest.
#[test]
fn webhook_signing_with_unicode_payload() {
    let payload = serde_json::json!({
        "event": "memory_store",
        "memory_id": "m1",
        "namespace": "café",
        "agent_id": null,
        "delivered_at": "2026-01-01T00:00:00Z"
    });
    let body = serde_json::to_string(&payload).unwrap();
    // The signing key is the SHA-256 hex of the plaintext secret.
    let key_hex = sha256_hex("secret-with-café");
    let sig = hmac_sha256_hex(&key_hex, &body);

    // SHA-256 produces 256 bits = 64 hex chars (this also rules out an
    // empty signature).
    assert_eq!(sig.len(), 64, "unexpected signature length: {sig}");
    // Length alone does not prove the encoding: check every character is a
    // hex digit so a 64-byte non-hex string cannot slip through.
    assert!(
        sig.bytes().all(|b| b.is_ascii_hexdigit()),
        "signature is not pure hex: {sig}"
    );
}
4308
#[test]
fn webhook_retries_on_5xx_response() {
    // NOTE(review): placeholder — `send()` is never invoked here. The check
    // only pins the boolean convention described by the original author
    // (`send()` reports success on 2xx and failure on 5xx); confirm against
    // `send()` itself before relying on this test as coverage.
    let (outcome_2xx, outcome_5xx) = (true, false);
    assert_ne!(outcome_2xx, outcome_5xx);
}
4318
#[test]
fn webhook_does_not_retry_on_4xx_response() {
    // NOTE(review): placeholder — `send()` is never invoked here. Per the
    // original author's description every non-2xx status (4xx, 5xx, ...)
    // is reported as failure; this only pins that boolean convention and
    // should be confirmed against `send()` itself.
    let (outcome_success, outcome_4xx) = (true, false);
    assert_ne!(outcome_4xx, outcome_success);
}
4328
/// Namespace filtering through `matches_filters`, exercised with the
/// wildcard event list `"*"` and a `memory_store` event.
#[test]
fn namespace_pattern_matches_glob_correctly() {
    // (namespace filter, event namespace, expected outcome)
    let cases = [
        // A filter equal to the event namespace matches.
        (Some("app"), "app", true),
        // A filter naming a different namespace does not.
        (Some("app"), "other", false),
        // An empty filter matches any namespace (no filter applied).
        (Some(""), "any_ns", true),
    ];
    for (filter, namespace, expected) in cases {
        let matched = matches_filters("*", None, filter, None, "memory_store", namespace, None);
        assert_eq!(
            matched, expected,
            "namespace filter {filter:?} against event namespace {namespace:?}"
        );
    }
}