#![allow(clippy::too_many_lines)]
use crate::models::field_names;
use axum::{
Json,
extract::{Query, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde::Deserialize;
use serde_json::json;
use crate::db;
#[cfg(feature = "sal")]
use crate::models::{ConfidenceSource, Memory, Tier};
#[cfg(feature = "sal")]
use chrono::Utc;
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;
use super::{fanout_or_503, resolve_caller_agent_id};
#[cfg(feature = "sal")]
const SUBSCRIPTION_NS_PREFIX: &str = "_subscriptions/";
#[cfg(feature = "sal")]
const KIND_SUBSCRIPTION: &str = "subscription";
#[cfg(feature = "sal")]
fn caller_subscription_ns(caller: impl std::fmt::Display) -> String {
format!("_subscriptions/{caller}")
}
#[cfg(feature = "sal")]
const SUBSCRIPTION_DISPATCH_LIMIT: usize = 1000;
#[derive(Deserialize)]
pub struct NotifyBody {
pub target_agent_id: String,
pub title: String,
#[serde(default)]
pub payload: Option<String>,
#[serde(default)]
pub content: Option<String>,
#[serde(default)]
pub priority: Option<i64>,
#[serde(default)]
pub tier: Option<String>,
#[serde(default)]
pub agent_id: Option<String>,
}
pub async fn notify(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<NotifyBody>,
) -> impl IntoResponse {
let Some(payload) = body.payload.or(body.content) else {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "payload or content is required"})),
)
.into_response();
};
let sender = match resolve_caller_agent_id(None, &headers, None) {
Ok(id) => id,
Err(e) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
}
};
if let Some(claimed) = body.agent_id.as_deref()
&& claimed != sender
{
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
)
.into_response();
}
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let priority_i32 = body.priority.and_then(|p| i32::try_from(p).ok());
let resolved_tier = body.tier.as_deref().and_then(Tier::from_str);
let ctx = crate::store::CallerContext::for_agent(&sender);
let new_id = match app
.store
.notify(
&ctx,
&body.target_agent_id,
&body.title,
&payload,
priority_i32,
resolved_tier.as_ref(),
)
.await
{
Ok(id) => id,
Err(e) => return store_err_to_response(e),
};
let fanout_mem = match app.store.get(&ctx, &new_id).await {
Ok(m) => Some(m),
Err(e) => {
tracing::warn!(
"postgres notify: refetch for fanout failed for {new_id}: {e:?} \
(local commit landed; sync-daemon will catch peers up)"
);
None
}
};
if let Some(mem) = fanout_mem.as_ref()
&& let Some(resp) = fanout_or_503(&app, mem).await
{
return resp;
}
return (
StatusCode::CREATED,
Json(json!({
"id": new_id,
(field_names::TARGET_AGENT_ID): body.target_agent_id,
"namespace": crate::inbox_namespace(&body.target_agent_id),
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
let mut params = json!({
(field_names::TARGET_AGENT_ID): body.target_agent_id,
"title": body.title,
"payload": payload,
});
if let Some(p) = body.priority {
params["priority"] = json!(p);
}
if let Some(t) = body.tier {
params["tier"] = json!(t);
}
let lock = app.db.lock().await;
let resolved_ttl = lock.2.clone();
let mcp_client = sender.clone();
let result = crate::mcp::handle_notify(&lock.0, ¶ms, &resolved_ttl, Some(&mcp_client));
let fanout_mem = match &result {
Ok(v) => v
.get("id")
.and_then(|x| x.as_str())
.and_then(|id| db::get(&lock.0, id).ok().flatten()),
Err(_) => None,
};
drop(lock);
match result {
Ok(v) => {
if let Some(mem) = fanout_mem
&& let Some(resp) = fanout_or_503(&app, &mem).await
{
return resp;
}
(StatusCode::CREATED, Json(v)).into_response()
}
Err(e) => super::bad_request_opaque("notify handler error", &e),
}
}
#[derive(Deserialize)]
pub struct SubscribeBody {
#[serde(default)]
pub url: Option<String>,
#[serde(default)]
pub events: Option<String>,
#[serde(default)]
pub secret: Option<String>,
#[serde(default)]
pub namespace_filter: Option<String>,
#[serde(default)]
pub agent_filter: Option<String>,
#[serde(default)]
pub namespace: Option<String>,
#[serde(default)]
pub agent_id: Option<String>,
}
pub async fn subscribe(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<SubscribeBody>,
) -> impl IntoResponse {
let caller = match resolve_caller_agent_id(None, &headers, None) {
Ok(id) => id,
Err(e) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
}
};
if let Some(claimed) = body.agent_id.as_deref()
&& claimed != caller
{
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
)
.into_response();
}
if body.secret.as_deref().is_none_or(str::is_empty)
&& crate::config::active_hooks_hmac_secret().is_none()
{
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": "HMAC secret required: configure per-subscription `hmac_secret` or server-wide `[security] hmac_secret`",
"hint": "Pass `secret: <value>` in the subscribe request body, OR set [hooks.subscription] hmac_secret in the daemon config. \
Unsigned subscription dispatch was disabled in v0.7.0 (fix campaign R3-S1.HMAC, 2026-05-13)."
})),
)
.into_response();
}
let mut url_was_synthesized = false;
let _ = &url_was_synthesized;
let (url, namespace_filter, agent_filter) = if let Some(u) = body.url {
(u, body.namespace_filter, body.agent_filter)
} else {
let Some(ns) = body.namespace.clone() else {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "url or namespace is required"})),
)
.into_response();
};
#[allow(unused_assignments)]
{
url_was_synthesized = true;
}
let synthetic = format!("http://localhost/_ns/{caller}/{ns}");
(
synthetic,
Some(ns),
body.agent_filter.or_else(|| Some(caller.clone())),
)
};
let events = body.events.unwrap_or_else(|| "*".to_string());
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
if !url_was_synthesized && let Err(e) = crate::subscriptions::validate_url(&url) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let sub_id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let ns = caller_subscription_ns(&caller);
let secret_hash_for_metadata: Option<String> = body
.secret
.as_deref()
.filter(|s| !s.is_empty())
.map(crate::subscriptions::sha256_hex);
let metadata = json!({
"kind": KIND_SUBSCRIPTION,
"agent_id": caller,
(field_names::SUBSCRIPTION_ID): sub_id,
"url": url,
"events": events,
(field_names::NAMESPACE_FILTER): namespace_filter,
(field_names::AGENT_FILTER): agent_filter,
"secret_hash": secret_hash_for_metadata,
(field_names::CREATED_BY): caller,
(field_names::CREATED_AT): now,
});
let mem = Memory {
id: sub_id.clone(),
tier: Tier::Long,
namespace: ns,
title: format!("subscription:{sub_id}"),
content: format!(
"subscription for {caller} -> {} (events={events})",
namespace_filter.as_deref().unwrap_or("*")
),
tags: vec![KIND_SUBSCRIPTION.to_string()],
priority: 5,
confidence: 1.0,
source: "subscribe".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata,
reflection_depth: 0,
memory_kind: crate::models::MemoryKind::Observation,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
let ctx = crate::store::CallerContext::for_agent(&caller);
let stored_id = match app.store.store(&ctx, &mem).await {
Ok(id) => id,
Err(e) => return store_err_to_response(e),
};
if let Some(resp) = fanout_or_503(&app, &mem).await {
return resp;
}
return (
StatusCode::CREATED,
Json(json!({
"id": stored_id,
"url": url,
"events": events,
"namespace": namespace_filter,
(field_names::NAMESPACE_FILTER): namespace_filter,
(field_names::AGENT_FILTER): agent_filter,
"agent_id": caller,
(field_names::CREATED_BY): caller,
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
let lock = app.db.lock().await;
let already = db::list_agents(&lock.0)
.ok()
.is_some_and(|a| a.iter().any(|x| x.agent_id == caller));
if !already {
let _ = db::register_agent(&lock.0, &caller, "ai:generic", &[]);
}
let sub_result: Result<serde_json::Value, String> = (|| {
crate::subscriptions::validate_url(&url).map_err(|e| e.to_string())?;
let id = crate::subscriptions::insert(
&lock.0,
&crate::subscriptions::NewSubscription {
url: &url,
events: &events,
secret: body.secret.as_deref(),
namespace_filter: namespace_filter.as_deref(),
agent_filter: agent_filter.as_deref(),
created_by: Some(&caller),
event_types: None,
},
)
.map_err(|e| e.to_string())?;
Ok(json!({
"id": id,
"url": url,
"events": events,
(field_names::NAMESPACE_FILTER): namespace_filter,
(field_names::AGENT_FILTER): agent_filter,
(field_names::CREATED_BY): caller,
}))
})();
let registered_mem = if already {
None
} else {
db::list(
&lock.0,
Some(crate::models::AGENTS_NAMESPACE),
None,
crate::storage::LIST_MAX_LIMIT,
0,
None,
None,
None,
None,
None,
)
.ok()
.and_then(|rows| {
rows.into_iter()
.find(|m| m.title == crate::models::agent_registration_title(&caller))
})
};
drop(lock);
if let Some(ref mem) = registered_mem
&& let Some(resp) = fanout_or_503(&app, mem).await
{
return resp;
}
match sub_result {
Ok(mut v) => {
if let Some(obj) = v.as_object_mut() {
if let Some(ref ns) = namespace_filter {
obj.insert("namespace".into(), json!(ns));
}
obj.insert("agent_id".into(), json!(caller));
}
(StatusCode::CREATED, Json(v)).into_response()
}
Err(e) => (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response(),
}
}
#[derive(Deserialize)]
pub struct UnsubscribeQuery {
#[serde(default)]
pub id: Option<String>,
#[serde(default)]
pub agent_id: Option<String>,
#[serde(default)]
pub namespace: Option<String>,
}
pub async fn unsubscribe(
State(app): State<AppState>,
headers: HeaderMap,
Query(q): Query<UnsubscribeQuery>,
) -> impl IntoResponse {
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let caller = match resolve_caller_agent_id(None, &headers, None) {
Ok(id) => id,
Err(e) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
}
};
if let Some(claimed) = q.agent_id.as_deref()
&& claimed != caller
{
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
)
.into_response();
}
let ctx = crate::store::CallerContext::for_agent(&caller);
let target_id: Option<String> = if let Some(id) = q.id.clone() {
Some(id)
} else {
let Some(ns) = q.namespace.clone() else {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "id or (agent_id, namespace) required"})),
)
.into_response();
};
let sub_ns = caller_subscription_ns(&caller);
let filter = crate::store::Filter {
namespace: Some(sub_ns),
limit: crate::storage::LIST_MAX_LIMIT,
..Default::default()
};
match app.store.list(&ctx, &filter).await {
Ok(rows) => rows
.into_iter()
.find(|m| {
m.metadata
.get(field_names::NAMESPACE_FILTER)
.and_then(|v| v.as_str())
== Some(ns.as_str())
})
.map(|m| {
m.metadata
.get(field_names::SUBSCRIPTION_ID)
.and_then(|v| v.as_str())
.map(str::to_string)
.unwrap_or(m.id)
}),
Err(e) => return store_err_to_response(e),
}
};
return match target_id {
Some(id) => match app.store.delete(&ctx, &id).await {
Ok(()) => (
StatusCode::OK,
Json(json!({"id": id, "removed": true, (field_names::STORAGE_BACKEND): "postgres"})),
)
.into_response(),
Err(crate::store::StoreError::NotFound { .. }) => (
StatusCode::OK,
Json(json!({"id": id, "removed": false, (field_names::STORAGE_BACKEND): "postgres"})),
)
.into_response(),
Err(e) => store_err_to_response(e),
},
None => (
StatusCode::OK,
Json(json!({
"id": "",
"removed": false,
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response(),
};
}
let caller = match resolve_caller_agent_id(None, &headers, None) {
Ok(id) => id,
Err(e) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
}
};
if let Some(claimed) = q.agent_id.as_deref()
&& claimed != caller
{
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
)
.into_response();
}
if let Some(id) = q.id.clone() {
let lock = app.db.lock().await;
let outcome = crate::subscriptions::delete(&lock.0, &id, Some(&caller));
drop(lock);
return match outcome {
Ok(removed) => {
(StatusCode::OK, Json(json!({"id": id, "removed": removed}))).into_response()
}
Err(e) => {
tracing::error!("{}", crate::errors::msg::unsubscribe(&e));
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response()
}
};
}
let Some(ns) = q.namespace else {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "id or (agent_id, namespace) required"})),
)
.into_response();
};
let lock = app.db.lock().await;
let subs = crate::subscriptions::list(&lock.0, Some(&caller)).unwrap_or_default();
let target = subs
.into_iter()
.find(|s| s.namespace_filter.as_deref() == Some(ns.as_str()));
let outcome = match target {
Some(s) => crate::subscriptions::delete(&lock.0, &s.id, Some(&caller)).map(|r| (s.id, r)),
None => Ok((String::new(), false)),
};
drop(lock);
match outcome {
Ok((id, removed)) => {
(StatusCode::OK, Json(json!({"id": id, "removed": removed}))).into_response()
}
Err(e) => {
tracing::error!("{}", crate::errors::msg::unsubscribe(&e));
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct ListSubscriptionsQuery {
#[serde(default)]
pub agent_id: Option<String>,
}
pub async fn list_subscriptions(
State(app): State<AppState>,
headers: HeaderMap,
Query(q): Query<ListSubscriptionsQuery>,
) -> impl IntoResponse {
let caller = match resolve_caller_agent_id(None, &headers, None) {
Ok(id) => id,
Err(e) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response();
}
};
if let Some(claimed) = q.agent_id.as_deref()
&& claimed != caller
{
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_QUERY_MISMATCH})),
)
.into_response();
}
#[cfg(feature = "sal-postgres")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let ctx = crate::store::CallerContext::for_agent(&caller);
let namespaces: Vec<String> = vec![caller_subscription_ns(&caller)];
let mut rows: Vec<serde_json::Value> = Vec::new();
for ns in namespaces {
let filter = crate::store::Filter {
namespace: Some(ns),
limit: crate::storage::LIST_MAX_LIMIT,
..Default::default()
};
match app.store.list(&ctx, &filter).await {
Ok(memories) => {
for m in memories {
let meta = m.metadata;
if meta.get("kind").and_then(|v| v.as_str()) != Some(KIND_SUBSCRIPTION) {
continue;
}
let sub_id = meta
.get(field_names::SUBSCRIPTION_ID)
.cloned()
.unwrap_or_else(|| serde_json::Value::String(m.id.clone()));
rows.push(json!({
"id": sub_id,
"url": meta.get("url").cloned().unwrap_or(serde_json::Value::Null),
"events": meta.get("events").cloned().unwrap_or(serde_json::Value::Null),
"namespace": meta.get(field_names::NAMESPACE_FILTER).cloned().unwrap_or(serde_json::Value::Null),
(field_names::NAMESPACE_FILTER): meta.get(field_names::NAMESPACE_FILTER).cloned().unwrap_or(serde_json::Value::Null),
(field_names::AGENT_FILTER): meta.get(field_names::AGENT_FILTER).cloned().unwrap_or(serde_json::Value::Null),
"agent_id": meta.get("agent_id").cloned().unwrap_or(serde_json::Value::Null),
(field_names::CREATED_BY): meta.get(field_names::CREATED_BY).cloned().unwrap_or(serde_json::Value::Null),
(field_names::CREATED_AT): meta.get(field_names::CREATED_AT).cloned().unwrap_or(serde_json::Value::Null),
"dispatch_count": 0,
"failure_count": 0,
}));
}
}
Err(e) => return store_err_to_response(e),
}
}
let count = rows.len();
return (
StatusCode::OK,
Json(json!({
"count": count,
(field_names::SUBSCRIPTIONS): rows,
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
let state = app.db.clone();
let lock = state.lock().await;
let subs = match crate::subscriptions::list(&lock.0, Some(&caller)) {
Ok(s) => s,
Err(e) => {
tracing::error!("list_subscriptions: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
};
drop(lock);
let filtered = subs;
let rows: Vec<serde_json::Value> = filtered
.iter()
.map(|s| {
json!({
"id": s.id,
"url": s.url,
"events": s.events,
"namespace": s.namespace_filter,
(field_names::NAMESPACE_FILTER): s.namespace_filter,
(field_names::AGENT_FILTER): s.agent_filter,
"agent_id": s.agent_filter.clone().or(s.created_by.clone()),
(field_names::CREATED_BY): s.created_by,
(field_names::CREATED_AT): s.created_at,
"dispatch_count": s.dispatch_count,
"failure_count": s.failure_count,
})
})
.collect();
let count = rows.len();
(
StatusCode::OK,
Json(json!({"count": count, (field_names::SUBSCRIPTIONS): rows})),
)
.into_response()
}
#[cfg(feature = "sal")]
pub async fn dispatch_event_postgres(
app: &AppState,
event: &str,
memory_id: &str,
namespace: &str,
agent_id: Option<&str>,
details: Option<serde_json::Value>,
) {
let ctx =
crate::store::CallerContext::for_admin(crate::identity::sentinels::SUBSCRIPTION_DISPATCH);
let memories = match app
.store
.list_by_namespace_prefix(&ctx, SUBSCRIPTION_NS_PREFIX, SUBSCRIPTION_DISPATCH_LIMIT)
.await
{
Ok(rows) => rows,
Err(e) => {
tracing::warn!(
"dispatch_event_postgres: SAL prefix-list failed: {e} — \
no subscribers will fire this tick"
);
return;
}
};
let mut matching: Vec<(crate::subscriptions::Subscription, Option<String>)> = Vec::new();
for m in memories {
if !m.namespace.starts_with(SUBSCRIPTION_NS_PREFIX) {
continue;
}
let meta = &m.metadata;
if meta.get("kind").and_then(|v| v.as_str()) != Some(KIND_SUBSCRIPTION) {
continue;
}
let sub_id = meta
.get(field_names::SUBSCRIPTION_ID)
.and_then(|v| v.as_str())
.map(str::to_string)
.unwrap_or_else(|| m.id.clone());
let url = match meta.get("url").and_then(|v| v.as_str()) {
Some(u) => u.to_string(),
None => continue, };
let events_csv = meta
.get("events")
.and_then(|v| v.as_str())
.unwrap_or("*")
.to_string();
let namespace_filter = meta
.get(field_names::NAMESPACE_FILTER)
.and_then(|v| v.as_str())
.map(str::to_string);
let agent_filter = meta
.get(field_names::AGENT_FILTER)
.and_then(|v| v.as_str())
.map(str::to_string);
let created_by = meta
.get(field_names::CREATED_BY)
.and_then(|v| v.as_str())
.map(str::to_string);
let created_at = meta
.get(field_names::CREATED_AT)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let secret_hash = meta
.get("secret_hash")
.and_then(|v| v.as_str())
.map(str::to_string);
if !crate::subscriptions::matches_filters(
&events_csv,
None,
namespace_filter.as_deref(),
agent_filter.as_deref(),
event,
namespace,
agent_id,
) {
continue;
}
let sub = crate::subscriptions::Subscription {
id: sub_id,
url,
events: events_csv,
namespace_filter,
agent_filter,
created_by,
created_at,
dispatch_count: 0,
failure_count: 0,
event_types: None,
};
matching.push((sub, secret_hash));
}
if matching.is_empty() {
tracing::debug!(
"dispatch_event_postgres: event={event} ns={namespace} \
matched zero subscribers (post-#932 dispatch path)"
);
return;
}
let n_matched = matching.len();
tracing::debug!(
"dispatch_event_postgres: event={event} ns={namespace} \
dispatching to {n_matched} subscriber(s) via SAL"
);
let db_path = {
let lock = app.db.lock().await;
lock.1.clone()
};
crate::subscriptions::dispatch_event_to_subs(
matching, event, memory_id, namespace, agent_id, &db_path, details,
);
}