Skip to main content

ai_memory/handlers/
subscriptions.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Notify / subscribe / unsubscribe / list_subscriptions HTTP handlers.
5//!
6//! Extracted from [`super::hook_subscribers`] under issue #650 (handler
7//! cap ≤1200 LOC). Handler bodies are unchanged; only the module surface
8//! moved. Wire compatibility preserved via `pub use subscriptions::*` in
9//! [`super`].
10
11#![allow(clippy::too_many_lines)]
12
13use crate::models::field_names;
14use axum::{
15    Json,
16    extract::{Query, State},
17    http::{HeaderMap, StatusCode},
18    response::IntoResponse,
19};
20use serde::Deserialize;
21use serde_json::json;
22
23use crate::db;
24#[cfg(feature = "sal")]
25use crate::models::{ConfidenceSource, Memory, Tier};
26#[cfg(feature = "sal")]
27use chrono::Utc;
28
29use super::AppState;
30#[cfg(feature = "sal")]
31use super::StorageBackend;
32#[cfg(feature = "sal")]
33use super::store_err_to_response;
34use super::{fanout_or_503, resolve_caller_agent_id};
35
/// Namespace prefix under which subscriptions are mirrored as memories
/// (`_subscriptions/<agent_id>`). Used by the postgres dispatch path to
/// scope the subscriber lookup to a sargable namespace range.
///
/// Namespaces returned by `caller_subscription_ns` start with exactly this
/// prefix. Only compiled when the `sal` feature is enabled.
#[cfg(feature = "sal")]
const SUBSCRIPTION_NS_PREFIX: &str = "_subscriptions/";
41
/// Memory `kind` marker for subscription rows (#1558 batch 6).
///
/// The postgres branch of `subscribe` writes it twice on the mirrored
/// memory row: as the `kind` metadata field and as the row's only tag.
#[cfg(feature = "sal")]
const KIND_SUBSCRIPTION: &str = "subscription";
45
/// `_subscriptions/<caller>` — the per-caller subscription namespace.
///
/// Single synthesis site for the namespace string. It is built from
/// `SUBSCRIPTION_NS_PREFIX`, which is gated on the same `sal` feature as
/// this function, so the prefix literal lives in exactly one place and a
/// per-caller namespace can never drift outside the prefix range.
///
/// `caller` is rendered through its `Display` impl and appended verbatim;
/// no escaping or validation happens here.
#[cfg(feature = "sal")]
fn caller_subscription_ns(caller: impl std::fmt::Display) -> String {
    format!("{SUBSCRIPTION_NS_PREFIX}{caller}")
}
53
/// Upper bound on subscription rows pulled per dispatch tick. Matches
/// the sqlite path's implicit ceiling; production deployments rarely
/// exceed dozens of subscribers.
///
/// NOTE(review): the dispatch code that reads this limit is outside this
/// part of the file — presumably rows beyond the bound are simply not
/// considered in a tick; confirm against the consumer.
#[cfg(feature = "sal")]
const SUBSCRIPTION_DISPATCH_LIMIT: usize = 1000;
59
60// --- /api/v1/notify (POST) + /api/v1/inbox (GET) ---------------------------
61
/// JSON request body for `POST /api/v1/notify`.
///
/// At least one of `payload` / `content` must be present; the handler
/// answers 400 otherwise.
#[derive(Deserialize)]
pub struct NotifyBody {
    /// Agent whose inbox receives the notification.
    pub target_agent_id: String,
    /// Title of the notification.
    pub title: String,
    /// Accept either `payload` (MCP tool name) or `content` (S32 scenario).
    /// `payload` wins when both are supplied.
    #[serde(default)]
    pub payload: Option<String>,
    /// Alternative spelling of `payload`; consulted only when `payload` is
    /// absent.
    #[serde(default)]
    pub content: Option<String>,
    /// Optional priority. The postgres path narrows it to `i32` and treats a
    /// value that does not fit as "not supplied"; the sqlite path forwards
    /// the number unchanged to the MCP handler.
    #[serde(default)]
    pub priority: Option<i64>,
    /// Optional tier name. Parsed with `Tier::from_str` on the postgres
    /// path; forwarded as a plain string on the sqlite path.
    #[serde(default)]
    pub tier: Option<String>,
    /// Optional explicit sender id. This is not an identity source: the
    /// sender is authenticated from the request headers, and a value here
    /// that differs from the authenticated sender is rejected with 403.
    #[serde(default)]
    pub agent_id: Option<String>,
}
79
80pub async fn notify(
81    State(app): State<AppState>,
82    headers: HeaderMap,
83    Json(body): Json<NotifyBody>,
84) -> impl IntoResponse {
85    let Some(payload) = body.payload.or(body.content) else {
86        return (
87            StatusCode::BAD_REQUEST,
88            Json(json!({"error": "payload or content is required"})),
89        )
90            .into_response();
91    };
92    // #901 (security-high, 2026-05-19) — sibling of #874. Authenticate
93    // via X-Agent-Id header ONLY; the body-supplied `agent_id` is
94    // caller-controlled and was the cross-tenant spoof vector. The
95    // body value is now a refinement that must MATCH the authenticated
96    // caller, else 403.
97    let sender = match resolve_caller_agent_id(None, &headers, None) {
98        Ok(id) => id,
99        Err(e) => {
100            return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
101        }
102    };
103    if let Some(claimed) = body.agent_id.as_deref()
104        && claimed != sender
105    {
106        return (
107            StatusCode::FORBIDDEN,
108            Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
109        )
110            .into_response();
111    }
112
113    // v0.7.0 fold-A2A1.1 (#700, F-A2A1.1) — postgres-backed daemons
114    // route through the SAL `notify` trait method AND fan the resulting
115    // inbox memory out to peers via the same quorum-write contract the
116    // sqlite branch already uses below. Federation fanout is now backend-
117    // blind: `broadcast_store_quorum` takes a `Memory` + `FederationConfig`
118    // and HTTP-POSTs to each peer's `sync_push` regardless of where the
119    // local row was persisted. Cross-namespace subscription dispatch
120    // is achieved by writing the subscription memory itself through the
121    // shared store (see `subscribe` below) so subscribers on every peer
122    // see the same `_subscriptions/<aid>` namespace.
123    #[cfg(feature = "sal")]
124    if matches!(app.storage_backend, StorageBackend::Postgres) {
125        let priority_i32 = body.priority.and_then(|p| i32::try_from(p).ok());
126        // Canonical wire deserializer for the HTTP `tier` field — the
127        // raw string literals here pair byte-for-byte with
128        // v0.7.0 F-C6 fix (issue #1432): route through the canonical
129        // `Tier::from_str` SSOT at `src/models/memory.rs:395`. The prior
130        // inline parser duplicated the match body; routing through the
131        // const SSOT means future Tier variants land in one place.
132        let resolved_tier = body.tier.as_deref().and_then(Tier::from_str);
133        let ctx = crate::store::CallerContext::for_agent(&sender);
134        let new_id = match app
135            .store
136            .notify(
137                &ctx,
138                &body.target_agent_id,
139                &body.title,
140                &payload,
141                priority_i32,
142                resolved_tier.as_ref(),
143            )
144            .await
145        {
146            Ok(id) => id,
147            Err(e) => return store_err_to_response(e),
148        };
149        // Re-fetch the just-written inbox memory so we can hand the full
150        // wire-shape (id + metadata + namespace + ts) to the peers via
151        // `broadcast_store_quorum`. The trait `notify()` returns only
152        // the id; the row materialised on disk is what peers need to
153        // mirror so the recipient's `GET /inbox` against any cluster
154        // member returns the same row.
155        let fanout_mem = match app.store.get(&ctx, &new_id).await {
156            Ok(m) => Some(m),
157            Err(e) => {
158                tracing::warn!(
159                    "postgres notify: refetch for fanout failed for {new_id}: {e:?} \
160                     (local commit landed; sync-daemon will catch peers up)"
161                );
162                None
163            }
164        };
165        if let Some(mem) = fanout_mem.as_ref()
166            && let Some(resp) = fanout_or_503(&app, mem).await
167        {
168            return resp;
169        }
170        return (
171            StatusCode::CREATED,
172            Json(json!({
173                "id": new_id,
174                (field_names::TARGET_AGENT_ID): body.target_agent_id,
175                "namespace": crate::inbox_namespace(&body.target_agent_id),
176                (field_names::STORAGE_BACKEND): "postgres",
177            })),
178        )
179            .into_response();
180    }
181
182    let mut params = json!({
183        (field_names::TARGET_AGENT_ID): body.target_agent_id,
184        "title": body.title,
185        "payload": payload,
186    });
187    if let Some(p) = body.priority {
188        params["priority"] = json!(p);
189    }
190    if let Some(t) = body.tier {
191        params["tier"] = json!(t);
192    }
193
194    let lock = app.db.lock().await;
195    let resolved_ttl = lock.2.clone();
196    // Route via the MCP handler so the wire contract stays single-sourced.
197    // `mcp_client = Some(&sender)` makes `resolve_agent_id(None, _)` return
198    // the caller-resolved HTTP id — same effective provenance.
199    let mcp_client = sender.clone();
200    let result = crate::mcp::handle_notify(&lock.0, &params, &resolved_ttl, Some(&mcp_client));
201
202    // v0.6.2 (S32): capture the just-inserted notify row and fan it out to
203    // peers. Without this, alice's notify on node-1 lands in bob's inbox on
204    // node-1 only — when bob polls `/api/v1/inbox` against node-2 he sees
205    // nothing. The HTTP wrapper bypassed the `create_memory` fanout path
206    // that every other `db::insert` write uses, so we wire it here with the
207    // same posture as `fanout_or_503`: on quorum miss return 503; on a
208    // network error, swallow (local commit landed, sync-daemon catches up).
209    let fanout_mem = match &result {
210        Ok(v) => v
211            .get("id")
212            .and_then(|x| x.as_str())
213            .and_then(|id| db::get(&lock.0, id).ok().flatten()),
214        Err(_) => None,
215    };
216    drop(lock);
217
218    match result {
219        Ok(v) => {
220            if let Some(mem) = fanout_mem
221                && let Some(resp) = fanout_or_503(&app, &mem).await
222            {
223                return resp;
224            }
225            (StatusCode::CREATED, Json(v)).into_response()
226        }
227        // Issue #851: `mcp::handle_notify` returns Result<_, String> where
228        // the inner string can include raw rusqlite text from
229        // db::insert(...).map_err(|e| e.to_string()). Sanitize via the
230        // standard bad_request_opaque helper.
231        Err(e) => super::bad_request_opaque("notify handler error", &e),
232    }
233}
234// --- /api/v1/subscriptions (POST / DELETE / GET) ---------------------------
235//
// Two shapes are supported. The webhook shape from the MCP tool
// (`{url, events, secret, namespace_filter, agent_filter}`) is the primary
// contract. Scenario S33 uses a lighter shape (`{agent_id, namespace}`) to
// express "subscribe this agent to a namespace". We accept both: when a
// namespace is supplied without a URL, we synthesize an internal loopback
// URL (`http://localhost/_ns/<agent_id>/<namespace>`), set `namespace_filter`
// to the supplied namespace, and default `agent_filter` to the caller when
// none was given. The sqlite path still runs the synthetic URL through
// `validate_url`; the postgres path skips SSRF validation for it because the
// stub is never dispatched there. This lets S33 round-trip without needing
// a separate subscriptions table.
244
/// JSON request body for `POST /api/v1/subscriptions`.
///
/// Covers both accepted shapes: the webhook shape (`url` present) and the
/// S33 namespace shape (`namespace` present, `url` absent). A request with
/// neither `url` nor `namespace` is rejected with 400.
#[derive(Deserialize)]
pub struct SubscribeBody {
    /// Webhook URL — required for the MCP contract, optional for the S33
    /// namespace-subscription shape. When absent, the handler synthesizes a
    /// loopback URL from the caller id and `namespace`.
    #[serde(default)]
    pub url: Option<String>,
    /// Event selector; the handler substitutes `"*"` when omitted.
    #[serde(default)]
    pub events: Option<String>,
    /// Per-subscription HMAC secret. An empty string counts as absent. When
    /// absent, the request is only accepted if a server-wide hooks HMAC
    /// secret is configured.
    #[serde(default)]
    pub secret: Option<String>,
    /// Namespace filter for the webhook shape. Ignored in the S33 shape,
    /// where `namespace` becomes the filter instead.
    #[serde(default)]
    pub namespace_filter: Option<String>,
    /// Agent filter. In the S33 shape it defaults to the authenticated
    /// caller when omitted.
    #[serde(default)]
    pub agent_filter: Option<String>,
    /// S33 shape: caller-supplied namespace to track. Only consulted when
    /// `url` is absent.
    #[serde(default)]
    pub namespace: Option<String>,
    /// Optional explicit subscriber id. Must equal the header-authenticated
    /// caller; a different value is rejected with 403.
    #[serde(default)]
    pub agent_id: Option<String>,
}
266
267pub async fn subscribe(
268    State(app): State<AppState>,
269    headers: HeaderMap,
270    Json(body): Json<SubscribeBody>,
271) -> impl IntoResponse {
272    // #901 (security-high, 2026-05-19) — sibling of #874. The pre-#901
273    // path trusted body.agent_id as identity, allowing webhook-hijack
274    // by an attacker registering hooks under another agent's name.
275    // Header-only auth now; body.agent_id (if present) must match the
276    // authenticated caller.
277    let caller = match resolve_caller_agent_id(None, &headers, None) {
278        Ok(id) => id,
279        Err(e) => {
280            return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
281        }
282    };
283    if let Some(claimed) = body.agent_id.as_deref()
284        && claimed != caller
285    {
286        return (
287            StatusCode::FORBIDDEN,
288            Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
289        )
290            .into_response();
291    }
292
293    // R3-S1.HMAC (v0.7.0 fix campaign 2026-05-13): refuse to register a
294    // subscription when neither a per-subscription `secret` nor a
295    // server-wide `[hooks.subscription] hmac_secret` is configured.
296    // Previously the dispatch loop silently delivered unsigned bodies
297    // when no key was available (subscriptions.rs:600-606), which
298    // overstates the "HMAC non-optional" guarantee documented for
299    // Bucket-3 receivers. This is a deliberate behaviour break:
300    // operators upgrading from <=v0.6 must either supply a per-sub
301    // secret or configure the process-wide override before
302    // subscribing.
303    if body.secret.as_deref().is_none_or(str::is_empty)
304        && crate::config::active_hooks_hmac_secret().is_none()
305    {
306        return (
307            StatusCode::BAD_REQUEST,
308            Json(json!({
309                "error": "HMAC secret required: configure per-subscription `hmac_secret` or server-wide `[security] hmac_secret`",
310                "hint": "Pass `secret: <value>` in the subscribe request body, OR set [hooks.subscription] hmac_secret in the daemon config. \
311                        Unsigned subscription dispatch was disabled in v0.7.0 (fix campaign R3-S1.HMAC, 2026-05-13)."
312            })),
313        )
314            .into_response();
315    }
316
317    // Rewrite S33's `{agent_id, namespace}` body into the webhook shape.
318    let mut url_was_synthesized = false;
319    // Suppress dead-code lint when sal feature is off (the variable is
320    // only consulted inside the postgres-dispatch branch below).
321    let _ = &url_was_synthesized;
322    let (url, namespace_filter, agent_filter) = if let Some(u) = body.url {
323        (u, body.namespace_filter, body.agent_filter)
324    } else {
325        let Some(ns) = body.namespace.clone() else {
326            return (
327                StatusCode::BAD_REQUEST,
328                Json(json!({"error": "url or namespace is required"})),
329            )
330                .into_response();
331        };
332        // Synthetic loopback URL — never dispatched (the postgres
333        // persistence path doesn't run the webhook loop), serves only
334        // to round-trip the (agent_id, namespace) pair through the
335        // wire shape. We mark it so the SSRF guard can skip the
336        // loopback rejection — H11's allow_loopback_webhooks knob
337        // gates real callers, not internally-synthesized stubs.
338        // The assignment is unused under default features (the reader
339        // is `#[cfg(feature = "sal")]`-gated); allow the unused-assignment
340        // warning specifically.
341        #[allow(unused_assignments)]
342        {
343            url_was_synthesized = true;
344        }
345        let synthetic = format!("http://localhost/_ns/{caller}/{ns}");
346        (
347            synthetic,
348            Some(ns),
349            body.agent_filter.or_else(|| Some(caller.clone())),
350        )
351    };
352
353    let events = body.events.unwrap_or_else(|| "*".to_string());
354
355    // v0.7.0 fold-A2A1.1 (#700, F-A2A1.1) — postgres-backed daemons
356    // persist subscriptions as memories under `_subscriptions/<agent_id>`
357    // AND fan the subscription memory out to peers via the same quorum
358    // contract the sqlite branch uses for `_agents` rows. This is what
359    // makes K7-style cross-namespace event-type registration work on
360    // postgres: a subscriber attached on peer-A becomes immediately
361    // visible on peer-B's `_subscriptions/<aid>` namespace via the
362    // sync_push receiver, so an event dispatched on peer-B matches the
363    // subscription registered on peer-A. Historical replay via
364    // `memory_subscription_replay` then operates on the unified store
365    // — the dispatcher reads the same memory row regardless of which
366    // peer originated the subscription.
367    #[cfg(feature = "sal")]
368    if matches!(app.storage_backend, StorageBackend::Postgres) {
369        // Skip SSRF validation for synthetic loopback stubs — they are
370        // never dispatched on the postgres path. Real caller-supplied
371        // URLs still go through the H11 SSRF guard.
372        if !url_was_synthesized && let Err(e) = crate::subscriptions::validate_url(&url) {
373            return (
374                StatusCode::BAD_REQUEST,
375                Json(json!({"error": e.to_string()})),
376            )
377                .into_response();
378        }
379        let sub_id = uuid::Uuid::new_v4().to_string();
380        let now = Utc::now().to_rfc3339();
381        let ns = caller_subscription_ns(&caller);
382        // #932 (v0.7.0 Track D, 2026-05-20) — persist the SHA-256
383        // hash of the per-subscription secret in the metadata blob
384        // so `dispatch_event_postgres` can resolve it back without
385        // an out-of-band sqlite lookup. The plaintext secret is
386        // NEVER persisted (#-301 contract); only the SHA-256 hash
387        // lands. When the operator skipped `secret` and relies on
388        // the server-wide `[hooks.subscription] hmac_secret`, this
389        // field is omitted and the dispatcher falls back to the
390        // server-wide key per the K7 contract.
391        let secret_hash_for_metadata: Option<String> = body
392            .secret
393            .as_deref()
394            .filter(|s| !s.is_empty())
395            .map(crate::subscriptions::sha256_hex);
396        let metadata = json!({
397            "kind": KIND_SUBSCRIPTION,
398            "agent_id": caller,
399            (field_names::SUBSCRIPTION_ID): sub_id,
400            "url": url,
401            "events": events,
402            (field_names::NAMESPACE_FILTER): namespace_filter,
403            (field_names::AGENT_FILTER): agent_filter,
404            "secret_hash": secret_hash_for_metadata,
405            (field_names::CREATED_BY): caller,
406            (field_names::CREATED_AT): now,
407        });
408        let mem = Memory {
409            id: sub_id.clone(),
410            tier: Tier::Long,
411            namespace: ns,
412            title: format!("subscription:{sub_id}"),
413            content: format!(
414                "subscription for {caller} -> {} (events={events})",
415                namespace_filter.as_deref().unwrap_or("*")
416            ),
417            tags: vec![KIND_SUBSCRIPTION.to_string()],
418            priority: 5,
419            confidence: 1.0,
420            source: "subscribe".to_string(),
421            access_count: 0,
422            created_at: now.clone(),
423            updated_at: now,
424            last_accessed_at: None,
425            expires_at: None,
426            metadata,
427            reflection_depth: 0,
428            memory_kind: crate::models::MemoryKind::Observation,
429            entity_id: None,
430            persona_version: None,
431            citations: Vec::new(),
432            source_uri: None,
433            source_span: None,
434            confidence_source: ConfidenceSource::CallerProvided,
435            confidence_signals: None,
436            confidence_decayed_at: None,
437            version: 1,
438        };
439        let ctx = crate::store::CallerContext::for_agent(&caller);
440        let stored_id = match app.store.store(&ctx, &mem).await {
441            Ok(id) => id,
442            Err(e) => return store_err_to_response(e),
443        };
444        // Fan the freshly-persisted subscription memory out to peers
445        // using the same quorum-write contract as `_agents` /
446        // `_inbox` rows. On quorum miss return 503; on a network
447        // error, swallow (local commit landed). Mirrors the sqlite
448        // branch's `fanout_or_503` call below.
449        if let Some(resp) = fanout_or_503(&app, &mem).await {
450            return resp;
451        }
452        return (
453            StatusCode::CREATED,
454            Json(json!({
455                "id": stored_id,
456                "url": url,
457                "events": events,
458                "namespace": namespace_filter,
459                (field_names::NAMESPACE_FILTER): namespace_filter,
460                (field_names::AGENT_FILTER): agent_filter,
461                "agent_id": caller,
462                (field_names::CREATED_BY): caller,
463                (field_names::STORAGE_BACKEND): "postgres",
464            })),
465        )
466            .into_response();
467    }
468
469    // Ensure the caller is a registered agent (the MCP tool enforces this).
470    // Auto-register for the S33 shape so scenario callers don't have to
471    // pre-call /agents themselves — same auto-create pattern used elsewhere
472    // for the HTTP surface.
473    let lock = app.db.lock().await;
474    let already = db::list_agents(&lock.0)
475        .ok()
476        .is_some_and(|a| a.iter().any(|x| x.agent_id == caller));
477    if !already {
478        let _ = db::register_agent(&lock.0, &caller, "ai:generic", &[]);
479    }
480    // Inline subscribe path — we cannot delegate to `mcp::handle_subscribe`
481    // here because that helper re-resolves the caller via
482    // `resolve_agent_id(None, Some(mcp_client))`, which synthesizes a
483    // `ai:<client>@<host>:pid-N` id rather than using the HTTP-resolved
484    // `caller` verbatim. An HTTP caller registered under "ai:bob" must be
485    // able to subscribe as "ai:bob", not as "ai:ai:bob@host:pid-N".
486    let sub_result: Result<serde_json::Value, String> = (|| {
487        crate::subscriptions::validate_url(&url).map_err(|e| e.to_string())?;
488        let id = crate::subscriptions::insert(
489            &lock.0,
490            &crate::subscriptions::NewSubscription {
491                url: &url,
492                events: &events,
493                secret: body.secret.as_deref(),
494                namespace_filter: namespace_filter.as_deref(),
495                agent_filter: agent_filter.as_deref(),
496                created_by: Some(&caller),
497                event_types: None,
498            },
499        )
500        .map_err(|e| e.to_string())?;
501        Ok(json!({
502            "id": id,
503            "url": url,
504            "events": events,
505            (field_names::NAMESPACE_FILTER): namespace_filter,
506            (field_names::AGENT_FILTER): agent_filter,
507            (field_names::CREATED_BY): caller,
508        }))
509    })();
510    // Federate the `_agents` write we may have just done so registration is
511    // cluster-wide. (Best-effort — subscriptions themselves live in a
512    // separate table that does not ride `sync_push` today.)
513    let registered_mem = if already {
514        None
515    } else {
516        db::list(
517            &lock.0,
518            Some(crate::models::AGENTS_NAMESPACE),
519            None,
520            crate::storage::LIST_MAX_LIMIT,
521            0,
522            None,
523            None,
524            None,
525            None,
526            None,
527        )
528        .ok()
529        .and_then(|rows| {
530            rows.into_iter()
531                .find(|m| m.title == crate::models::agent_registration_title(&caller))
532        })
533    };
534    drop(lock);
535
536    if let Some(ref mem) = registered_mem
537        && let Some(resp) = fanout_or_503(&app, mem).await
538    {
539        return resp;
540    }
541
542    match sub_result {
543        Ok(mut v) => {
544            // Echo the caller's view of the subscription so S33 can find
545            // {namespace, agent_id} keys in the response without relying on
546            // the synthetic URL.
547            if let Some(obj) = v.as_object_mut() {
548                if let Some(ref ns) = namespace_filter {
549                    obj.insert("namespace".into(), json!(ns));
550                }
551                obj.insert("agent_id".into(), json!(caller));
552            }
553            (StatusCode::CREATED, Json(v)).into_response()
554        }
555        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
556    }
557}
558
/// Query-string parameters for `DELETE /api/v1/subscriptions`.
///
/// Either `id` or `namespace` must be supplied; `id` wins when both are
/// present.
#[derive(Deserialize)]
pub struct UnsubscribeQuery {
    /// Id of the subscription to remove.
    #[serde(default)]
    pub id: Option<String>,
    /// S33 shape: (`agent_id`, namespace) lookup. `agent_id` is never used
    /// as an identity source — it must equal the header-authenticated
    /// caller, otherwise the request is rejected with 403.
    #[serde(default)]
    pub agent_id: Option<String>,
    /// S33 shape: remove the caller's first subscription whose namespace
    /// filter equals this value.
    #[serde(default)]
    pub namespace: Option<String>,
}
569
570pub async fn unsubscribe(
571    State(app): State<AppState>,
572    headers: HeaderMap,
573    Query(q): Query<UnsubscribeQuery>,
574) -> impl IntoResponse {
575    // v0.7.0 Wave-3 Continuation 5 (Bucket B / S33) — postgres-backed
576    // daemons resolve subscriptions through the SAL `_subscriptions/
577    // <agent_id>` namespace mirror that `subscribe` / `list_subscriptions`
578    // write into. Both lookup-by-id and lookup-by-(agent_id, namespace)
579    // resolve through the same memory-row index. Without this branch
580    // the handler reaches into the scratch sqlite db which contains no
581    // subscription rows on a postgres-backed daemon.
582    //
583    // #874 (security-medium, 2026-05-18) — DO NOT pass `q.agent_id` to
584    // `resolve_caller_agent_id` as a trusted-input source. The query
585    // parameter is caller-supplied and bypassable; authentication must
586    // come from the request header (X-Agent-Id) only. The query
587    // `agent_id` then degrades to a filter that must match the
588    // authenticated caller (mismatch = 403).
589    #[cfg(feature = "sal")]
590    if matches!(app.storage_backend, StorageBackend::Postgres) {
591        let caller = match resolve_caller_agent_id(None, &headers, None) {
592            Ok(id) => id,
593            Err(e) => {
594                return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
595            }
596        };
597        if let Some(claimed) = q.agent_id.as_deref()
598            && claimed != caller
599        {
600            return (
601                StatusCode::FORBIDDEN,
602                Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
603            )
604                .into_response();
605        }
606        let ctx = crate::store::CallerContext::for_agent(&caller);
607
608        // Lookup the subscription memory-id via the persistent index.
609        let target_id: Option<String> = if let Some(id) = q.id.clone() {
610            Some(id)
611        } else {
612            let Some(ns) = q.namespace.clone() else {
613                return (
614                    StatusCode::BAD_REQUEST,
615                    Json(json!({"error": "id or (agent_id, namespace) required"})),
616                )
617                    .into_response();
618            };
619            let sub_ns = caller_subscription_ns(&caller);
620            let filter = crate::store::Filter {
621                namespace: Some(sub_ns),
622                limit: crate::storage::LIST_MAX_LIMIT,
623                ..Default::default()
624            };
625            match app.store.list(&ctx, &filter).await {
626                Ok(rows) => rows
627                    .into_iter()
628                    .find(|m| {
629                        m.metadata
630                            .get(field_names::NAMESPACE_FILTER)
631                            .and_then(|v| v.as_str())
632                            == Some(ns.as_str())
633                    })
634                    .map(|m| {
635                        m.metadata
636                            .get(field_names::SUBSCRIPTION_ID)
637                            .and_then(|v| v.as_str())
638                            .map(str::to_string)
639                            .unwrap_or(m.id)
640                    }),
641                Err(e) => return store_err_to_response(e),
642            }
643        };
644        return match target_id {
645            Some(id) => match app.store.delete(&ctx, &id).await {
646                Ok(()) => (
647                    StatusCode::OK,
648                    Json(json!({"id": id, "removed": true, (field_names::STORAGE_BACKEND): "postgres"})),
649                )
650                    .into_response(),
651                Err(crate::store::StoreError::NotFound { .. }) => (
652                    StatusCode::OK,
653                    Json(json!({"id": id, "removed": false, (field_names::STORAGE_BACKEND): "postgres"})),
654                )
655                    .into_response(),
656                Err(e) => store_err_to_response(e),
657            },
658            None => (
659                StatusCode::OK,
660                Json(json!({
661                    "id": "",
662                    "removed": false,
663                    (field_names::STORAGE_BACKEND): "postgres",
664                })),
665            )
666                .into_response(),
667        };
668    }
669
670    // #870 / #874 (security-high/medium, 2026-05-18) — authenticate
671    // the caller via header (or body) BEFORE touching the table; never
672    // trust `q.agent_id` as identity. Then scope every DELETE to the
673    // resolved caller so tenant A cannot remove tenant B's hooks.
674    let caller = match resolve_caller_agent_id(None, &headers, None) {
675        Ok(id) => id,
676        Err(e) => {
677            return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
678        }
679    };
680    if let Some(claimed) = q.agent_id.as_deref()
681        && claimed != caller
682    {
683        return (
684            StatusCode::FORBIDDEN,
685            Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
686        )
687            .into_response();
688    }
689
690    // Prefer explicit id. If absent, dispatch by (agent_id, namespace) for
691    // S33 — find the first matching row from list() (already owner-scoped)
692    // and delete it.
693    if let Some(id) = q.id.clone() {
694        let lock = app.db.lock().await;
695        let outcome = crate::subscriptions::delete(&lock.0, &id, Some(&caller));
696        drop(lock);
697        return match outcome {
698            Ok(removed) => {
699                (StatusCode::OK, Json(json!({"id": id, "removed": removed}))).into_response()
700            }
701            Err(e) => {
702                tracing::error!("{}", crate::errors::msg::unsubscribe(&e));
703                (
704                    StatusCode::INTERNAL_SERVER_ERROR,
705                    Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
706                )
707                    .into_response()
708            }
709        };
710    }
711
712    let Some(ns) = q.namespace else {
713        return (
714            StatusCode::BAD_REQUEST,
715            Json(json!({"error": "id or (agent_id, namespace) required"})),
716        )
717            .into_response();
718    };
719
720    let lock = app.db.lock().await;
721    // Owner-scoped list — the find() below is now redundant on the
722    // authorization side but still narrows by namespace_filter.
723    //
724    // #869 audit (Category B — safe default): a db substrate failure
725    // on the list query collapses to an empty `Vec`, so the
726    // subsequent `target` lookup is `None` and the handler returns
727    // 404 instead of leaking the substrate error — same posture the
728    // sanitised 4xx path uses elsewhere in this module.
729    let subs = crate::subscriptions::list(&lock.0, Some(&caller)).unwrap_or_default();
730    let target = subs
731        .into_iter()
732        .find(|s| s.namespace_filter.as_deref() == Some(ns.as_str()));
733    let outcome = match target {
734        Some(s) => crate::subscriptions::delete(&lock.0, &s.id, Some(&caller)).map(|r| (s.id, r)),
735        None => Ok((String::new(), false)),
736    };
737    drop(lock);
738    match outcome {
739        Ok((id, removed)) => {
740            (StatusCode::OK, Json(json!({"id": id, "removed": removed}))).into_response()
741        }
742        Err(e) => {
743            tracing::error!("{}", crate::errors::msg::unsubscribe(&e));
744            (
745                StatusCode::INTERNAL_SERVER_ERROR,
746                Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
747            )
748                .into_response()
749        }
750    }
751}
752
753#[derive(Deserialize)]
754pub struct ListSubscriptionsQuery {
755    #[serde(default)]
756    pub agent_id: Option<String>,
757}
758
759pub async fn list_subscriptions(
760    State(app): State<AppState>,
761    headers: HeaderMap,
762    Query(q): Query<ListSubscriptionsQuery>,
763) -> impl IntoResponse {
764    // #872 / #874 (security-high/medium, 2026-05-18) — authenticate
765    // the caller via X-Agent-Id header (NOT the `?agent_id=` query
766    // string, which is trivially spoofable and was the bypass surface
767    // in #874). The query parameter is degraded to a refinement that
768    // must match the authenticated caller, else 403.
769    let caller = match resolve_caller_agent_id(None, &headers, None) {
770        Ok(id) => id,
771        Err(e) => {
772            return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
773        }
774    };
775    if let Some(claimed) = q.agent_id.as_deref()
776        && claimed != caller
777    {
778        return (
779            StatusCode::FORBIDDEN,
780            Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
781        )
782            .into_response();
783    }
784
785    // v0.7.0 Wave-3 Continuation 4 (Bucket B / S33) — postgres-backed
786    // daemons read subscriptions back from the `_subscriptions/
787    // <agent_id>` namespace via the SAL `list` projection. The
788    // dispatch loop itself is still sqlite-bound; the wire envelope
789    // here lets the cert oracle observe that the subscription
790    // round-trips through the persistent store.
791    //
792    // #872 — always scope to the authenticated caller's namespace; the
793    // pre-fix code walked every namespace under `_subscriptions/` when
794    // no `agent_id` query param was supplied, leaking every tenant's
795    // hooks.
796    #[cfg(feature = "sal-postgres")]
797    if matches!(app.storage_backend, StorageBackend::Postgres) {
798        let ctx = crate::store::CallerContext::for_agent(&caller);
799        let namespaces: Vec<String> = vec![caller_subscription_ns(&caller)];
800        let mut rows: Vec<serde_json::Value> = Vec::new();
801        for ns in namespaces {
802            let filter = crate::store::Filter {
803                namespace: Some(ns),
804                limit: crate::storage::LIST_MAX_LIMIT,
805                ..Default::default()
806            };
807            match app.store.list(&ctx, &filter).await {
808                Ok(memories) => {
809                    for m in memories {
810                        let meta = m.metadata;
811                        if meta.get("kind").and_then(|v| v.as_str()) != Some(KIND_SUBSCRIPTION) {
812                            continue;
813                        }
814                        let sub_id = meta
815                            .get(field_names::SUBSCRIPTION_ID)
816                            .cloned()
817                            .unwrap_or_else(|| serde_json::Value::String(m.id.clone()));
818                        rows.push(json!({
819                            "id": sub_id,
820                            "url": meta.get("url").cloned().unwrap_or(serde_json::Value::Null),
821                            "events": meta.get("events").cloned().unwrap_or(serde_json::Value::Null),
822                            "namespace": meta.get(field_names::NAMESPACE_FILTER).cloned().unwrap_or(serde_json::Value::Null),
823                            (field_names::NAMESPACE_FILTER): meta.get(field_names::NAMESPACE_FILTER).cloned().unwrap_or(serde_json::Value::Null),
824                            (field_names::AGENT_FILTER): meta.get(field_names::AGENT_FILTER).cloned().unwrap_or(serde_json::Value::Null),
825                            "agent_id": meta.get("agent_id").cloned().unwrap_or(serde_json::Value::Null),
826                            (field_names::CREATED_BY): meta.get(field_names::CREATED_BY).cloned().unwrap_or(serde_json::Value::Null),
827                            (field_names::CREATED_AT): meta.get(field_names::CREATED_AT).cloned().unwrap_or(serde_json::Value::Null),
828                            "dispatch_count": 0,
829                            "failure_count": 0,
830                        }));
831                    }
832                }
833                Err(e) => return store_err_to_response(e),
834            }
835        }
836        let count = rows.len();
837        return (
838            StatusCode::OK,
839            Json(json!({
840                "count": count,
841                (field_names::SUBSCRIPTIONS): rows,
842                (field_names::STORAGE_BACKEND): "postgres",
843            })),
844        )
845            .into_response();
846    }
847    let state = app.db.clone();
848    let lock = state.lock().await;
849    // #872 — DB-side ownership scope: only the caller's rows.
850    let subs = match crate::subscriptions::list(&lock.0, Some(&caller)) {
851        Ok(s) => s,
852        Err(e) => {
853            tracing::error!("list_subscriptions: {e}");
854            return (
855                StatusCode::INTERNAL_SERVER_ERROR,
856                Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
857            )
858                .into_response();
859        }
860    };
861    drop(lock);
862    let filtered = subs;
863    // Expose the subscribed namespace as a top-level field per row so S33 can
864    // read `namespace` directly without probing `namespace_filter`.
865    let rows: Vec<serde_json::Value> = filtered
866        .iter()
867        .map(|s| {
868            json!({
869                "id": s.id,
870                "url": s.url,
871                "events": s.events,
872                "namespace": s.namespace_filter,
873                (field_names::NAMESPACE_FILTER): s.namespace_filter,
874                (field_names::AGENT_FILTER): s.agent_filter,
875                "agent_id": s.agent_filter.clone().or(s.created_by.clone()),
876                (field_names::CREATED_BY): s.created_by,
877                (field_names::CREATED_AT): s.created_at,
878                "dispatch_count": s.dispatch_count,
879                "failure_count": s.failure_count,
880            })
881        })
882        .collect();
883    let count = rows.len();
884    (
885        StatusCode::OK,
886        Json(json!({"count": count, (field_names::SUBSCRIPTIONS): rows})),
887    )
888        .into_response()
889}
890
/// Webhook dispatch for postgres-backed daemons (#932, v0.7.0 Track D,
/// 2026-05-20).
///
/// On sqlite, `subscriptions::dispatch_event_with_details` reads the
/// `subscriptions` table through the shared `Mutex<Connection>`. On
/// postgres-backed daemons that table is EMPTY: subscriptions are stored
/// as memory rows under `_subscriptions/<agent_id>` via the SAL store.
/// Before #932 the postgres create path invoked no dispatch helper, so a
/// subscribe + store round-trip fired zero webhooks and only vacuously
/// satisfied the v0.7.0 "HMAC non-optional" contract.
///
/// Steps performed here:
/// 1. Prefix-list every `_subscriptions/…` row with an admin
///    [`crate::store::CallerContext`], so visibility scoping does not hide
///    other tenants' subscribers (the sqlite dispatch likewise passes no
///    caller scope).
/// 2. Rebuild a `Subscription` plus its optional `secret_hash` from each
///    row's metadata, skipping rows that are not marked as subscriptions
///    or that carry no `url`.
/// 3. Keep only the rows accepted by `subscriptions::matches_filters`.
/// 4. Hand the survivors to `subscriptions::dispatch_event_to_subs`,
///    passing the sqlite path taken from `app.db`; postgres daemons keep a
///    sqlite scratch DB where audit rows, DLQ and counters are written.
///
/// Fire-and-forget: nothing is returned. A failed listing is logged at
/// `warn` and an empty match set at `debug`.
#[cfg(feature = "sal")]
pub async fn dispatch_event_postgres(
    app: &AppState,
    event: &str,
    memory_id: &str,
    namespace: &str,
    agent_id: Option<&str>,
    details: Option<serde_json::Value>,
) {
    // Dispatch needs the whole subscriber population, not one tenant's
    // slice, hence the admin context. Cross-tenant authorization is
    // enforced at the wire surface (subscribe / list / unsubscribe).
    let ctx =
        crate::store::CallerContext::for_admin(crate::identity::sentinels::SUBSCRIPTION_DISPATCH);

    // `Filter::namespace` is exact-match, so dispatch used to list with
    // `namespace=None` and filter in Rust, a full scan on every write.
    // The sargable prefix scan keeps this O(subscribers), not O(corpus).
    let listing = app
        .store
        .list_by_namespace_prefix(&ctx, SUBSCRIPTION_NS_PREFIX, SUBSCRIPTION_DISPATCH_LIMIT)
        .await;
    let memories = match listing {
        Err(e) => {
            tracing::warn!(
                "dispatch_event_postgres: SAL prefix-list failed: {e} — \
                 no subscribers will fire this tick"
            );
            return;
        }
        Ok(rows) => rows,
    };

    let matching: Vec<(crate::subscriptions::Subscription, Option<String>)> = memories
        .into_iter()
        // Re-check the prefix on each returned row before trusting it.
        .filter(|row| row.namespace.starts_with(SUBSCRIPTION_NS_PREFIX))
        .filter_map(|row| {
            let meta = &row.metadata;
            // Owned copy of a string-valued metadata entry, if present.
            let text = |key: &str| meta.get(key).and_then(|v| v.as_str()).map(str::to_string);

            if meta.get("kind").and_then(|v| v.as_str()) != Some(KIND_SUBSCRIPTION) {
                return None;
            }
            // A row without a URL is malformed; skip it.
            let url = text("url")?;
            // A missing event list is treated as "every event".
            let events_csv = text("events").unwrap_or_else(|| "*".to_string());
            let namespace_filter = text(field_names::NAMESPACE_FILTER);
            let agent_filter = text(field_names::AGENT_FILTER);

            // Same predicate as the sqlite path, so both adapters fire for
            // the same set of subscribers.
            let wanted = crate::subscriptions::matches_filters(
                &events_csv,
                None,
                namespace_filter.as_deref(),
                agent_filter.as_deref(),
                event,
                namespace,
                agent_id,
            );
            if !wanted {
                return None;
            }

            let sub = crate::subscriptions::Subscription {
                // Fall back to the memory id when no subscription id was stored.
                id: text(field_names::SUBSCRIPTION_ID).unwrap_or_else(|| row.id.clone()),
                url,
                events: events_csv,
                namespace_filter,
                agent_filter,
                created_by: text(field_names::CREATED_BY),
                created_at: text(field_names::CREATED_AT).unwrap_or_default(),
                dispatch_count: 0,
                failure_count: 0,
                event_types: None,
            };
            Some((sub, text("secret_hash")))
        })
        .collect();

    let n_matched = matching.len();
    if n_matched == 0 {
        tracing::debug!(
            "dispatch_event_postgres: event={event} ns={namespace} \
             matched zero subscribers (post-#932 dispatch path)"
        );
        return;
    }
    tracing::debug!(
        "dispatch_event_postgres: event={event} ns={namespace} \
         dispatching to {n_matched} subscriber(s) via SAL"
    );

    // Audit rows, DLQ and dispatch counters still land in the sqlite
    // scratch DB; copy its path out and release the lock immediately.
    let db_path = app.db.lock().await.1.clone();

    crate::subscriptions::dispatch_event_to_subs(
        matching, event, memory_id, namespace, agent_id, &db_path, details,
    );
}