ai_memory/handlers/subscriptions.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Notify / subscribe / unsubscribe / list_subscriptions HTTP handlers.
5//!
6//! Extracted from [`super::hook_subscribers`] under issue #650 (handler
7//! cap ≤1200 LOC). Handler bodies are unchanged; only the module surface
8//! moved. Wire compatibility preserved via `pub use subscriptions::*` in
9//! [`super`].
10
11#![allow(clippy::too_many_lines)]
12
13use crate::models::field_names;
14use axum::{
15 Json,
16 extract::{Query, State},
17 http::{HeaderMap, StatusCode},
18 response::IntoResponse,
19};
20use serde::Deserialize;
21use serde_json::json;
22
23use crate::db;
24#[cfg(feature = "sal")]
25use crate::models::{ConfidenceSource, Memory, Tier};
26#[cfg(feature = "sal")]
27use chrono::Utc;
28
29use super::AppState;
30#[cfg(feature = "sal")]
31use super::StorageBackend;
32#[cfg(feature = "sal")]
33use super::store_err_to_response;
34use super::{fanout_or_503, resolve_caller_agent_id};
35
/// Namespace prefix under which subscriptions are mirrored as memories
/// (`_subscriptions/<agent_id>`). Used by the postgres dispatch path to
/// scope the subscriber lookup to a sargable namespace range.
///
/// `caller_subscription_ns` spells the same `_subscriptions/` literal
/// inline rather than referencing this constant, so the two must be kept
/// in sync by hand if the prefix ever changes.
#[cfg(feature = "sal")]
const SUBSCRIPTION_NS_PREFIX: &str = "_subscriptions/";
41
/// Memory `kind` marker for subscription rows (#1558 batch 6).
///
/// `subscribe` writes this value into both the `kind` metadata key and the
/// `tags` of the mirrored memory; `list_subscriptions` skips any row whose
/// `kind` metadata is not exactly this value.
#[cfg(feature = "sal")]
const KIND_SUBSCRIPTION: &str = "subscription";
45
46/// `_subscriptions/<caller>` — the per-caller subscription namespace.
47/// (Single synthesis site; `SUBSCRIPTION_NS_PREFIX` above is the sargable
48/// range form and is `sal`-gated, so the template stays self-contained.)
49#[cfg(feature = "sal")]
fn caller_subscription_ns(caller: impl std::fmt::Display) -> String {
    // Render the caller through its `Display` impl once, then append it to
    // the fixed `_subscriptions/` prefix to form the per-caller namespace.
    let mut ns = String::from("_subscriptions/");
    ns.push_str(&caller.to_string());
    ns
}
53
/// Upper bound on subscription rows pulled per dispatch tick. Matches
/// the sqlite path's implicit ceiling; production deployments rarely
/// exceed dozens of subscribers.
///
/// NOTE(review): no consumer is visible in this part of the file —
/// presumably read by `dispatch_event_postgres`; confirm before changing.
#[cfg(feature = "sal")]
const SUBSCRIPTION_DISPATCH_LIMIT: usize = 1000;
59
60// --- /api/v1/notify (POST) + /api/v1/inbox (GET) ---------------------------
61
/// JSON request body accepted by the [`notify`] handler.
#[derive(Deserialize)]
pub struct NotifyBody {
    /// Agent whose inbox receives the notification.
    pub target_agent_id: String,
    /// Title of the notification.
    pub title: String,
    /// Accept either `payload` (MCP tool name) or `content` (S32 scenario).
    /// When both are present `payload` wins; when neither is present the
    /// handler answers 400.
    #[serde(default)]
    pub payload: Option<String>,
    /// Alternative spelling of `payload`; only consulted when `payload` is
    /// absent.
    #[serde(default)]
    pub content: Option<String>,
    /// Optional priority. On the postgres path a value that does not fit in
    /// `i32` is dropped (treated as unset).
    #[serde(default)]
    pub priority: Option<i64>,
    /// Optional tier name, passed through as a string on the sqlite path and
    /// parsed via `Tier::from_str` on the postgres path.
    #[serde(default)]
    pub tier: Option<String>,
    /// Optional sender id. This is NOT an identity source: the sender is
    /// resolved from the request headers, and a value supplied here must
    /// equal that resolved sender or the handler answers 403 (#901).
    #[serde(default)]
    pub agent_id: Option<String>,
}
79
80pub async fn notify(
81 State(app): State<AppState>,
82 headers: HeaderMap,
83 Json(body): Json<NotifyBody>,
84) -> impl IntoResponse {
85 let Some(payload) = body.payload.or(body.content) else {
86 return (
87 StatusCode::BAD_REQUEST,
88 Json(json!({"error": "payload or content is required"})),
89 )
90 .into_response();
91 };
92 // #901 (security-high, 2026-05-19) — sibling of #874. Authenticate
93 // via X-Agent-Id header ONLY; the body-supplied `agent_id` is
94 // caller-controlled and was the cross-tenant spoof vector. The
95 // body value is now a refinement that must MATCH the authenticated
96 // caller, else 403.
97 let sender = match resolve_caller_agent_id(None, &headers, None) {
98 Ok(id) => id,
99 Err(e) => {
100 return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
101 }
102 };
103 if let Some(claimed) = body.agent_id.as_deref()
104 && claimed != sender
105 {
106 return (
107 StatusCode::FORBIDDEN,
108 Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
109 )
110 .into_response();
111 }
112
113 // v0.7.0 fold-A2A1.1 (#700, F-A2A1.1) — postgres-backed daemons
114 // route through the SAL `notify` trait method AND fan the resulting
115 // inbox memory out to peers via the same quorum-write contract the
116 // sqlite branch already uses below. Federation fanout is now backend-
117 // blind: `broadcast_store_quorum` takes a `Memory` + `FederationConfig`
118 // and HTTP-POSTs to each peer's `sync_push` regardless of where the
119 // local row was persisted. Cross-namespace subscription dispatch
120 // is achieved by writing the subscription memory itself through the
121 // shared store (see `subscribe` below) so subscribers on every peer
122 // see the same `_subscriptions/<aid>` namespace.
123 #[cfg(feature = "sal")]
124 if matches!(app.storage_backend, StorageBackend::Postgres) {
125 let priority_i32 = body.priority.and_then(|p| i32::try_from(p).ok());
126 // Canonical wire deserializer for the HTTP `tier` field — the
127 // raw string literals here pair byte-for-byte with
128 // v0.7.0 F-C6 fix (issue #1432): route through the canonical
129 // `Tier::from_str` SSOT at `src/models/memory.rs:395`. The prior
130 // inline parser duplicated the match body; routing through the
131 // const SSOT means future Tier variants land in one place.
132 let resolved_tier = body.tier.as_deref().and_then(Tier::from_str);
133 let ctx = crate::store::CallerContext::for_agent(&sender);
134 let new_id = match app
135 .store
136 .notify(
137 &ctx,
138 &body.target_agent_id,
139 &body.title,
140 &payload,
141 priority_i32,
142 resolved_tier.as_ref(),
143 )
144 .await
145 {
146 Ok(id) => id,
147 Err(e) => return store_err_to_response(e),
148 };
149 // Re-fetch the just-written inbox memory so we can hand the full
150 // wire-shape (id + metadata + namespace + ts) to the peers via
151 // `broadcast_store_quorum`. The trait `notify()` returns only
152 // the id; the row materialised on disk is what peers need to
153 // mirror so the recipient's `GET /inbox` against any cluster
154 // member returns the same row.
155 let fanout_mem = match app.store.get(&ctx, &new_id).await {
156 Ok(m) => Some(m),
157 Err(e) => {
158 tracing::warn!(
159 "postgres notify: refetch for fanout failed for {new_id}: {e:?} \
160 (local commit landed; sync-daemon will catch peers up)"
161 );
162 None
163 }
164 };
165 if let Some(mem) = fanout_mem.as_ref()
166 && let Some(resp) = fanout_or_503(&app, mem).await
167 {
168 return resp;
169 }
170 return (
171 StatusCode::CREATED,
172 Json(json!({
173 "id": new_id,
174 (field_names::TARGET_AGENT_ID): body.target_agent_id,
175 "namespace": crate::inbox_namespace(&body.target_agent_id),
176 (field_names::STORAGE_BACKEND): "postgres",
177 })),
178 )
179 .into_response();
180 }
181
182 let mut params = json!({
183 (field_names::TARGET_AGENT_ID): body.target_agent_id,
184 "title": body.title,
185 "payload": payload,
186 });
187 if let Some(p) = body.priority {
188 params["priority"] = json!(p);
189 }
190 if let Some(t) = body.tier {
191 params["tier"] = json!(t);
192 }
193
194 let lock = app.db.lock().await;
195 let resolved_ttl = lock.2.clone();
196 // Route via the MCP handler so the wire contract stays single-sourced.
197 // `mcp_client = Some(&sender)` makes `resolve_agent_id(None, _)` return
198 // the caller-resolved HTTP id — same effective provenance.
199 let mcp_client = sender.clone();
200 let result = crate::mcp::handle_notify(&lock.0, ¶ms, &resolved_ttl, Some(&mcp_client));
201
202 // v0.6.2 (S32): capture the just-inserted notify row and fan it out to
203 // peers. Without this, alice's notify on node-1 lands in bob's inbox on
204 // node-1 only — when bob polls `/api/v1/inbox` against node-2 he sees
205 // nothing. The HTTP wrapper bypassed the `create_memory` fanout path
206 // that every other `db::insert` write uses, so we wire it here with the
207 // same posture as `fanout_or_503`: on quorum miss return 503; on a
208 // network error, swallow (local commit landed, sync-daemon catches up).
209 let fanout_mem = match &result {
210 Ok(v) => v
211 .get("id")
212 .and_then(|x| x.as_str())
213 .and_then(|id| db::get(&lock.0, id).ok().flatten()),
214 Err(_) => None,
215 };
216 drop(lock);
217
218 match result {
219 Ok(v) => {
220 if let Some(mem) = fanout_mem
221 && let Some(resp) = fanout_or_503(&app, &mem).await
222 {
223 return resp;
224 }
225 (StatusCode::CREATED, Json(v)).into_response()
226 }
227 // Issue #851: `mcp::handle_notify` returns Result<_, String> where
228 // the inner string can include raw rusqlite text from
229 // db::insert(...).map_err(|e| e.to_string()). Sanitize via the
230 // standard bad_request_opaque helper.
231 Err(e) => super::bad_request_opaque("notify handler error", &e),
232 }
233}
234// --- /api/v1/subscriptions (POST / DELETE / GET) ---------------------------
235//
236// Two shapes are supported. The webhook shape from the MCP tool
237// (`{url, events, secret, namespace_filter, agent_filter}`) is the primary
238// contract. Scenario S33 uses a lighter shape (`{agent_id, namespace}`) to
239// express "subscribe this agent to a namespace". We accept both: when a
240// namespace is supplied without a URL we synthesize an internal loopback URL
241// (`http://localhost/_ns/<agent_id>/<namespace>`) that passes SSRF validation
242// and sets `agent_filter`/`namespace_filter` accordingly. This lets S33 round-
243// trip without needing a separate subscriptions table.
244
/// JSON request body accepted by the [`subscribe`] handler. Covers both the
/// webhook shape (`url` + filters) and the lighter S33 shape (`namespace`
/// without a `url`).
#[derive(Deserialize)]
pub struct SubscribeBody {
    /// Webhook URL — required for the MCP contract, optional for the S33
    /// namespace-subscription shape. When absent, `namespace` must be
    /// supplied and a synthetic loopback URL is generated instead.
    #[serde(default)]
    pub url: Option<String>,
    /// Event selector; defaults to `"*"` when omitted.
    #[serde(default)]
    pub events: Option<String>,
    /// Per-subscription HMAC secret. A missing or empty value is only
    /// accepted when a server-wide hooks HMAC secret is configured.
    #[serde(default)]
    pub secret: Option<String>,
    /// Namespace filter for the webhook shape. Ignored for the S33 shape,
    /// where `namespace` is used as the filter instead.
    #[serde(default)]
    pub namespace_filter: Option<String>,
    /// Agent filter. For the S33 shape it defaults to the authenticated
    /// caller when omitted.
    #[serde(default)]
    pub agent_filter: Option<String>,
    /// S33 shape: caller-supplied namespace to track. Only consulted when
    /// `url` is absent.
    #[serde(default)]
    pub namespace: Option<String>,
    /// Optional subscriber id. This is NOT an identity source: the caller is
    /// resolved from the request headers, and a value supplied here must
    /// equal that resolved caller or the handler answers 403 (#901).
    #[serde(default)]
    pub agent_id: Option<String>,
}
266
267pub async fn subscribe(
268 State(app): State<AppState>,
269 headers: HeaderMap,
270 Json(body): Json<SubscribeBody>,
271) -> impl IntoResponse {
272 // #901 (security-high, 2026-05-19) — sibling of #874. The pre-#901
273 // path trusted body.agent_id as identity, allowing webhook-hijack
274 // by an attacker registering hooks under another agent's name.
275 // Header-only auth now; body.agent_id (if present) must match the
276 // authenticated caller.
277 let caller = match resolve_caller_agent_id(None, &headers, None) {
278 Ok(id) => id,
279 Err(e) => {
280 return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
281 }
282 };
283 if let Some(claimed) = body.agent_id.as_deref()
284 && claimed != caller
285 {
286 return (
287 StatusCode::FORBIDDEN,
288 Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
289 )
290 .into_response();
291 }
292
293 // R3-S1.HMAC (v0.7.0 fix campaign 2026-05-13): refuse to register a
294 // subscription when neither a per-subscription `secret` nor a
295 // server-wide `[hooks.subscription] hmac_secret` is configured.
296 // Previously the dispatch loop silently delivered unsigned bodies
297 // when no key was available (subscriptions.rs:600-606), which
298 // overstates the "HMAC non-optional" guarantee documented for
299 // Bucket-3 receivers. This is a deliberate behaviour break:
300 // operators upgrading from <=v0.6 must either supply a per-sub
301 // secret or configure the process-wide override before
302 // subscribing.
303 if body.secret.as_deref().is_none_or(str::is_empty)
304 && crate::config::active_hooks_hmac_secret().is_none()
305 {
306 return (
307 StatusCode::BAD_REQUEST,
308 Json(json!({
309 "error": "HMAC secret required: configure per-subscription `hmac_secret` or server-wide `[security] hmac_secret`",
310 "hint": "Pass `secret: <value>` in the subscribe request body, OR set [hooks.subscription] hmac_secret in the daemon config. \
311 Unsigned subscription dispatch was disabled in v0.7.0 (fix campaign R3-S1.HMAC, 2026-05-13)."
312 })),
313 )
314 .into_response();
315 }
316
317 // Rewrite S33's `{agent_id, namespace}` body into the webhook shape.
318 let mut url_was_synthesized = false;
319 // Suppress dead-code lint when sal feature is off (the variable is
320 // only consulted inside the postgres-dispatch branch below).
321 let _ = &url_was_synthesized;
322 let (url, namespace_filter, agent_filter) = if let Some(u) = body.url {
323 (u, body.namespace_filter, body.agent_filter)
324 } else {
325 let Some(ns) = body.namespace.clone() else {
326 return (
327 StatusCode::BAD_REQUEST,
328 Json(json!({"error": "url or namespace is required"})),
329 )
330 .into_response();
331 };
332 // Synthetic loopback URL — never dispatched (the postgres
333 // persistence path doesn't run the webhook loop), serves only
334 // to round-trip the (agent_id, namespace) pair through the
335 // wire shape. We mark it so the SSRF guard can skip the
336 // loopback rejection — H11's allow_loopback_webhooks knob
337 // gates real callers, not internally-synthesized stubs.
338 // The assignment is unused under default features (the reader
339 // is `#[cfg(feature = "sal")]`-gated); allow the unused-assignment
340 // warning specifically.
341 #[allow(unused_assignments)]
342 {
343 url_was_synthesized = true;
344 }
345 let synthetic = format!("http://localhost/_ns/{caller}/{ns}");
346 (
347 synthetic,
348 Some(ns),
349 body.agent_filter.or_else(|| Some(caller.clone())),
350 )
351 };
352
353 let events = body.events.unwrap_or_else(|| "*".to_string());
354
355 // v0.7.0 fold-A2A1.1 (#700, F-A2A1.1) — postgres-backed daemons
356 // persist subscriptions as memories under `_subscriptions/<agent_id>`
357 // AND fan the subscription memory out to peers via the same quorum
358 // contract the sqlite branch uses for `_agents` rows. This is what
359 // makes K7-style cross-namespace event-type registration work on
360 // postgres: a subscriber attached on peer-A becomes immediately
361 // visible on peer-B's `_subscriptions/<aid>` namespace via the
362 // sync_push receiver, so an event dispatched on peer-B matches the
363 // subscription registered on peer-A. Historical replay via
364 // `memory_subscription_replay` then operates on the unified store
365 // — the dispatcher reads the same memory row regardless of which
366 // peer originated the subscription.
367 #[cfg(feature = "sal")]
368 if matches!(app.storage_backend, StorageBackend::Postgres) {
369 // Skip SSRF validation for synthetic loopback stubs — they are
370 // never dispatched on the postgres path. Real caller-supplied
371 // URLs still go through the H11 SSRF guard.
372 if !url_was_synthesized && let Err(e) = crate::subscriptions::validate_url(&url) {
373 return (
374 StatusCode::BAD_REQUEST,
375 Json(json!({"error": e.to_string()})),
376 )
377 .into_response();
378 }
379 let sub_id = uuid::Uuid::new_v4().to_string();
380 let now = Utc::now().to_rfc3339();
381 let ns = caller_subscription_ns(&caller);
382 // #932 (v0.7.0 Track D, 2026-05-20) — persist the SHA-256
383 // hash of the per-subscription secret in the metadata blob
384 // so `dispatch_event_postgres` can resolve it back without
385 // an out-of-band sqlite lookup. The plaintext secret is
386 // NEVER persisted (#-301 contract); only the SHA-256 hash
387 // lands. When the operator skipped `secret` and relies on
388 // the server-wide `[hooks.subscription] hmac_secret`, this
389 // field is omitted and the dispatcher falls back to the
390 // server-wide key per the K7 contract.
391 let secret_hash_for_metadata: Option<String> = body
392 .secret
393 .as_deref()
394 .filter(|s| !s.is_empty())
395 .map(crate::subscriptions::sha256_hex);
396 let metadata = json!({
397 "kind": KIND_SUBSCRIPTION,
398 "agent_id": caller,
399 (field_names::SUBSCRIPTION_ID): sub_id,
400 "url": url,
401 "events": events,
402 (field_names::NAMESPACE_FILTER): namespace_filter,
403 (field_names::AGENT_FILTER): agent_filter,
404 "secret_hash": secret_hash_for_metadata,
405 (field_names::CREATED_BY): caller,
406 (field_names::CREATED_AT): now,
407 });
408 let mem = Memory {
409 id: sub_id.clone(),
410 tier: Tier::Long,
411 namespace: ns,
412 title: format!("subscription:{sub_id}"),
413 content: format!(
414 "subscription for {caller} -> {} (events={events})",
415 namespace_filter.as_deref().unwrap_or("*")
416 ),
417 tags: vec![KIND_SUBSCRIPTION.to_string()],
418 priority: 5,
419 confidence: 1.0,
420 source: "subscribe".to_string(),
421 access_count: 0,
422 created_at: now.clone(),
423 updated_at: now,
424 last_accessed_at: None,
425 expires_at: None,
426 metadata,
427 reflection_depth: 0,
428 memory_kind: crate::models::MemoryKind::Observation,
429 entity_id: None,
430 persona_version: None,
431 citations: Vec::new(),
432 source_uri: None,
433 source_span: None,
434 confidence_source: ConfidenceSource::CallerProvided,
435 confidence_signals: None,
436 confidence_decayed_at: None,
437 version: 1,
438 };
439 let ctx = crate::store::CallerContext::for_agent(&caller);
440 let stored_id = match app.store.store(&ctx, &mem).await {
441 Ok(id) => id,
442 Err(e) => return store_err_to_response(e),
443 };
444 // Fan the freshly-persisted subscription memory out to peers
445 // using the same quorum-write contract as `_agents` /
446 // `_inbox` rows. On quorum miss return 503; on a network
447 // error, swallow (local commit landed). Mirrors the sqlite
448 // branch's `fanout_or_503` call below.
449 if let Some(resp) = fanout_or_503(&app, &mem).await {
450 return resp;
451 }
452 return (
453 StatusCode::CREATED,
454 Json(json!({
455 "id": stored_id,
456 "url": url,
457 "events": events,
458 "namespace": namespace_filter,
459 (field_names::NAMESPACE_FILTER): namespace_filter,
460 (field_names::AGENT_FILTER): agent_filter,
461 "agent_id": caller,
462 (field_names::CREATED_BY): caller,
463 (field_names::STORAGE_BACKEND): "postgres",
464 })),
465 )
466 .into_response();
467 }
468
469 // Ensure the caller is a registered agent (the MCP tool enforces this).
470 // Auto-register for the S33 shape so scenario callers don't have to
471 // pre-call /agents themselves — same auto-create pattern used elsewhere
472 // for the HTTP surface.
473 let lock = app.db.lock().await;
474 let already = db::list_agents(&lock.0)
475 .ok()
476 .is_some_and(|a| a.iter().any(|x| x.agent_id == caller));
477 if !already {
478 let _ = db::register_agent(&lock.0, &caller, "ai:generic", &[]);
479 }
480 // Inline subscribe path — we cannot delegate to `mcp::handle_subscribe`
481 // here because that helper re-resolves the caller via
482 // `resolve_agent_id(None, Some(mcp_client))`, which synthesizes a
483 // `ai:<client>@<host>:pid-N` id rather than using the HTTP-resolved
484 // `caller` verbatim. An HTTP caller registered under "ai:bob" must be
485 // able to subscribe as "ai:bob", not as "ai:ai:bob@host:pid-N".
486 let sub_result: Result<serde_json::Value, String> = (|| {
487 crate::subscriptions::validate_url(&url).map_err(|e| e.to_string())?;
488 let id = crate::subscriptions::insert(
489 &lock.0,
490 &crate::subscriptions::NewSubscription {
491 url: &url,
492 events: &events,
493 secret: body.secret.as_deref(),
494 namespace_filter: namespace_filter.as_deref(),
495 agent_filter: agent_filter.as_deref(),
496 created_by: Some(&caller),
497 event_types: None,
498 },
499 )
500 .map_err(|e| e.to_string())?;
501 Ok(json!({
502 "id": id,
503 "url": url,
504 "events": events,
505 (field_names::NAMESPACE_FILTER): namespace_filter,
506 (field_names::AGENT_FILTER): agent_filter,
507 (field_names::CREATED_BY): caller,
508 }))
509 })();
510 // Federate the `_agents` write we may have just done so registration is
511 // cluster-wide. (Best-effort — subscriptions themselves live in a
512 // separate table that does not ride `sync_push` today.)
513 let registered_mem = if already {
514 None
515 } else {
516 db::list(
517 &lock.0,
518 Some(crate::models::AGENTS_NAMESPACE),
519 None,
520 crate::storage::LIST_MAX_LIMIT,
521 0,
522 None,
523 None,
524 None,
525 None,
526 None,
527 )
528 .ok()
529 .and_then(|rows| {
530 rows.into_iter()
531 .find(|m| m.title == crate::models::agent_registration_title(&caller))
532 })
533 };
534 drop(lock);
535
536 if let Some(ref mem) = registered_mem
537 && let Some(resp) = fanout_or_503(&app, mem).await
538 {
539 return resp;
540 }
541
542 match sub_result {
543 Ok(mut v) => {
544 // Echo the caller's view of the subscription so S33 can find
545 // {namespace, agent_id} keys in the response without relying on
546 // the synthetic URL.
547 if let Some(obj) = v.as_object_mut() {
548 if let Some(ref ns) = namespace_filter {
549 obj.insert("namespace".into(), json!(ns));
550 }
551 obj.insert("agent_id".into(), json!(caller));
552 }
553 (StatusCode::CREATED, Json(v)).into_response()
554 }
555 Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
556 }
557}
558
/// Query-string parameters accepted by the [`unsubscribe`] handler. Either
/// `id` or `namespace` must be supplied; `id` takes precedence.
#[derive(Deserialize)]
pub struct UnsubscribeQuery {
    /// Explicit subscription id to remove.
    #[serde(default)]
    pub id: Option<String>,
    /// S33 shape: (`agent_id`, namespace) lookup. This is a filter, NOT an
    /// identity source — it must equal the header-resolved caller or the
    /// handler answers 403 (#874).
    #[serde(default)]
    pub agent_id: Option<String>,
    /// S33 shape: remove the caller's first subscription whose namespace
    /// filter equals this value. Only consulted when `id` is absent.
    #[serde(default)]
    pub namespace: Option<String>,
}
569
570pub async fn unsubscribe(
571 State(app): State<AppState>,
572 headers: HeaderMap,
573 Query(q): Query<UnsubscribeQuery>,
574) -> impl IntoResponse {
575 // v0.7.0 Wave-3 Continuation 5 (Bucket B / S33) — postgres-backed
576 // daemons resolve subscriptions through the SAL `_subscriptions/
577 // <agent_id>` namespace mirror that `subscribe` / `list_subscriptions`
578 // write into. Both lookup-by-id and lookup-by-(agent_id, namespace)
579 // resolve through the same memory-row index. Without this branch
580 // the handler reaches into the scratch sqlite db which contains no
581 // subscription rows on a postgres-backed daemon.
582 //
583 // #874 (security-medium, 2026-05-18) — DO NOT pass `q.agent_id` to
584 // `resolve_caller_agent_id` as a trusted-input source. The query
585 // parameter is caller-supplied and bypassable; authentication must
586 // come from the request header (X-Agent-Id) only. The query
587 // `agent_id` then degrades to a filter that must match the
588 // authenticated caller (mismatch = 403).
589 #[cfg(feature = "sal")]
590 if matches!(app.storage_backend, StorageBackend::Postgres) {
591 let caller = match resolve_caller_agent_id(None, &headers, None) {
592 Ok(id) => id,
593 Err(e) => {
594 return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
595 }
596 };
597 if let Some(claimed) = q.agent_id.as_deref()
598 && claimed != caller
599 {
600 return (
601 StatusCode::FORBIDDEN,
602 Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
603 )
604 .into_response();
605 }
606 let ctx = crate::store::CallerContext::for_agent(&caller);
607
608 // Lookup the subscription memory-id via the persistent index.
609 let target_id: Option<String> = if let Some(id) = q.id.clone() {
610 Some(id)
611 } else {
612 let Some(ns) = q.namespace.clone() else {
613 return (
614 StatusCode::BAD_REQUEST,
615 Json(json!({"error": "id or (agent_id, namespace) required"})),
616 )
617 .into_response();
618 };
619 let sub_ns = caller_subscription_ns(&caller);
620 let filter = crate::store::Filter {
621 namespace: Some(sub_ns),
622 limit: crate::storage::LIST_MAX_LIMIT,
623 ..Default::default()
624 };
625 match app.store.list(&ctx, &filter).await {
626 Ok(rows) => rows
627 .into_iter()
628 .find(|m| {
629 m.metadata
630 .get(field_names::NAMESPACE_FILTER)
631 .and_then(|v| v.as_str())
632 == Some(ns.as_str())
633 })
634 .map(|m| {
635 m.metadata
636 .get(field_names::SUBSCRIPTION_ID)
637 .and_then(|v| v.as_str())
638 .map(str::to_string)
639 .unwrap_or(m.id)
640 }),
641 Err(e) => return store_err_to_response(e),
642 }
643 };
644 return match target_id {
645 Some(id) => match app.store.delete(&ctx, &id).await {
646 Ok(()) => (
647 StatusCode::OK,
648 Json(json!({"id": id, "removed": true, (field_names::STORAGE_BACKEND): "postgres"})),
649 )
650 .into_response(),
651 Err(crate::store::StoreError::NotFound { .. }) => (
652 StatusCode::OK,
653 Json(json!({"id": id, "removed": false, (field_names::STORAGE_BACKEND): "postgres"})),
654 )
655 .into_response(),
656 Err(e) => store_err_to_response(e),
657 },
658 None => (
659 StatusCode::OK,
660 Json(json!({
661 "id": "",
662 "removed": false,
663 (field_names::STORAGE_BACKEND): "postgres",
664 })),
665 )
666 .into_response(),
667 };
668 }
669
670 // #870 / #874 (security-high/medium, 2026-05-18) — authenticate
671 // the caller via header (or body) BEFORE touching the table; never
672 // trust `q.agent_id` as identity. Then scope every DELETE to the
673 // resolved caller so tenant A cannot remove tenant B's hooks.
674 let caller = match resolve_caller_agent_id(None, &headers, None) {
675 Ok(id) => id,
676 Err(e) => {
677 return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
678 }
679 };
680 if let Some(claimed) = q.agent_id.as_deref()
681 && claimed != caller
682 {
683 return (
684 StatusCode::FORBIDDEN,
685 Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
686 )
687 .into_response();
688 }
689
690 // Prefer explicit id. If absent, dispatch by (agent_id, namespace) for
691 // S33 — find the first matching row from list() (already owner-scoped)
692 // and delete it.
693 if let Some(id) = q.id.clone() {
694 let lock = app.db.lock().await;
695 let outcome = crate::subscriptions::delete(&lock.0, &id, Some(&caller));
696 drop(lock);
697 return match outcome {
698 Ok(removed) => {
699 (StatusCode::OK, Json(json!({"id": id, "removed": removed}))).into_response()
700 }
701 Err(e) => {
702 tracing::error!("{}", crate::errors::msg::unsubscribe(&e));
703 (
704 StatusCode::INTERNAL_SERVER_ERROR,
705 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
706 )
707 .into_response()
708 }
709 };
710 }
711
712 let Some(ns) = q.namespace else {
713 return (
714 StatusCode::BAD_REQUEST,
715 Json(json!({"error": "id or (agent_id, namespace) required"})),
716 )
717 .into_response();
718 };
719
720 let lock = app.db.lock().await;
721 // Owner-scoped list — the find() below is now redundant on the
722 // authorization side but still narrows by namespace_filter.
723 //
724 // #869 audit (Category B — safe default): a db substrate failure
725 // on the list query collapses to an empty `Vec`, so the
726 // subsequent `target` lookup is `None` and the handler returns
727 // 404 instead of leaking the substrate error — same posture the
728 // sanitised 4xx path uses elsewhere in this module.
729 let subs = crate::subscriptions::list(&lock.0, Some(&caller)).unwrap_or_default();
730 let target = subs
731 .into_iter()
732 .find(|s| s.namespace_filter.as_deref() == Some(ns.as_str()));
733 let outcome = match target {
734 Some(s) => crate::subscriptions::delete(&lock.0, &s.id, Some(&caller)).map(|r| (s.id, r)),
735 None => Ok((String::new(), false)),
736 };
737 drop(lock);
738 match outcome {
739 Ok((id, removed)) => {
740 (StatusCode::OK, Json(json!({"id": id, "removed": removed}))).into_response()
741 }
742 Err(e) => {
743 tracing::error!("{}", crate::errors::msg::unsubscribe(&e));
744 (
745 StatusCode::INTERNAL_SERVER_ERROR,
746 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
747 )
748 .into_response()
749 }
750 }
751}
752
/// Query-string parameters accepted by the [`list_subscriptions`] handler.
#[derive(Deserialize)]
pub struct ListSubscriptionsQuery {
    /// Optional refinement only — NOT an identity source. The caller is
    /// resolved from the request headers, and a value supplied here must
    /// equal that resolved caller or the handler answers 403 (#872 / #874).
    #[serde(default)]
    pub agent_id: Option<String>,
}
758
759pub async fn list_subscriptions(
760 State(app): State<AppState>,
761 headers: HeaderMap,
762 Query(q): Query<ListSubscriptionsQuery>,
763) -> impl IntoResponse {
764 // #872 / #874 (security-high/medium, 2026-05-18) — authenticate
765 // the caller via X-Agent-Id header (NOT the `?agent_id=` query
766 // string, which is trivially spoofable and was the bypass surface
767 // in #874). The query parameter is degraded to a refinement that
768 // must match the authenticated caller, else 403.
769 let caller = match resolve_caller_agent_id(None, &headers, None) {
770 Ok(id) => id,
771 Err(e) => {
772 return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
773 }
774 };
775 if let Some(claimed) = q.agent_id.as_deref()
776 && claimed != caller
777 {
778 return (
779 StatusCode::FORBIDDEN,
780 Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
781 )
782 .into_response();
783 }
784
785 // v0.7.0 Wave-3 Continuation 4 (Bucket B / S33) — postgres-backed
786 // daemons read subscriptions back from the `_subscriptions/
787 // <agent_id>` namespace via the SAL `list` projection. The
788 // dispatch loop itself is still sqlite-bound; the wire envelope
789 // here lets the cert oracle observe that the subscription
790 // round-trips through the persistent store.
791 //
792 // #872 — always scope to the authenticated caller's namespace; the
793 // pre-fix code walked every namespace under `_subscriptions/` when
794 // no `agent_id` query param was supplied, leaking every tenant's
795 // hooks.
796 #[cfg(feature = "sal-postgres")]
797 if matches!(app.storage_backend, StorageBackend::Postgres) {
798 let ctx = crate::store::CallerContext::for_agent(&caller);
799 let namespaces: Vec<String> = vec![caller_subscription_ns(&caller)];
800 let mut rows: Vec<serde_json::Value> = Vec::new();
801 for ns in namespaces {
802 let filter = crate::store::Filter {
803 namespace: Some(ns),
804 limit: crate::storage::LIST_MAX_LIMIT,
805 ..Default::default()
806 };
807 match app.store.list(&ctx, &filter).await {
808 Ok(memories) => {
809 for m in memories {
810 let meta = m.metadata;
811 if meta.get("kind").and_then(|v| v.as_str()) != Some(KIND_SUBSCRIPTION) {
812 continue;
813 }
814 let sub_id = meta
815 .get(field_names::SUBSCRIPTION_ID)
816 .cloned()
817 .unwrap_or_else(|| serde_json::Value::String(m.id.clone()));
818 rows.push(json!({
819 "id": sub_id,
820 "url": meta.get("url").cloned().unwrap_or(serde_json::Value::Null),
821 "events": meta.get("events").cloned().unwrap_or(serde_json::Value::Null),
822 "namespace": meta.get(field_names::NAMESPACE_FILTER).cloned().unwrap_or(serde_json::Value::Null),
823 (field_names::NAMESPACE_FILTER): meta.get(field_names::NAMESPACE_FILTER).cloned().unwrap_or(serde_json::Value::Null),
824 (field_names::AGENT_FILTER): meta.get(field_names::AGENT_FILTER).cloned().unwrap_or(serde_json::Value::Null),
825 "agent_id": meta.get("agent_id").cloned().unwrap_or(serde_json::Value::Null),
826 (field_names::CREATED_BY): meta.get(field_names::CREATED_BY).cloned().unwrap_or(serde_json::Value::Null),
827 (field_names::CREATED_AT): meta.get(field_names::CREATED_AT).cloned().unwrap_or(serde_json::Value::Null),
828 "dispatch_count": 0,
829 "failure_count": 0,
830 }));
831 }
832 }
833 Err(e) => return store_err_to_response(e),
834 }
835 }
836 let count = rows.len();
837 return (
838 StatusCode::OK,
839 Json(json!({
840 "count": count,
841 (field_names::SUBSCRIPTIONS): rows,
842 (field_names::STORAGE_BACKEND): "postgres",
843 })),
844 )
845 .into_response();
846 }
847 let state = app.db.clone();
848 let lock = state.lock().await;
849 // #872 — DB-side ownership scope: only the caller's rows.
850 let subs = match crate::subscriptions::list(&lock.0, Some(&caller)) {
851 Ok(s) => s,
852 Err(e) => {
853 tracing::error!("list_subscriptions: {e}");
854 return (
855 StatusCode::INTERNAL_SERVER_ERROR,
856 Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
857 )
858 .into_response();
859 }
860 };
861 drop(lock);
862 let filtered = subs;
863 // Expose the subscribed namespace as a top-level field per row so S33 can
864 // read `namespace` directly without probing `namespace_filter`.
865 let rows: Vec<serde_json::Value> = filtered
866 .iter()
867 .map(|s| {
868 json!({
869 "id": s.id,
870 "url": s.url,
871 "events": s.events,
872 "namespace": s.namespace_filter,
873 (field_names::NAMESPACE_FILTER): s.namespace_filter,
874 (field_names::AGENT_FILTER): s.agent_filter,
875 "agent_id": s.agent_filter.clone().or(s.created_by.clone()),
876 (field_names::CREATED_BY): s.created_by,
877 (field_names::CREATED_AT): s.created_at,
878 "dispatch_count": s.dispatch_count,
879 "failure_count": s.failure_count,
880 })
881 })
882 .collect();
883 let count = rows.len();
884 (
885 StatusCode::OK,
886 Json(json!({"count": count, (field_names::SUBSCRIPTIONS): rows})),
887 )
888 .into_response()
889}
890
/// #932 (v0.7.0 Track D, 2026-05-20) — webhook dispatch for
/// postgres-backed daemons.
///
/// On sqlite, `subscriptions::dispatch_event_with_details` reads the
/// `subscriptions` table through the shared `Mutex<Connection>`. That
/// table is EMPTY on postgres-backed daemons: there, subscriptions are
/// stored as memory rows under `_subscriptions/<agent_id>` via the SAL
/// store. Before #932 the postgres `create_memory_postgres` path called
/// no dispatch helper at all, so subscribe + store on postgres fired
/// zero webhooks and the v0.7.0 "HMAC non-optional" contract was only
/// vacuously met.
///
/// Steps performed here:
///
/// 1. List the `_subscriptions/` mirror rows with an admin
///    [`crate::store::CallerContext`], so subscribers registered by any
///    tenant are visible (the sqlite dispatch is likewise unscoped — it
///    passes `None` as the caller scope).
/// 2. Rebuild a `Subscription` from each row's metadata, keeping the
///    row's `secret_hash` alongside it.
/// 3. Keep only rows accepted by `subscriptions::matches_filters`, the
///    same predicate the sqlite path uses.
/// 4. Hand the survivors to `subscriptions::dispatch_event_to_subs`.
///    Audit rows (`record_subscription_event` / `record_dispatch` / DLQ)
///    still go to sqlite via the path held in `app.db`, because
///    postgres-backed daemons keep a sqlite scratch DB next to the SAL
///    store handle.
///
/// Fire-and-forget: nothing is returned to the caller. A failed listing
/// is logged at warn and ends the tick; a zero-match tick is logged at
/// debug.
#[cfg(feature = "sal")]
pub async fn dispatch_event_postgres(
    app: &AppState,
    event: &str,
    memory_id: &str,
    namespace: &str,
    agent_id: Option<&str>,
    details: Option<serde_json::Value>,
) {
    // Dispatch must see the whole subscriber population, not only the
    // caller's rows. `for_admin` bypasses the scope=private filter so a
    // collective-scope event can reach every matching subscriber no
    // matter which tenant registered it. Cross-tenant authorization is
    // enforced at the wire surface (subscribe/list/unsubscribe).
    let ctx =
        crate::store::CallerContext::for_admin(crate::identity::sentinels::SUBSCRIPTION_DISPATCH);

    // Sargable namespace-prefix scan over `_subscriptions/<agent>`.
    // `Filter::namespace` is exact-match, so dispatch used to list with
    // `namespace=None` and filter in Rust — a full-table seq-scan on
    // EVERY write. The prefix scan lets the planner range-scan the
    // `namespace` index: O(subscribers) instead of O(corpus).
    let listing = app
        .store
        .list_by_namespace_prefix(&ctx, SUBSCRIPTION_NS_PREFIX, SUBSCRIPTION_DISPATCH_LIMIT)
        .await;
    let rows = match listing {
        Err(e) => {
            tracing::warn!(
                "dispatch_event_postgres: SAL prefix-list failed: {e} — \
                 no subscribers will fire this tick"
            );
            return;
        }
        Ok(found) => found,
    };

    let matching: Vec<(crate::subscriptions::Subscription, Option<String>)> = rows
        .into_iter()
        .filter_map(|row| {
            // Re-check the prefix on the returned row before trusting it.
            if !row.namespace.starts_with(SUBSCRIPTION_NS_PREFIX) {
                return None;
            }

            let meta = &row.metadata;
            // String-valued metadata lookup; absent or non-string => None.
            let text = |key: &str| meta.get(key).and_then(|v| v.as_str());

            // Only rows tagged as subscriptions take part in dispatch.
            if text("kind") != Some(KIND_SUBSCRIPTION) {
                return None;
            }
            // A row without a string `url` is malformed — skip it.
            let url = text("url")?.to_string();

            // Fall back to the memory id when no explicit subscription
            // id is recorded in the metadata.
            let id = match text(field_names::SUBSCRIPTION_ID) {
                Some(explicit) => explicit.to_string(),
                None => row.id.clone(),
            };
            // A missing `events` entry defaults to the wildcard "*".
            let events = text("events").unwrap_or("*").to_string();
            let namespace_filter = text(field_names::NAMESPACE_FILTER).map(str::to_string);
            let agent_filter = text(field_names::AGENT_FILTER).map(str::to_string);
            let created_by = text(field_names::CREATED_BY).map(str::to_string);
            let created_at = text(field_names::CREATED_AT).unwrap_or("").to_string();
            let secret_hash = text("secret_hash").map(str::to_string);

            // Canonical filter (same predicate as the sqlite path) so
            // the dispatch surface matches across adapters.
            let wanted = crate::subscriptions::matches_filters(
                &events,
                None,
                namespace_filter.as_deref(),
                agent_filter.as_deref(),
                event,
                namespace,
                agent_id,
            );
            if !wanted {
                return None;
            }

            Some((
                crate::subscriptions::Subscription {
                    id,
                    url,
                    events,
                    namespace_filter,
                    agent_filter,
                    created_by,
                    created_at,
                    // Counters are not read from the mirror row here;
                    // the rebuilt value always starts at zero.
                    dispatch_count: 0,
                    failure_count: 0,
                    event_types: None,
                },
                secret_hash,
            ))
        })
        .collect();

    let n_matched = matching.len();
    if n_matched == 0 {
        tracing::debug!(
            "dispatch_event_postgres: event={event} ns={namespace} \
             matched zero subscribers (post-#932 dispatch path)"
        );
        return;
    }
    tracing::debug!(
        "dispatch_event_postgres: event={event} ns={namespace} \
         dispatching to {n_matched} subscriber(s) via SAL"
    );

    // Audit sqlite path from the shared db state. Postgres daemons keep
    // a sqlite scratch DB for federation/governance state — audit rows,
    // DLQ and dispatch counters still land there. The guard is a
    // temporary, released at the end of this statement.
    let db_path = app.db.lock().await.1.clone();

    crate::subscriptions::dispatch_event_to_subs(
        matching, event, memory_id, namespace, agent_id, &db_path, details,
    );
}