Skip to main content

ai_memory/
subscriptions.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.6.0.0 — webhook subscriptions.
5//!
6//! Subscribers register a URL + shared secret + event/namespace/agent
7//! filters. When a matching event fires (e.g. `memory_store`), a
8//! fire-and-forget thread POSTs an HMAC-SHA256-signed JSON payload.
9//!
10//! SSRF hardening:
11//! - `http://` only to `127.0.0.0/8` or `localhost` hosts;
12//!   everywhere else requires `https://`
13//! - RFC1918 / RFC4193 / link-local hosts are rejected unless
14//!   `allow_private_networks = true` in the daemon config
15//!
16//! Signature:
17//! - Header `X-Ai-Memory-Signature: sha256=<hex>` over the raw
18//!   JSON body
19//! - The secret stored in the DB is a SHA-256 of the plaintext
20//!   shared secret; the plaintext is returned **once** at
21//!   subscription time and never leaves the DB after.
22
23use std::net::{IpAddr, ToSocketAddrs};
24use std::str::FromStr;
25
26use anyhow::{Context, Result, anyhow};
27use rusqlite::{Connection, params};
28use serde::{Deserialize, Serialize};
29use sha2::{Digest, Sha256};
30
31/// Public-facing subscription record (no secret plaintext).
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct Subscription {
34    pub id: String,
35    pub url: String,
36    pub events: String,
37    pub namespace_filter: Option<String>,
38    pub agent_filter: Option<String>,
39    pub created_by: Option<String>,
40    pub created_at: String,
41    pub dispatch_count: i64,
42    pub failure_count: i64,
43    /// v0.6.3.1 P5 (G9): structured per-event-type opt-in list. When
44    /// `Some(list)` the subscription only fires for event types in
45    /// `list` (overriding the legacy comma-separated `events`
46    /// whitelist). When `None` (default) all events match — preserves
47    /// pre-P5 behaviour for existing subscribers.
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub event_types: Option<Vec<String>>,
50}
51
52/// Parameters for creating a subscription.
53pub struct NewSubscription<'a> {
54    pub url: &'a str,
55    pub events: &'a str,
56    pub secret: Option<&'a str>,
57    pub namespace_filter: Option<&'a str>,
58    pub agent_filter: Option<&'a str>,
59    pub created_by: Option<&'a str>,
60    /// v0.6.3.1 P5 (G9): optional structured event-type whitelist. When
61    /// `Some`, only the listed event types fire. When `None`, the legacy
62    /// `events` field (comma-separated / `*`) governs — the historical
63    /// behaviour for backward compatibility.
64    pub event_types: Option<&'a [String]>,
65}
66
67/// Canonical list of webhook lifecycle events surfaced to subscribers
68/// and to `memory_capabilities` (capabilities v2 `webhook_events`).
69/// Keep stable: integrators pin against these strings.
70pub const WEBHOOK_EVENT_TYPES: &[&str] = &[
71    "memory_store",
72    "memory_promote",
73    "memory_delete",
74    "memory_link_created",
75    "memory_consolidated",
76];
77
78/// Insert a subscription, hashing any secret before persisting.
79///
80/// Returns the new subscription's id.
81///
82/// P5 (G9): when `event_types` is `Some`, the structured opt-in list is
83/// JSON-encoded into the new `event_types` column AND mirrored into
84/// the legacy comma-separated `events` column so the existing
85/// dispatch matcher continues to work without a second code path. An
86/// unknown event type returns Err — the canonical list lives in
87/// `WEBHOOK_EVENT_TYPES`.
88pub fn insert(conn: &Connection, req: &NewSubscription<'_>) -> Result<String> {
89    validate_url(req.url)?;
90    let id = uuid::Uuid::new_v4().to_string();
91    let secret_hash = req.secret.map(sha256_hex);
92    let now = chrono::Utc::now().to_rfc3339();
93
94    // P5: validate + serialise the structured event-type list.
95    let (events_csv, event_types_json) = if let Some(list) = req.event_types {
96        for ev in list {
97            if !WEBHOOK_EVENT_TYPES.contains(&ev.as_str()) {
98                return Err(anyhow!(
99                    "unknown webhook event type {ev:?}; valid types: {WEBHOOK_EVENT_TYPES:?}"
100                ));
101            }
102        }
103        // Mirror into the legacy events column so dispatch keeps working.
104        let csv = list.join(",");
105        let json = serde_json::to_string(list).context("event_types serialise")?;
106        (csv, Some(json))
107    } else {
108        (req.events.to_string(), None)
109    };
110
111    conn.execute(
112        "INSERT INTO subscriptions (id, url, events, secret_hash, namespace_filter, agent_filter, created_by, created_at, event_types) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
113        params![id, req.url, events_csv, secret_hash, req.namespace_filter, req.agent_filter, req.created_by, now, event_types_json],
114    )?;
115    Ok(id)
116}
117
118/// Delete a subscription by id. Returns true if a row was removed.
119pub fn delete(conn: &Connection, id: &str) -> Result<bool> {
120    let n = conn.execute("DELETE FROM subscriptions WHERE id = ?1", params![id])?;
121    Ok(n > 0)
122}
123
124/// List all active subscriptions.
125pub fn list(conn: &Connection) -> Result<Vec<Subscription>> {
126    let mut stmt = conn.prepare(
127        "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions ORDER BY created_at DESC",
128    )?;
129    let rows = stmt.query_map([], |row| {
130        let event_types_raw: Option<String> = row.get(9)?;
131        // P5: decode the JSON column. A corrupt row should not break
132        // the entire list — fall back to None (= all-events) and warn.
133        let event_types =
134            event_types_raw.and_then(|s| match serde_json::from_str::<Vec<String>>(&s) {
135                Ok(v) => Some(v),
136                Err(e) => {
137                    tracing::warn!(
138                        "subscription event_types JSON decode failed, treating as all-events: {e}"
139                    );
140                    None
141                }
142            });
143        Ok(Subscription {
144            id: row.get(0)?,
145            url: row.get(1)?,
146            events: row.get(2)?,
147            namespace_filter: row.get(3)?,
148            agent_filter: row.get(4)?,
149            created_by: row.get(5)?,
150            created_at: row.get(6)?,
151            dispatch_count: row.get(7)?,
152            failure_count: row.get(8)?,
153            event_types,
154        })
155    })?;
156    rows.collect::<rusqlite::Result<Vec<_>>>()
157        .context("subscription row decode failed")
158}
159
160/// P5 (G9): list subscriptions matching a specific event type. Returns
161/// rows where either:
162///   - `event_types` is NULL (= all events; backward-compat default), OR
163///   - `event_types` JSON array contains `event_type`.
164///
165/// This is the DB-side variant of the per-event filter; the in-memory
166/// `matches_filters` is the authoritative gate at dispatch time and
167/// honours both the legacy `events` whitelist and the new
168/// `event_types` opt-in list.
169pub fn list_by_event(conn: &Connection, event_type: &str) -> Result<Vec<Subscription>> {
170    // SQLite doesn't have a JSON contains operator portable across all
171    // builds; we filter in Rust after a coarse SQL prefilter that drops
172    // rows whose stored JSON clearly doesn't mention the event. The
173    // text LIKE match is conservative (it can yield false positives the
174    // post-filter then rejects) which keeps the SQL simple while still
175    // letting an idx_subscriptions_event_types-backed scan win on large
176    // tables.
177    let pattern = format!("%{event_type}%");
178    let mut stmt = conn.prepare(
179        "SELECT id, url, events, namespace_filter, agent_filter, created_by, created_at, dispatch_count, failure_count, event_types FROM subscriptions WHERE event_types IS NULL OR event_types LIKE ?1 ORDER BY created_at DESC",
180    )?;
181    let rows = stmt.query_map(params![pattern], |row| {
182        let event_types_raw: Option<String> = row.get(9)?;
183        let event_types =
184            event_types_raw.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok());
185        Ok(Subscription {
186            id: row.get(0)?,
187            url: row.get(1)?,
188            events: row.get(2)?,
189            namespace_filter: row.get(3)?,
190            agent_filter: row.get(4)?,
191            created_by: row.get(5)?,
192            created_at: row.get(6)?,
193            dispatch_count: row.get(7)?,
194            failure_count: row.get(8)?,
195            event_types,
196        })
197    })?;
198    let mut out: Vec<Subscription> = Vec::new();
199    for sub in rows {
200        let s = sub.context("subscription row decode failed")?;
201        match &s.event_types {
202            None => out.push(s),
203            Some(list) if list.iter().any(|e| e == event_type) => out.push(s),
204            Some(_) => {} // structured opt-in present but doesn't include this event
205        }
206    }
207    Ok(out)
208}
209
210/// Test whether a subscription's filters match the given event.
211///
212/// P5 (G9): when `sub_event_types` is `Some(list)` it overrides the
213/// legacy `sub_events` comma-string — the structured opt-in is the
214/// authoritative filter for that subscriber. When `None`, the legacy
215/// whitelist applies (backward compat for pre-P5 subscribers).
216fn matches_filters(
217    sub_events: &str,
218    sub_event_types: Option<&[String]>,
219    sub_namespace: Option<&str>,
220    sub_agent: Option<&str>,
221    event: &str,
222    namespace: &str,
223    agent: Option<&str>,
224) -> bool {
225    let event_match = if let Some(list) = sub_event_types {
226        // Structured opt-in: empty list means "no events" (defensive — the
227        // insert path validates non-empty, but defend against hand-crafted
228        // rows).
229        list.iter().any(|e| e == event)
230    } else {
231        // Legacy whitelist (comma-separated or `*`).
232        sub_events == "*"
233            || sub_events
234                .split(',')
235                .map(str::trim)
236                .any(|e| e == event || e == "*")
237    };
238    if !event_match {
239        return false;
240    }
241    if let Some(ns) = sub_namespace
242        && !ns.is_empty()
243        && ns != namespace
244    {
245        return false;
246    }
247    if let Some(filter) = sub_agent
248        && !filter.is_empty()
249        && agent.is_none_or(|a| a != filter)
250    {
251        return false;
252    }
253    true
254}
255
256/// Payload fired to subscribers. Stable JSON shape.
257#[derive(Serialize)]
258struct DispatchPayload<'a> {
259    event: &'a str,
260    memory_id: &'a str,
261    namespace: &'a str,
262    agent_id: Option<&'a str>,
263    delivered_at: String,
264    /// P5 (G9): event-specific extra fields. Flattened so the wire shape
265    /// stays a flat object — older subscribers that ignore unknown keys
266    /// keep working. Each new event type uses one of the
267    /// `*EventDetails` structs below.
268    #[serde(flatten, skip_serializing_if = "Option::is_none")]
269    details: Option<serde_json::Value>,
270}
271
272// ---------------------------------------------------------------------
273// P5 (G9) — event payload structs for the four new lifecycle events.
274//
275// Each struct is the `details` block flattened into `DispatchPayload`
276// for its event type. They are intentionally small and JSON-stable —
277// the same shape ships on both the MCP and HTTP webhook surfaces.
278// Adding a new field is backward-compatible (subscribers ignore
279// unknowns); renaming or removing a field is breaking — bump the
280// payload schema version per AI_DEVELOPER_GOVERNANCE.md.
281// ---------------------------------------------------------------------
282
283/// `memory_promote` event — fires after a tier or vertical promotion
284/// commits. `to_namespace` is `Some` for vertical (`memory_promote`
285/// with a `to_namespace` argument); for the default tier promotion it
286/// is `None` and `tier` is set to the new tier (`"long"`).
287#[derive(Debug, Clone, Serialize, Deserialize)]
288pub struct PromoteEventDetails {
289    /// `"vertical"` for namespace promote-clone, `"tier"` for the
290    /// default tier upgrade.
291    pub mode: String,
292    /// New tier after promotion (always `"long"` for `mode = "tier"`).
293    #[serde(default, skip_serializing_if = "Option::is_none")]
294    pub tier: Option<String>,
295    /// Target namespace (vertical promote only).
296    #[serde(default, skip_serializing_if = "Option::is_none")]
297    pub to_namespace: Option<String>,
298    /// Clone id (vertical promote only); the `memory_id` field on the
299    /// outer payload carries the source memory id in vertical mode.
300    #[serde(default, skip_serializing_if = "Option::is_none")]
301    pub clone_id: Option<String>,
302}
303
304/// `memory_delete` event — fires after the row is removed from
305/// `memories`. `title` and `tier` come from the pre-delete snapshot so
306/// subscribers can write meaningful audit entries without a
307/// roundtrip.
308#[derive(Debug, Clone, Serialize, Deserialize)]
309pub struct DeleteEventDetails {
310    pub title: String,
311    pub tier: String,
312}
313
314/// `memory_link_created` event — fires after `db::create_link`
315/// commits. The outer `memory_id` carries the source id (the
316/// link-author side); `target_id` is the destination of the directed
317/// link.
318#[derive(Debug, Clone, Serialize, Deserialize)]
319pub struct LinkCreatedEventDetails {
320    pub target_id: String,
321    pub relation: String,
322}
323
324/// `memory_consolidated` event — fires after `db::consolidate`
325/// commits. The outer `memory_id` carries the new consolidated
326/// memory's id; `source_ids` is the array of memories that were
327/// merged (and deleted by the consolidate op).
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ConsolidatedEventDetails {
330    pub source_ids: Vec<String>,
331    pub source_count: usize,
332}
333
334/// Fire an event to all matching subscribers. Each dispatch runs in
335/// its own OS thread and does NOT block the caller. Errors are logged
336/// and counted in the DB via `failure_count`.
337///
338/// Caller owns the connection. Dispatch threads re-open the connection
339/// as needed to update counters (cheap — `SQLite` connections are
340/// process-shared via WAL).
341///
342/// P5 (G9): convenience wrapper for the historical no-details case
343/// (used by `memory_store`). New event types should call
344/// `dispatch_event_with_details` and pass the matching
345/// `*EventDetails` struct serialised to JSON.
346pub fn dispatch_event(
347    conn: &Connection,
348    event: &str,
349    memory_id: &str,
350    namespace: &str,
351    agent_id: Option<&str>,
352    db_path: &std::path::Path,
353) {
354    dispatch_event_with_details(conn, event, memory_id, namespace, agent_id, db_path, None);
355}
356
357/// P5 (G9): full lifecycle dispatch with optional event-specific
358/// details. The details JSON is FLATTENED into the dispatch payload —
359/// keys must not collide with the outer envelope (`event`,
360/// `memory_id`, `namespace`, `agent_id`, `delivered_at`). The four
361/// new event types (`memory_promote`, `memory_delete`,
362/// `memory_link_created`, `memory_consolidated`) supply their
363/// `*EventDetails` struct serialised via `serde_json::to_value`.
364pub fn dispatch_event_with_details(
365    conn: &Connection,
366    event: &str,
367    memory_id: &str,
368    namespace: &str,
369    agent_id: Option<&str>,
370    db_path: &std::path::Path,
371    details: Option<serde_json::Value>,
372) {
373    let subs = match list(conn) {
374        Ok(s) => s,
375        Err(e) => {
376            tracing::warn!("subscription list failed during dispatch: {e}");
377            return;
378        }
379    };
380    let matching: Vec<Subscription> = subs
381        .into_iter()
382        .filter(|s| {
383            matches_filters(
384                &s.events,
385                s.event_types.as_deref(),
386                s.namespace_filter.as_deref(),
387                s.agent_filter.as_deref(),
388                event,
389                namespace,
390                agent_id,
391            )
392        })
393        .collect();
394    if matching.is_empty() {
395        return;
396    }
397    let payload = DispatchPayload {
398        event,
399        memory_id,
400        namespace,
401        agent_id,
402        delivered_at: chrono::Utc::now().to_rfc3339(),
403        details,
404    };
405    let body = match serde_json::to_string(&payload) {
406        Ok(s) => s,
407        Err(e) => {
408            tracing::warn!("dispatch payload serialize failed: {e}");
409            return;
410        }
411    };
412    // Timestamp is part of the canonical string the signature is
413    // computed over. Receivers SHOULD reject requests whose timestamp
414    // differs from their clock by more than 5 minutes (replay window).
415    // (#301 item 1 — prior implementation had no replay protection.)
416    let timestamp = chrono::Utc::now().timestamp().to_string();
417    for sub in matching {
418        let url = sub.url.clone();
419        let sub_id = sub.id.clone();
420        let body = body.clone();
421        let ts = timestamp.clone();
422        let db_path = db_path.to_path_buf();
423        std::thread::spawn(move || {
424            let secret_hash = match load_secret_hash(&db_path, &sub_id) {
425                Ok(s) => s,
426                Err(e) => {
427                    tracing::warn!("subscription secret lookup failed: {e}");
428                    return;
429                }
430            };
431            // Canonical string: "<timestamp>.<body>". Keyed HMAC over
432            // the DB-stored secret hash. Receivers verify by computing
433            // SHA256(plaintext_secret) and then
434            // HMAC-SHA256(key, "<timestamp>.<body>").
435            let canonical = format!("{ts}.{body}");
436            let signature = secret_hash
437                .as_deref()
438                .map(|h| hmac_sha256_hex(h, &canonical));
439            let ok = send(&url, &body, &ts, signature.as_deref());
440            record_dispatch(&db_path, &sub_id, ok);
441        });
442    }
443}
444
445/// Perform one HTTP POST with SSRF-hardened URL check + signature
446/// + timestamp headers. Returns true on any 2xx response.
447fn send(url: &str, body: &str, timestamp: &str, signature: Option<&str>) -> bool {
448    if let Err(e) = validate_url(url) {
449        tracing::warn!("SSRF guard rejected webhook URL {url}: {e}");
450        return false;
451    }
452    // DNS-resolution guard (#301 item 2). We rely on reqwest to
453    // perform the connect, but pre-check by resolving the host here
454    // and rejecting if any returned address is private / loopback /
455    // link-local. Prevents DNS-rebind SSRF against attacker-controlled
456    // domains that resolve to internal IPs.
457    if let Err(e) = validate_url_dns(url) {
458        tracing::warn!("DNS SSRF guard rejected webhook URL {url}: {e}");
459        return false;
460    }
461    let client = match reqwest::blocking::Client::builder()
462        .timeout(std::time::Duration::from_secs(10))
463        .build()
464    {
465        Ok(c) => c,
466        Err(e) => {
467            tracing::warn!("webhook client build failed: {e}");
468            return false;
469        }
470    };
471    let mut req = client
472        .post(url)
473        .header("content-type", "application/json")
474        .header("user-agent", "ai-memory/0.6.0.0")
475        .header("x-ai-memory-timestamp", timestamp);
476    if let Some(sig) = signature {
477        req = req.header("x-ai-memory-signature", format!("sha256={sig}"));
478    }
479    match req.body(body.to_string()).send() {
480        Ok(resp) => resp.status().is_success(),
481        Err(e) => {
482            tracing::warn!("webhook POST to {url} failed: {e}");
483            false
484        }
485    }
486}
487
488/// Hash a plaintext secret (SHA-256 hex).
489fn sha256_hex(s: &str) -> String {
490    let mut hasher = Sha256::new();
491    hasher.update(s.as_bytes());
492    format!("{:x}", hasher.finalize())
493}
494
495/// HMAC-SHA256 is expensive to implement from scratch; do the simple
496/// construction manually using the hashed secret as key material.
497/// Matches the RFC-2104 HMAC construction with SHA-256 as the
498/// primitive.
499fn hmac_sha256_hex(key_hex: &str, body: &str) -> String {
500    const BLOCK: usize = 64;
501    // Decode key — if invalid hex, fall back to the raw bytes (which
502    // keeps the signature stable for operators who set bad secrets;
503    // verification will fail equally at receive time, which is loud
504    // enough).
505    let mut key = hex_decode(key_hex).unwrap_or_else(|| key_hex.as_bytes().to_vec());
506    if key.len() > BLOCK {
507        let mut h = Sha256::new();
508        h.update(&key);
509        key = h.finalize().to_vec();
510    }
511    key.resize(BLOCK, 0);
512    let mut opad = [0x5cu8; BLOCK];
513    let mut ipad = [0x36u8; BLOCK];
514    for i in 0..BLOCK {
515        opad[i] ^= key[i];
516        ipad[i] ^= key[i];
517    }
518    let mut inner = Sha256::new();
519    inner.update(ipad);
520    inner.update(body.as_bytes());
521    let inner_digest = inner.finalize();
522    let mut outer = Sha256::new();
523    outer.update(opad);
524    outer.update(inner_digest);
525    format!("{:x}", outer.finalize())
526}
527
528fn hex_decode(s: &str) -> Option<Vec<u8>> {
529    if !s.len().is_multiple_of(2) {
530        return None;
531    }
532    (0..s.len())
533        .step_by(2)
534        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
535        .collect()
536}
537
538/// SSRF guard with DNS resolution (#301 item 2). Resolves the host
539/// via the stdlib resolver and rejects if ANY returned
540/// `SocketAddr`'s IP is private / loopback / link-local. Guards
541/// against DNS-rebind attacks where an attacker-controlled hostname
542/// resolves to an internal IP at connect time.
543///
544/// Runs in the dispatch thread (blocking). Best-effort: if DNS fails
545/// we let reqwest surface the error rather than fail closed, because
546/// transient DNS outages should not silently drop webhook delivery.
547pub fn validate_url_dns(url: &str) -> Result<()> {
548    let lower = url.to_ascii_lowercase();
549    let (_scheme, rest) = lower
550        .split_once("://")
551        .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
552    let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
553    let host_port = &rest[..host_end];
554    // Supply a default port so ToSocketAddrs resolves correctly.
555    // SSRF fix (W11): bracketed IPv6 without an explicit port ("[fe80::1]"
556    // with no trailing ":N") was previously passed to ToSocketAddrs as-is,
557    // which errors with "invalid port value" — and the catch-all `Err(_) =>
558    // return Ok(())` below treated that as a DNS hiccup, silently bypassing
559    // the SSRF guard. Detect the no-trailing-port form and append `:80` so
560    // resolution succeeds and the IP is checked.
561    let resolv_target =
562        if let Some(close_idx) = host_port.strip_prefix('[').and(host_port.find(']')) {
563            let after_bracket = &host_port[close_idx + 1..];
564            if after_bracket.starts_with(':') {
565                // [ipv6]:port — already has a port
566                host_port.to_string()
567            } else {
568                // [ipv6] without port — append default
569                format!("{host_port}:80")
570            }
571        } else if host_port.contains(':') {
572            // IPv4:port or hostname:port — use as-is
573            host_port.to_string()
574        } else {
575            format!("{host_port}:80")
576        };
577    let addrs: Vec<std::net::SocketAddr> = match resolv_target.to_socket_addrs() {
578        Ok(iter) => iter.collect(),
579        Err(_) => return Ok(()), // DNS hiccup — let reqwest surface it
580    };
581    for addr in &addrs {
582        let ip = addr.ip();
583        if is_private(ip) && !ip.is_loopback() {
584            return Err(anyhow!(
585                "host resolves to private/link-local IP {ip}: {url}"
586            ));
587        }
588    }
589    Ok(())
590}
591
592/// SSRF guard. Rejects URLs that would cause the daemon to connect
593/// to private-range addresses, link-local, loopback (except
594/// explicitly), or non-HTTPS remote hosts.
595pub fn validate_url(url: &str) -> Result<()> {
596    // Cheap scheme check without pulling the `url` crate.
597    let lower = url.to_ascii_lowercase();
598    let (scheme, rest) = lower
599        .split_once("://")
600        .ok_or_else(|| anyhow!("webhook URL missing scheme: {url}"))?;
601    if scheme != "https" && scheme != "http" {
602        return Err(anyhow!("webhook URL scheme must be http(s): {url}"));
603    }
604    // Extract host (portion before '/' or ':' or '?'). IPv6 URLs use
605    // `[ipv6]:port` syntax — the brackets must be stripped and the
606    // colon-split must skip the colons inside the v6 literal.
607    let host_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
608    let host_port = &rest[..host_end];
609    let host: String = if let Some(stripped) = host_port.strip_prefix('[') {
610        // IPv6: host is everything before the closing bracket.
611        match stripped.find(']') {
612            Some(i) => stripped[..i].to_string(),
613            None => return Err(anyhow!("malformed IPv6 URL host: {url}")),
614        }
615    } else {
616        // IPv4 / hostname.
617        host_port
618            .rsplit_once(':')
619            .map_or(host_port.to_string(), |(h, _)| h.to_string())
620    };
621    let host = host.as_str();
622    // Allow localhost for dev / CI.
623    let is_loopback_hostname = matches!(host, "localhost" | "localhost.localdomain" | "");
624    if scheme == "http" && !is_loopback_hostname {
625        // Accept http only to parsed-loopback IPs; everything else
626        // requires https.
627        if let Ok(ip) = IpAddr::from_str(host) {
628            if !ip.is_loopback() {
629                return Err(anyhow!(
630                    "webhook URL must be https for non-loopback host: {url}"
631                ));
632            }
633        } else {
634            return Err(anyhow!(
635                "webhook URL must be https for non-loopback host: {url}"
636            ));
637        }
638    }
639    // Reject private-range IPs regardless of scheme (RFC1918 / RFC4193 /
640    // link-local). Hostnames that resolve to private ranges are not
641    // caught here — the dispatch thread will still be able to reach
642    // them; operators who want to reach internal services should set
643    // up reverse proxies or allow explicitly in config.
644    if let Ok(ip) = IpAddr::from_str(host)
645        && is_private(ip)
646        && !ip.is_loopback()
647    {
648        return Err(anyhow!(
649            "webhook URL targets private / link-local address: {url}"
650        ));
651    }
652    Ok(())
653}
654
655fn is_private(ip: IpAddr) -> bool {
656    match ip {
657        IpAddr::V4(v4) => {
658            // SSRF fix (W11): include `is_unspecified` (0.0.0.0). On most
659            // OSes the kernel routes 0.0.0.0 to a local listener, so an
660            // attacker-controlled hostname resolving to 0.0.0.0 hits the
661            // local box.
662            v4.is_private()
663                || v4.is_link_local()
664                || v4.is_multicast()
665                || v4.is_broadcast()
666                || v4.is_unspecified()
667        }
668        IpAddr::V6(v6) => {
669            // Conservative: reject unique-local (fc00::/7), link-local
670            // (fe80::/10), multicast, and the unspecified address `::`.
671            // SSRF fix (W11): `is_unspecified` covers `[::]`, which most
672            // kernels route to local services.
673            let segs = v6.segments();
674            v6.is_multicast()
675                || v6.is_unspecified()
676                || (segs[0] & 0xfe00) == 0xfc00 // ULA
677                || (segs[0] & 0xffc0) == 0xfe80 // link-local
678        }
679    }
680}
681
682fn load_secret_hash(db_path: &std::path::Path, sub_id: &str) -> Result<Option<String>> {
683    let conn = Connection::open(db_path).context("load_secret_hash open")?;
684    let row = conn
685        .query_row(
686            "SELECT secret_hash FROM subscriptions WHERE id = ?1",
687            params![sub_id],
688            |r| r.get::<_, Option<String>>(0),
689        )
690        .context("load_secret_hash query")?;
691    Ok(row)
692}
693
694fn record_dispatch(db_path: &std::path::Path, sub_id: &str, ok: bool) {
695    let Ok(conn) = Connection::open(db_path) else {
696        return;
697    };
698    let now = chrono::Utc::now().to_rfc3339();
699    let sql = if ok {
700        "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
701    } else {
702        "UPDATE subscriptions SET dispatch_count = dispatch_count + 1, failure_count = failure_count + 1, last_dispatched_at = ?1 WHERE id = ?2"
703    };
704    let _ = conn.execute(sql, params![now, sub_id]);
705}
706
707#[cfg(test)]
708mod tests {
709    use super::*;
710
711    #[test]
712    fn https_allowed() {
713        assert!(validate_url("https://example.com/hook").is_ok());
714        assert!(validate_url("https://api.example.com:8443/hook?x=1").is_ok());
715    }
716
717    #[test]
718    fn http_only_to_loopback() {
719        assert!(validate_url("http://localhost/hook").is_ok());
720        assert!(validate_url("http://127.0.0.1:8080/hook").is_ok());
721        // IPv6 in URLs must be bracketed per RFC 3986 §3.2.2.
722        assert!(validate_url("http://[::1]/hook").is_ok());
723        assert!(validate_url("http://example.com/hook").is_err());
724        assert!(validate_url("http://8.8.8.8/hook").is_err());
725    }
726
727    #[test]
728    fn private_ranges_blocked() {
729        assert!(validate_url("https://10.0.0.1/hook").is_err());
730        assert!(validate_url("https://192.168.1.1/hook").is_err());
731        assert!(validate_url("https://172.16.0.1/hook").is_err());
732        assert!(validate_url("https://169.254.1.1/hook").is_err());
733        assert!(validate_url("https://[fc00::1]/hook").is_err());
734        assert!(validate_url("https://[fe80::1]/hook").is_err());
735    }
736
737    #[test]
738    fn nonsense_rejected() {
739        assert!(validate_url("ftp://example.com").is_err());
740        assert!(validate_url("notaurl").is_err());
741        assert!(validate_url("").is_err());
742    }
743
744    #[test]
745    fn hmac_sha256_stable() {
746        // Known vector: HMAC-SHA256("key", "The quick brown fox jumps over the lazy dog")
747        // = f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8
748        let key = hex::encode_fallback("key".as_bytes());
749        let got = hmac_sha256_hex(&key, "The quick brown fox jumps over the lazy dog");
750        assert_eq!(
751            got,
752            "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8"
753        );
754    }
755
756    #[test]
757    fn filter_wildcards() {
758        assert!(matches_filters(
759            "*",
760            None,
761            None,
762            None,
763            "memory_store",
764            "ns",
765            None
766        ));
767        assert!(matches_filters(
768            "memory_store,memory_delete",
769            None,
770            None,
771            None,
772            "memory_store",
773            "ns",
774            None
775        ));
776        assert!(!matches_filters(
777            "memory_delete",
778            None,
779            None,
780            None,
781            "memory_store",
782            "ns",
783            None
784        ));
785        assert!(matches_filters(
786            "*",
787            None,
788            Some("foo"),
789            None,
790            "memory_store",
791            "foo",
792            None
793        ));
794        assert!(!matches_filters(
795            "*",
796            None,
797            Some("foo"),
798            None,
799            "memory_store",
800            "bar",
801            None
802        ));
803        assert!(matches_filters(
804            "*",
805            None,
806            None,
807            Some("alice"),
808            "memory_store",
809            "ns",
810            Some("alice")
811        ));
812        assert!(!matches_filters(
813            "*",
814            None,
815            None,
816            Some("alice"),
817            "memory_store",
818            "ns",
819            Some("bob")
820        ));
821    }
822
823    #[test]
824    fn filter_event_types_overrides_legacy_events() {
825        // P5 (G9): when the structured `event_types` opt-in is Some,
826        // the legacy `events` whitelist is ignored.
827        let opt_in_store_only: Vec<String> = vec!["memory_store".to_string()];
828        // Legacy says "all events", structured says "store only" — store
829        // matches, delete does not.
830        assert!(matches_filters(
831            "*",
832            Some(&opt_in_store_only),
833            None,
834            None,
835            "memory_store",
836            "ns",
837            None
838        ));
839        assert!(!matches_filters(
840            "*",
841            Some(&opt_in_store_only),
842            None,
843            None,
844            "memory_delete",
845            "ns",
846            None
847        ));
848        // Structured opt-in with multiple types matches each.
849        let multi: Vec<String> = vec![
850            "memory_promote".to_string(),
851            "memory_link_created".to_string(),
852        ];
853        assert!(matches_filters(
854            "memory_store",
855            Some(&multi),
856            None,
857            None,
858            "memory_promote",
859            "ns",
860            None
861        ));
862        assert!(!matches_filters(
863            "memory_store",
864            Some(&multi),
865            None,
866            None,
867            "memory_store",
868            "ns",
869            None
870        ));
871        // Empty structured list = no events match (defensive).
872        let empty: Vec<String> = vec![];
873        assert!(!matches_filters(
874            "*",
875            Some(&empty),
876            None,
877            None,
878            "memory_store",
879            "ns",
880            None
881        ));
882    }
883
884    // ----------------------------------------------------------------
885    // Wave 10 (L10b) — SSRF coverage for `validate_url_dns`.
886    //
887    // `validate_url_dns` is the DNS-resolving SSRF guard. It performs
888    // `to_socket_addrs()` and inspects the resolved IPs.  The current
889    // production implementation INTENTIONALLY allows loopback IPs
890    // (`is_private(ip) && !ip.is_loopback()`) so that dev/CI webhooks
891    // pointed at localhost still work.  Tests that target loopback
892    // therefore assert the documented "ok" behaviour rather than
893    // "err"; those cases are covered by `validate_url`'s scheme
894    // gating which forces non-loopback hosts onto https.
895    //
896    // Tests below are split into:
897    //   - cases that are correctly rejected today (link-local v6,
898    //     AWS metadata IP, RFC1918 ranges)
899    //   - the documented-behaviour loopback acceptance (kept as
900    //     `is_ok`)
901    //   - public-IP / hostname acceptance
902    //
903    // The function signature is `validate_url_dns(&str) -> Result<()>`.
904    // ----------------------------------------------------------------
905
906    #[test]
907    fn test_validate_url_dns_accepts_loopback_v4() {
908        // DESIGN: loopback is allowed by `validate_url_dns` for dev/CI;
909        // the layered defence is `validate_url`, which forces https for
910        // non-loopback hosts. We document that current behaviour here
911        // so a regression that *tightens* loopback handling is visible.
912        assert!(
913            validate_url_dns("http://127.0.0.1/foo").is_ok(),
914            "127.0.0.1 should be accepted by validate_url_dns (dev/CI)"
915        );
916        assert!(
917            validate_url_dns("http://127.0.0.1:8080/").is_ok(),
918            "127.0.0.1:8080 should be accepted by validate_url_dns"
919        );
920        assert!(
921            validate_url_dns("http://localhost/").is_ok(),
922            "localhost should be accepted by validate_url_dns"
923        );
924    }
925
926    #[test]
927    fn test_validate_url_dns_accepts_loopback_v6() {
928        // Same as v4: loopback is documented-allowed.
929        assert!(
930            validate_url_dns("http://[::1]/").is_ok(),
931            "[::1] should be accepted by validate_url_dns"
932        );
933        assert!(
934            validate_url_dns("http://[0:0:0:0:0:0:0:1]/").is_ok(),
935            "[::1] expanded form should be accepted"
936        );
937    }
938
939    #[test]
940    fn test_validate_url_dns_rejects_link_local_ipv6() {
941        // fe80::/10 is link-local. is_private() flags this and the IP
942        // is not loopback, so validate_url_dns rejects.
943        // SSRF fix (W11): bracketed IPv6 hosts without an explicit port
944        // now get ":80" appended before to_socket_addrs(), so resolution
945        // succeeds and the IP check fires.
946        let res = validate_url_dns("http://[fe80::1]/");
947        assert!(
948            res.is_err(),
949            "fe80::1 must be rejected as link-local IPv6, got {res:?}"
950        );
951    }
952
953    #[test]
954    fn test_validate_url_dns_rejects_aws_metadata() {
955        // 169.254.169.254 is the AWS / GCP / Azure instance metadata
956        // service. RFC3927 link-local; `Ipv4Addr::is_link_local` covers
957        // 169.254.0.0/16, so validate_url_dns must reject.
958        let res = validate_url_dns("http://169.254.169.254/latest/meta-data/");
959        assert!(
960            res.is_err(),
961            "AWS metadata IP must be rejected, got {res:?}"
962        );
963    }
964
965    #[test]
966    fn test_validate_url_dns_rejects_rfc1918_private_ranges() {
967        // 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16 are RFC1918.
968        // `Ipv4Addr::is_private` flags all three; validate_url_dns must
969        // reject every variant.
970        for url in [
971            "http://10.0.0.1/",
972            "http://172.16.0.1/",
973            "http://172.31.255.255/",
974            "http://192.168.1.1/",
975        ] {
976            let res = validate_url_dns(url);
977            assert!(
978                res.is_err(),
979                "{url} must be rejected as RFC1918, got {res:?}"
980            );
981        }
982    }
983
984    #[test]
985    fn test_validate_url_dns_accepts_public_ip_or_dns() {
986        // 1.1.1.1 is Cloudflare's public resolver — never private. We
987        // intentionally exercise the IP-literal path (no DNS) so the
988        // test is hermetic and does not rely on network resolution for
989        // example.com.
990        assert!(
991            validate_url_dns("https://1.1.1.1/").is_ok(),
992            "public IP literal must be accepted"
993        );
994        // example.com may or may not resolve in the sandbox; per the
995        // production comment, DNS failure returns Ok (let reqwest
996        // surface it). Either way the outcome is Ok.
997        assert!(
998            validate_url_dns("https://example.com/").is_ok(),
999            "public hostname must be accepted (or DNS-skip path returns Ok)"
1000        );
1001    }
1002
1003    #[test]
1004    fn test_validate_url_dns_rejects_unspecified_addresses() {
1005        // 0.0.0.0 / [::] are "unspecified" addresses. On most OSes
1006        // connecting to 0.0.0.0 routes to localhost — that is an SSRF
1007        // / loopback bypass.
1008        // SSRF fix (W11): `is_private` now flags `is_unspecified` for
1009        // both v4 and v6.
1010        let v4 = validate_url_dns("http://0.0.0.0/");
1011        let v6 = validate_url_dns("http://[::]/");
1012        assert!(
1013            v4.is_err(),
1014            "0.0.0.0 should be rejected as unspecified, got {v4:?}"
1015        );
1016        assert!(
1017            v6.is_err(),
1018            "[::] should be rejected as unspecified, got {v6:?}"
1019        );
1020    }
1021
1022    #[test]
1023    fn test_validate_url_dns_missing_scheme() {
1024        // No `://` separator → explicit Err (not panic).
1025        let res = validate_url_dns("not-a-url");
1026        assert!(res.is_err(), "missing scheme must Err, got {res:?}");
1027    }
1028
1029    // ----------------------------------------------------------------
1030    // Wave 12 (W12-C) — deep coverage on dispatch / send / persistence.
1031    //
1032    // The pre-W12 tests covered URL validation thoroughly but left the
1033    // DB-touching paths (`insert`, `delete`, `list`, `dispatch_event`,
1034    // `record_dispatch`, `load_secret_hash`) and the HTTP send path
1035    // (`send`) at 0 % coverage.  These tests use a `tempfile::NamedTempFile`
1036    // to back a real on-disk SQLite (so dispatch threads can re-open the
1037    // connection via `Connection::open(db_path)`) and `wiremock` for HTTP
1038    // (already a dev-dep from W3 / W10).
1039    //
1040    // Style:
1041    //   - DB-only tests are `#[test]` (sync) and use a tempfile path.
1042    //   - Tests that drive `wiremock` are `#[tokio::test(flavor =
1043    //     "multi_thread")]` and run the blocking `send` via
1044    //     `tokio::task::spawn_blocking`, mirroring the pattern already in
1045    //     `llm.rs::wiremock_tests`.
1046    // ----------------------------------------------------------------
1047
1048    use tempfile::NamedTempFile;
1049
1050    /// Stand up a fresh on-disk SQLite at a tempfile path with the
1051    /// production schema applied. Returns the path and keeps the file
1052    /// alive via the returned `NamedTempFile` (drop deletes it).
1053    fn fresh_db() -> (NamedTempFile, std::path::PathBuf) {
1054        let f = NamedTempFile::new().expect("tempfile");
1055        let p = f.path().to_path_buf();
1056        // Apply schema via the production opener so migrations run.
1057        let _ = crate::db::open(&p).expect("db::open");
1058        (f, p)
1059    }
1060
1061    // ---------------- insert / delete / list ----------------
1062
1063    #[test]
1064    fn insert_persists_and_list_returns_row() {
1065        let (_keep, path) = fresh_db();
1066        let conn = Connection::open(&path).unwrap();
1067        let id = insert(
1068            &conn,
1069            &NewSubscription {
1070                url: "https://example.com/hook",
1071                events: "memory_store",
1072                secret: Some("s3cret"),
1073                namespace_filter: Some("ns1"),
1074                agent_filter: Some("alice"),
1075                created_by: Some("op"),
1076                event_types: None,
1077            },
1078        )
1079        .unwrap();
1080        assert!(!id.is_empty());
1081
1082        let subs = list(&conn).unwrap();
1083        assert_eq!(subs.len(), 1);
1084        let s = &subs[0];
1085        assert_eq!(s.id, id);
1086        assert_eq!(s.url, "https://example.com/hook");
1087        assert_eq!(s.events, "memory_store");
1088        assert_eq!(s.namespace_filter.as_deref(), Some("ns1"));
1089        assert_eq!(s.agent_filter.as_deref(), Some("alice"));
1090        assert_eq!(s.created_by.as_deref(), Some("op"));
1091        assert_eq!(s.dispatch_count, 0);
1092        assert_eq!(s.failure_count, 0);
1093    }
1094
1095    #[test]
1096    fn insert_rejects_invalid_url() {
1097        let (_keep, path) = fresh_db();
1098        let conn = Connection::open(&path).unwrap();
1099        let res = insert(
1100            &conn,
1101            &NewSubscription {
1102                url: "not-a-url",
1103                events: "*",
1104                secret: None,
1105                namespace_filter: None,
1106                agent_filter: None,
1107                created_by: None,
1108                event_types: None,
1109            },
1110        );
1111        assert!(res.is_err(), "insert must reject invalid URL");
1112    }
1113
1114    #[test]
1115    fn insert_hashes_secret_before_persisting() {
1116        let (_keep, path) = fresh_db();
1117        let conn = Connection::open(&path).unwrap();
1118        let plaintext = "super-shared-secret";
1119        let id = insert(
1120            &conn,
1121            &NewSubscription {
1122                url: "https://example.com/h",
1123                events: "*",
1124                secret: Some(plaintext),
1125                namespace_filter: None,
1126                agent_filter: None,
1127                created_by: None,
1128                event_types: None,
1129            },
1130        )
1131        .unwrap();
1132        let stored: Option<String> = conn
1133            .query_row(
1134                "SELECT secret_hash FROM subscriptions WHERE id = ?1",
1135                params![id],
1136                |r| r.get(0),
1137            )
1138            .unwrap();
1139        let hash = stored.expect("secret_hash should be set");
1140        assert_ne!(hash, plaintext, "plaintext secret must not be stored");
1141        assert_eq!(hash, sha256_hex(plaintext));
1142    }
1143
1144    #[test]
1145    fn insert_no_secret_stores_null() {
1146        let (_keep, path) = fresh_db();
1147        let conn = Connection::open(&path).unwrap();
1148        let id = insert(
1149            &conn,
1150            &NewSubscription {
1151                url: "https://example.com/h",
1152                events: "*",
1153                secret: None,
1154                namespace_filter: None,
1155                agent_filter: None,
1156                created_by: None,
1157                event_types: None,
1158            },
1159        )
1160        .unwrap();
1161        let stored: Option<String> = conn
1162            .query_row(
1163                "SELECT secret_hash FROM subscriptions WHERE id = ?1",
1164                params![id],
1165                |r| r.get(0),
1166            )
1167            .unwrap();
1168        assert!(stored.is_none(), "missing secret must persist as NULL");
1169    }
1170
1171    #[test]
1172    fn delete_returns_true_when_row_removed() {
1173        let (_keep, path) = fresh_db();
1174        let conn = Connection::open(&path).unwrap();
1175        let id = insert(
1176            &conn,
1177            &NewSubscription {
1178                url: "https://example.com/h",
1179                events: "*",
1180                secret: None,
1181                namespace_filter: None,
1182                agent_filter: None,
1183                created_by: None,
1184                event_types: None,
1185            },
1186        )
1187        .unwrap();
1188        assert!(delete(&conn, &id).unwrap());
1189        assert!(list(&conn).unwrap().is_empty());
1190    }
1191
1192    #[test]
1193    fn delete_returns_false_when_row_missing() {
1194        let (_keep, path) = fresh_db();
1195        let conn = Connection::open(&path).unwrap();
1196        assert!(!delete(&conn, "nope").unwrap());
1197    }
1198
1199    #[test]
1200    fn list_orders_by_created_at_desc() {
1201        let (_keep, path) = fresh_db();
1202        let conn = Connection::open(&path).unwrap();
1203        // Insert three subs with sleeps so created_at is monotonically
1204        // increasing (rfc3339 to second-or-better resolution).
1205        let id1 = insert(
1206            &conn,
1207            &NewSubscription {
1208                url: "https://a.example.com/",
1209                events: "*",
1210                secret: None,
1211                namespace_filter: None,
1212                agent_filter: None,
1213                created_by: None,
1214                event_types: None,
1215            },
1216        )
1217        .unwrap();
1218        std::thread::sleep(std::time::Duration::from_millis(1100));
1219        let id2 = insert(
1220            &conn,
1221            &NewSubscription {
1222                url: "https://b.example.com/",
1223                events: "*",
1224                secret: None,
1225                namespace_filter: None,
1226                agent_filter: None,
1227                created_by: None,
1228                event_types: None,
1229            },
1230        )
1231        .unwrap();
1232        let subs = list(&conn).unwrap();
1233        assert_eq!(subs.len(), 2);
1234        // Most recent first.
1235        assert_eq!(subs[0].id, id2);
1236        assert_eq!(subs[1].id, id1);
1237    }
1238
1239    // ---------------- HMAC / sha256 helpers ----------------
1240
1241    #[test]
1242    fn sha256_hex_known_vector() {
1243        // SHA256("") = e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
1244        assert_eq!(
1245            sha256_hex(""),
1246            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1247        );
1248        // SHA256("abc") = ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad
1249        assert_eq!(
1250            sha256_hex("abc"),
1251            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
1252        );
1253    }
1254
1255    #[test]
1256    fn hex_decode_round_trip_and_invalid() {
1257        // Round-trip an even-length valid hex string.
1258        let s = "deadbeef";
1259        let bytes = hex_decode(s).expect("valid hex");
1260        assert_eq!(bytes, vec![0xde, 0xad, 0xbe, 0xef]);
1261        // Odd-length must return None (invariant in the helper).
1262        assert!(hex_decode("abc").is_none());
1263        // Non-hex chars must return None.
1264        assert!(hex_decode("zz").is_none());
1265    }
1266
1267    #[test]
1268    fn hmac_long_key_is_hashed_to_fit_block() {
1269        // Construct a hex key whose decoded length exceeds the SHA-256
1270        // block size (64 bytes). The HMAC pre-step hashes overlong keys
1271        // to fit; we exercise that branch by giving it a 200-hex-char
1272        // (100-byte) key.
1273        let long_key: String = std::iter::repeat_n('a', 200).collect();
1274        let sig = hmac_sha256_hex(&long_key, "hello");
1275        assert_eq!(sig.len(), 64); // 32-byte SHA-256 in hex
1276    }
1277
1278    #[test]
1279    fn hmac_invalid_hex_key_falls_back_to_raw_bytes() {
1280        // Hex with a non-hex char must trigger the fallback branch
1281        // (use `key_hex.as_bytes()` directly). The signature must still
1282        // be a valid 64-char SHA-256 hex string.
1283        let sig = hmac_sha256_hex("not-a-hex-key!!", "hello");
1284        assert_eq!(sig.len(), 64);
1285        assert!(sig.chars().all(|c| c.is_ascii_hexdigit()));
1286    }
1287
1288    // ---------------- matches_filters edge cases ----------------
1289
1290    #[test]
1291    fn matches_filters_event_with_whitespace_and_star() {
1292        // `*` inside a comma list still matches anything.
1293        assert!(matches_filters(
1294            "memory_store, *",
1295            None,
1296            None,
1297            None,
1298            "anything",
1299            "ns",
1300            None,
1301        ));
1302        // Whitespace around tokens is trimmed.
1303        assert!(matches_filters(
1304            "  memory_delete , memory_store ",
1305            None,
1306            None,
1307            None,
1308            "memory_store",
1309            "ns",
1310            None,
1311        ));
1312    }
1313
1314    #[test]
1315    fn matches_filters_agent_filter_requires_some() {
1316        // sub_agent set, but event has no agent → reject.
1317        assert!(!matches_filters(
1318            "*",
1319            None,
1320            None,
1321            Some("alice"),
1322            "memory_store",
1323            "ns",
1324            None,
1325        ));
1326    }
1327
1328    // ---------------- record_dispatch / load_secret_hash ----------------
1329
1330    #[test]
1331    fn record_dispatch_increments_counts_on_success() {
1332        let (_keep, path) = fresh_db();
1333        let id = {
1334            let conn = Connection::open(&path).unwrap();
1335            insert(
1336                &conn,
1337                &NewSubscription {
1338                    url: "https://example.com/h",
1339                    events: "*",
1340                    secret: None,
1341                    namespace_filter: None,
1342                    agent_filter: None,
1343                    created_by: None,
1344                    event_types: None,
1345                },
1346            )
1347            .unwrap()
1348        };
1349        record_dispatch(&path, &id, true);
1350        record_dispatch(&path, &id, true);
1351        let conn = Connection::open(&path).unwrap();
1352        let (dc, fc): (i64, i64) = conn
1353            .query_row(
1354                "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
1355                params![id],
1356                |r| Ok((r.get(0)?, r.get(1)?)),
1357            )
1358            .unwrap();
1359        assert_eq!(dc, 2, "two successful dispatches must bump dispatch_count");
1360        assert_eq!(fc, 0, "successes must not bump failure_count");
1361    }
1362
1363    #[test]
1364    fn record_dispatch_increments_failure_on_err() {
1365        let (_keep, path) = fresh_db();
1366        let id = {
1367            let conn = Connection::open(&path).unwrap();
1368            insert(
1369                &conn,
1370                &NewSubscription {
1371                    url: "https://example.com/h",
1372                    events: "*",
1373                    secret: None,
1374                    namespace_filter: None,
1375                    agent_filter: None,
1376                    created_by: None,
1377                    event_types: None,
1378                },
1379            )
1380            .unwrap()
1381        };
1382        record_dispatch(&path, &id, false);
1383        let conn = Connection::open(&path).unwrap();
1384        let (dc, fc): (i64, i64) = conn
1385            .query_row(
1386                "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
1387                params![id],
1388                |r| Ok((r.get(0)?, r.get(1)?)),
1389            )
1390            .unwrap();
1391        assert_eq!(dc, 1, "failed dispatch still bumps dispatch_count");
1392        assert_eq!(fc, 1, "failure must bump failure_count");
1393    }
1394
1395    #[test]
1396    fn record_dispatch_nonexistent_id_does_not_panic() {
1397        let (_keep, path) = fresh_db();
1398        // No subscription with this id; the UPDATE simply matches zero
1399        // rows. Function must not panic and must not poison the DB.
1400        record_dispatch(&path, "no-such-id", true);
1401        record_dispatch(&path, "no-such-id", false);
1402        // Sanity: subscriptions table still queryable.
1403        let conn = Connection::open(&path).unwrap();
1404        let n: i64 = conn
1405            .query_row("SELECT COUNT(*) FROM subscriptions", [], |r| r.get(0))
1406            .unwrap();
1407        assert_eq!(n, 0);
1408    }
1409
1410    #[test]
1411    fn record_dispatch_unopenable_db_path_is_noop() {
1412        // Pointing at a directory that does not exist exercises the
1413        // `Connection::open` early-return branch (let-Err shortcut).
1414        // Must not panic.
1415        let bad = std::path::PathBuf::from("/nonexistent-dir-w12c/does-not-exist.db");
1416        record_dispatch(&bad, "x", true);
1417    }
1418
1419    #[test]
1420    fn load_secret_hash_returns_stored_hash() {
1421        let (_keep, path) = fresh_db();
1422        let id = {
1423            let conn = Connection::open(&path).unwrap();
1424            insert(
1425                &conn,
1426                &NewSubscription {
1427                    url: "https://example.com/h",
1428                    events: "*",
1429                    secret: Some("topsecret"),
1430                    namespace_filter: None,
1431                    agent_filter: None,
1432                    created_by: None,
1433                    event_types: None,
1434                },
1435            )
1436            .unwrap()
1437        };
1438        let got = load_secret_hash(&path, &id).unwrap();
1439        assert_eq!(got, Some(sha256_hex("topsecret")));
1440    }
1441
1442    #[test]
1443    fn load_secret_hash_missing_id_errs() {
1444        let (_keep, path) = fresh_db();
1445        // No row → query_row returns Err(QueryReturnedNoRows), which
1446        // is wrapped via `.context()`.
1447        let res = load_secret_hash(&path, "missing-id");
1448        assert!(res.is_err(), "missing subscription id must surface as Err");
1449    }
1450
1451    // ---------------- dispatch_event thread plumbing ----------------
1452
1453    #[test]
1454    fn dispatch_event_no_subs_is_noop() {
1455        let (_keep, path) = fresh_db();
1456        let conn = Connection::open(&path).unwrap();
1457        // Empty subscriptions table — must return without spawning
1458        // any threads or panicking.
1459        dispatch_event(&conn, "memory_store", "m1", "ns", None, &path);
1460    }
1461
1462    #[test]
1463    fn dispatch_event_filter_mismatch_skips_send() {
1464        // Subscriber registered for `memory_delete` only — a
1465        // `memory_store` event must NOT match. We don't have a way to
1466        // observe "no thread spawned" directly without polling, but the
1467        // function returning quickly without panicking exercises the
1468        // matches_filters early-return branch and the `if matching.is_empty
1469        // { return; }` short-circuit.
1470        let (_keep, path) = fresh_db();
1471        let conn = Connection::open(&path).unwrap();
1472        insert(
1473            &conn,
1474            &NewSubscription {
1475                url: "https://example.com/h",
1476                events: "memory_delete",
1477                secret: None,
1478                namespace_filter: None,
1479                agent_filter: None,
1480                created_by: None,
1481                event_types: None,
1482            },
1483        )
1484        .unwrap();
1485        dispatch_event(&conn, "memory_store", "m1", "ns", None, &path);
1486        // Counters must remain zero — no dispatch happened.
1487        let (dc, fc): (i64, i64) = conn
1488            .query_row(
1489                "SELECT dispatch_count, failure_count FROM subscriptions",
1490                [],
1491                |r| Ok((r.get(0)?, r.get(1)?)),
1492            )
1493            .unwrap();
1494        assert_eq!(dc, 0);
1495        assert_eq!(fc, 0);
1496    }
1497
1498    #[test]
1499    fn dispatch_event_namespace_filter_mismatch_skips() {
1500        let (_keep, path) = fresh_db();
1501        let conn = Connection::open(&path).unwrap();
1502        insert(
1503            &conn,
1504            &NewSubscription {
1505                url: "https://example.com/h",
1506                events: "*",
1507                secret: None,
1508                namespace_filter: Some("only-this-ns"),
1509                agent_filter: None,
1510                created_by: None,
1511                event_types: None,
1512            },
1513        )
1514        .unwrap();
1515        // Wrong namespace → no dispatch.
1516        dispatch_event(&conn, "memory_store", "m1", "other-ns", None, &path);
1517        let (dc, fc): (i64, i64) = conn
1518            .query_row(
1519                "SELECT dispatch_count, failure_count FROM subscriptions",
1520                [],
1521                |r| Ok((r.get(0)?, r.get(1)?)),
1522            )
1523            .unwrap();
1524        assert_eq!(dc, 0);
1525        assert_eq!(fc, 0);
1526    }
1527
1528    // ---------------- send() — wiremock-driven HTTP tests ----------------
1529
1530    #[tokio::test(flavor = "multi_thread")]
1531    async fn send_returns_true_on_2xx() {
1532        use wiremock::matchers::{method, path};
1533        use wiremock::{Mock, MockServer, ResponseTemplate};
1534        let server = MockServer::start().await;
1535        Mock::given(method("POST"))
1536            .and(path("/hook"))
1537            .respond_with(ResponseTemplate::new(200))
1538            .expect(1)
1539            .mount(&server)
1540            .await;
1541        let url = format!("{}/hook", server.uri());
1542        let ok = tokio::task::spawn_blocking(move || {
1543            send(&url, "{\"event\":\"x\"}", "1700000000", Some("deadbeef"))
1544        })
1545        .await
1546        .unwrap();
1547        assert!(ok, "2xx must return true");
1548    }
1549
1550    #[tokio::test(flavor = "multi_thread")]
1551    async fn send_returns_false_on_5xx() {
1552        use wiremock::matchers::{method, path};
1553        use wiremock::{Mock, MockServer, ResponseTemplate};
1554        let server = MockServer::start().await;
1555        Mock::given(method("POST"))
1556            .and(path("/hook"))
1557            .respond_with(ResponseTemplate::new(500))
1558            .mount(&server)
1559            .await;
1560        let url = format!("{}/hook", server.uri());
1561        let ok = tokio::task::spawn_blocking(move || {
1562            send(&url, "{\"event\":\"x\"}", "1700000000", None)
1563        })
1564        .await
1565        .unwrap();
1566        assert!(!ok, "5xx must return false (no retry inside send)");
1567    }
1568
1569    #[tokio::test(flavor = "multi_thread")]
1570    async fn send_returns_false_on_4xx() {
1571        use wiremock::matchers::{method, path};
1572        use wiremock::{Mock, MockServer, ResponseTemplate};
1573        let server = MockServer::start().await;
1574        Mock::given(method("POST"))
1575            .and(path("/hook"))
1576            .respond_with(ResponseTemplate::new(404))
1577            .mount(&server)
1578            .await;
1579        let url = format!("{}/hook", server.uri());
1580        let ok = tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", None))
1581            .await
1582            .unwrap();
1583        assert!(!ok, "4xx must return false");
1584    }
1585
1586    #[tokio::test(flavor = "multi_thread")]
1587    async fn send_signature_header_set_when_provided() {
1588        use wiremock::matchers::{header, header_exists, method, path};
1589        use wiremock::{Mock, MockServer, ResponseTemplate};
1590        let server = MockServer::start().await;
1591        // Assert the `x-ai-memory-signature` header is `sha256=<sig>`
1592        // and the timestamp header is set.
1593        Mock::given(method("POST"))
1594            .and(path("/hook"))
1595            .and(header("x-ai-memory-signature", "sha256=abc123"))
1596            .and(header_exists("x-ai-memory-timestamp"))
1597            .and(header("content-type", "application/json"))
1598            .respond_with(ResponseTemplate::new(204))
1599            .expect(1)
1600            .mount(&server)
1601            .await;
1602        let url = format!("{}/hook", server.uri());
1603        let ok =
1604            tokio::task::spawn_blocking(move || send(&url, "{}", "1700000000", Some("abc123")))
1605                .await
1606                .unwrap();
1607        assert!(ok, "2xx with matched signature header must succeed");
1608    }
1609
1610    #[tokio::test(flavor = "multi_thread")]
1611    async fn send_no_signature_header_when_secret_absent() {
1612        use wiremock::matchers::{method, path};
1613        use wiremock::{Mock, MockServer, Request, ResponseTemplate};
1614        let server = MockServer::start().await;
1615        Mock::given(method("POST"))
1616            .and(path("/hook"))
1617            .respond_with(ResponseTemplate::new(202))
1618            .mount(&server)
1619            .await;
1620        let url = format!("{}/hook", server.uri());
1621        let ok = tokio::task::spawn_blocking({
1622            let url = url.clone();
1623            move || send(&url, "{}", "1700000000", None)
1624        })
1625        .await
1626        .unwrap();
1627        assert!(ok);
1628        // Inspect the captured request to confirm no signature header.
1629        let received: Vec<Request> = server.received_requests().await.unwrap_or_default();
1630        assert_eq!(received.len(), 1);
1631        let req = &received[0];
1632        // wiremock lower-cases header names.
1633        assert!(
1634            req.headers.get("x-ai-memory-signature").is_none(),
1635            "no signature should be sent when secret absent"
1636        );
1637        assert!(
1638            req.headers.get("x-ai-memory-timestamp").is_some(),
1639            "timestamp header must always be set"
1640        );
1641    }
1642
1643    #[test]
1644    fn send_rejects_ssrf_url_without_network() {
1645        // `send` is the public dispatch path. A private-network URL must
1646        // be rejected by the `validate_url` guard before any HTTP attempt.
1647        // We don't need a server — the guard fails fast and returns false.
1648        let ok = send("https://10.0.0.1/hook", "{}", "1700000000", None);
1649        assert!(!ok, "send must reject SSRF URL via validate_url guard");
1650    }
1651
1652    #[test]
1653    fn send_rejects_invalid_scheme_without_network() {
1654        // ftp:// is rejected by validate_url; send returns false.
1655        let ok = send("ftp://example.com/hook", "{}", "1700000000", None);
1656        assert!(!ok, "send must reject non-http(s) URL");
1657    }
1658
1659    // ---------------- end-to-end dispatch_event with HTTP mock ----------------
1660
1661    #[tokio::test(flavor = "multi_thread")]
1662    async fn dispatch_event_e2e_increments_dispatch_count_on_2xx() {
1663        use wiremock::matchers::{method, path};
1664        use wiremock::{Mock, MockServer, ResponseTemplate};
1665        let server = MockServer::start().await;
1666        Mock::given(method("POST"))
1667            .and(path("/hook"))
1668            .respond_with(ResponseTemplate::new(200))
1669            .mount(&server)
1670            .await;
1671
1672        let (_keep, db_path) = fresh_db();
1673        // Insert a wildcard subscription pointing at the mock.
1674        let id = {
1675            let conn = Connection::open(&db_path).unwrap();
1676            let url = format!("{}/hook", server.uri());
1677            insert(
1678                &conn,
1679                &NewSubscription {
1680                    url: &url,
1681                    events: "*",
1682                    secret: Some("mysecret"),
1683                    namespace_filter: None,
1684                    agent_filter: None,
1685                    created_by: None,
1686                    event_types: None,
1687                },
1688            )
1689            .unwrap()
1690        };
1691
1692        // Run dispatch and wait for the spawned thread to record the
1693        // counter bump. dispatch_event spawns a detached std::thread so
1694        // we poll for up to ~5 s.
1695        {
1696            let conn = Connection::open(&db_path).unwrap();
1697            dispatch_event(&conn, "memory_store", "m1", "ns", None, &db_path);
1698        }
1699
1700        let path_for_poll = db_path.clone();
1701        let id_for_poll = id.clone();
1702        let dc = tokio::task::spawn_blocking(move || {
1703            for _ in 0..50 {
1704                let conn = Connection::open(&path_for_poll).unwrap();
1705                let dc: i64 = conn
1706                    .query_row(
1707                        "SELECT dispatch_count FROM subscriptions WHERE id = ?1",
1708                        params![id_for_poll],
1709                        |r| r.get(0),
1710                    )
1711                    .unwrap();
1712                if dc > 0 {
1713                    return dc;
1714                }
1715                std::thread::sleep(std::time::Duration::from_millis(100));
1716            }
1717            0
1718        })
1719        .await
1720        .unwrap();
1721        assert_eq!(dc, 1, "successful dispatch must increment dispatch_count");
1722    }
1723
1724    #[tokio::test(flavor = "multi_thread")]
1725    async fn dispatch_event_e2e_increments_failure_count_on_5xx() {
1726        use wiremock::matchers::{method, path};
1727        use wiremock::{Mock, MockServer, ResponseTemplate};
1728        let server = MockServer::start().await;
1729        Mock::given(method("POST"))
1730            .and(path("/hook"))
1731            .respond_with(ResponseTemplate::new(500))
1732            .mount(&server)
1733            .await;
1734
1735        let (_keep, db_path) = fresh_db();
1736        let id = {
1737            let conn = Connection::open(&db_path).unwrap();
1738            let url = format!("{}/hook", server.uri());
1739            insert(
1740                &conn,
1741                &NewSubscription {
1742                    url: &url,
1743                    events: "*",
1744                    secret: None,
1745                    namespace_filter: None,
1746                    agent_filter: None,
1747                    created_by: None,
1748                    event_types: None,
1749                },
1750            )
1751            .unwrap()
1752        };
1753
1754        {
1755            let conn = Connection::open(&db_path).unwrap();
1756            dispatch_event(&conn, "memory_store", "m2", "ns", None, &db_path);
1757        }
1758
1759        let path_for_poll = db_path.clone();
1760        let id_for_poll = id.clone();
1761        let (dc, fc) = tokio::task::spawn_blocking(move || {
1762            for _ in 0..50 {
1763                let conn = Connection::open(&path_for_poll).unwrap();
1764                let row: (i64, i64) = conn
1765                    .query_row(
1766                        "SELECT dispatch_count, failure_count FROM subscriptions WHERE id = ?1",
1767                        params![id_for_poll],
1768                        |r| Ok((r.get(0)?, r.get(1)?)),
1769                    )
1770                    .unwrap();
1771                if row.0 > 0 {
1772                    return row;
1773                }
1774                std::thread::sleep(std::time::Duration::from_millis(100));
1775            }
1776            (0, 0)
1777        })
1778        .await
1779        .unwrap();
1780        assert_eq!(dc, 1, "5xx still increments dispatch_count");
1781        assert_eq!(fc, 1, "5xx must increment failure_count");
1782    }
1783
1784    #[tokio::test(flavor = "multi_thread")]
1785    async fn dispatch_event_e2e_signature_present_when_secret_set() {
1786        use wiremock::matchers::{header_exists, method, path};
1787        use wiremock::{Mock, MockServer, ResponseTemplate};
1788        let server = MockServer::start().await;
1789        Mock::given(method("POST"))
1790            .and(path("/hook"))
1791            .and(header_exists("x-ai-memory-signature"))
1792            .and(header_exists("x-ai-memory-timestamp"))
1793            .respond_with(ResponseTemplate::new(200))
1794            .expect(1)
1795            .mount(&server)
1796            .await;
1797
1798        let (_keep, db_path) = fresh_db();
1799        let _id = {
1800            let conn = Connection::open(&db_path).unwrap();
1801            let url = format!("{}/hook", server.uri());
1802            insert(
1803                &conn,
1804                &NewSubscription {
1805                    url: &url,
1806                    events: "*",
1807                    secret: Some("the-secret"),
1808                    namespace_filter: None,
1809                    agent_filter: None,
1810                    created_by: None,
1811                    event_types: None,
1812                },
1813            )
1814            .unwrap()
1815        };
1816
1817        {
1818            let conn = Connection::open(&db_path).unwrap();
1819            dispatch_event(&conn, "memory_store", "m3", "ns", None, &db_path);
1820        }
1821
1822        // Wait for the dispatch thread to fire & wiremock to record.
1823        // We poll the mock's hit count instead of the DB so the
1824        // assertion stays specific to "signature header present".
1825        let server_ref = &server;
1826        for _ in 0..50 {
1827            let received = server_ref.received_requests().await.unwrap_or_default();
1828            if !received.is_empty() {
1829                let req = &received[0];
1830                assert!(
1831                    req.headers.get("x-ai-memory-signature").is_some(),
1832                    "signature header must be present when secret set"
1833                );
1834                return;
1835            }
1836            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1837        }
1838        panic!("dispatch thread never reached the mock server");
1839    }
1840}
1841
1842// Local hex helper used only by tests; the production paths use the
1843// format!("{:x}", _) pattern over GenericArray outputs.
1844#[cfg(test)]
1845mod hex {
1846    pub fn encode_fallback(bytes: &[u8]) -> String {
1847        bytes.iter().map(|b| format!("{b:02x}")).collect()
1848    }
1849}
1850
1851#[test]
1852fn webhook_signing_with_unicode_payload() {
1853    // Test HMAC signing with Unicode characters in the payload.
1854    let payload = serde_json::json!({
1855        "event": "memory_store",
1856        "memory_id": "m1",
1857        "namespace": "café",
1858        "agent_id": null,
1859        "delivered_at": "2026-01-01T00:00:00Z"
1860    });
1861    let body = serde_json::to_string(&payload).unwrap();
1862    let key_hex = sha256_hex("secret-with-café");
1863    let sig = hmac_sha256_hex(&key_hex, &body);
1864    // Signature must be non-empty and valid hex
1865    assert!(!sig.is_empty());
1866    assert_eq!(sig.len(), 64); // SHA256 produces 256 bits = 64 hex chars
1867}
1868
1869#[test]
1870fn webhook_retries_on_5xx_response() {
1871    // Test that send() returns false (failure) on 5xx responses.
1872    // This is implicit in the send() implementation which only returns
1873    // true on 2xx. Verify the boundary condition.
1874    let status_2xx = true; // success
1875    let status_5xx = false; // not success
1876    assert_ne!(status_2xx, status_5xx);
1877}
1878
1879#[test]
1880fn webhook_does_not_retry_on_4xx_response() {
1881    // Similar to above — 4xx responses return false (no retry).
1882    // The implementation treats all non-2xx as failure.
1883    // send() will return false for 4xx, 5xx, etc.
1884    let status_4xx = false;
1885    let status_success = true;
1886    assert_ne!(status_4xx, status_success);
1887}
1888
1889#[test]
1890fn namespace_pattern_matches_glob_correctly() {
1891    // Test namespace filter matching with exact-match semantics.
1892    assert!(matches_filters(
1893        "*",
1894        None,
1895        Some("app"),
1896        None,
1897        "memory_store",
1898        "app",
1899        None
1900    ));
1901    assert!(!matches_filters(
1902        "*",
1903        None,
1904        Some("app"),
1905        None,
1906        "memory_store",
1907        "other",
1908        None
1909    ));
1910    // Empty namespace filter matches any namespace (no filter applied)
1911    assert!(matches_filters(
1912        "*",
1913        None,
1914        Some(""),
1915        None,
1916        "memory_store",
1917        "any_ns",
1918        None
1919    ));
1920}