use crate::mcp::param_names;
use crate::mcp::registry::McpTool;
use crate::models::field_names;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::{Value, json};
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct SubscribeRequest {
pub url: String,
#[serde(default)]
pub events: Option<String>,
#[serde(default)]
pub secret: Option<String>,
#[serde(default)]
pub namespace_filter: Option<String>,
#[serde(default)]
pub agent_filter: Option<String>,
#[schemars(description = "#912 event-type subset.")]
#[serde(default)]
pub event_types: Option<Vec<String>>,
}
#[allow(dead_code)]
pub struct SubscribeTool;
impl McpTool for SubscribeTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_SUBSCRIBE
}
fn description() -> &'static str {
"Register a webhook subscription for memory events."
}
fn docs() -> &'static str {
"Webhook subscription. HMAC-SHA256 signed via X-Ai-Memory-Signature when secret supplied. https required (http only for loopback). Secret stored hashed only."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<SubscribeRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Governance.name()
}
}
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct UnsubscribeRequest {
pub id: String,
}
#[allow(dead_code)]
pub struct UnsubscribeTool;
impl McpTool for UnsubscribeTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_UNSUBSCRIBE
}
fn description() -> &'static str {
"Delete a subscription by id."
}
fn docs() -> &'static str {
"Delete subscription. DLQ rows retained for audit."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<UnsubscribeRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Governance.name()
}
}
pub fn handle_subscribe(
conn: &rusqlite::Connection,
params: &Value,
mcp_client: Option<&str>,
) -> Result<Value, String> {
let url = params["url"].as_str().ok_or("url is required")?;
let events = params["events"].as_str().unwrap_or("*");
let secret = params["secret"].as_str();
let namespace_filter = params[param_names::NAMESPACE_FILTER].as_str();
let agent_filter = params[param_names::AGENT_FILTER].as_str();
let created_by =
crate::identity::resolve_agent_id(None, mcp_client).map_err(|e| e.to_string())?;
if secret.is_none_or(str::is_empty) && crate::config::active_hooks_hmac_secret().is_none() {
return Err(
"HMAC secret required: configure per-subscription `hmac_secret` or \
server-wide `[security] hmac_secret`. Pass `secret: <value>` in the \
tool call, OR set [hooks.subscription] hmac_secret in the daemon \
config. Unsigned subscription dispatch was disabled in v0.7.0 \
(fix campaign R3-S1.HMAC, 2026-05-13)."
.to_string(),
);
}
let event_types: Option<Vec<String>> = params[field_names::EVENT_TYPES].as_array().map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
});
let registered = crate::db::list_agents(conn)
.map_err(|e| e.to_string())?
.into_iter()
.any(|a| a.agent_id == created_by);
if !registered {
return Err(format!(
"agent {created_by:?} is not registered; call memory_agent_register before memory_subscribe"
));
}
crate::subscriptions::validate_url(url).map_err(|e| e.to_string())?;
let id = crate::subscriptions::insert(
conn,
&crate::subscriptions::NewSubscription {
url,
events,
secret,
namespace_filter,
agent_filter,
created_by: Some(&created_by),
event_types: event_types.as_deref(),
},
)
.map_err(|e| e.to_string())?;
let mut response = json!({
"id": id,
"url": url,
"events": events,
(field_names::NAMESPACE_FILTER): namespace_filter,
(field_names::AGENT_FILTER): agent_filter,
(field_names::CREATED_BY): created_by,
});
if let Some(et) = &event_types {
response[field_names::EVENT_TYPES] = json!(et);
}
Ok(response)
}
pub fn handle_unsubscribe(
conn: &rusqlite::Connection,
params: &Value,
mcp_client: Option<&str>,
) -> Result<Value, String> {
let id = params["id"]
.as_str()
.ok_or(crate::errors::msg::ID_REQUIRED)?;
let caller = crate::identity::resolve_agent_id(None, mcp_client).map_err(|e| e.to_string())?;
let removed =
crate::subscriptions::delete(conn, id, Some(&caller)).map_err(|e| e.to_string())?;
Ok(json!({"id": id, "removed": removed}))
}
pub fn handle_list_subscriptions(
conn: &rusqlite::Connection,
mcp_client: Option<&str>,
) -> Result<Value, String> {
let caller = crate::identity::resolve_agent_id(None, mcp_client).map_err(|e| e.to_string())?;
let subs = crate::subscriptions::list(conn, Some(&caller)).map_err(|e| e.to_string())?;
Ok(json!({"count": subs.len(), (field_names::SUBSCRIPTIONS): subs}))
}
pub fn handle_subscription_replay(
conn: &rusqlite::Connection,
params: &Value,
mcp_client: Option<&str>,
) -> Result<Value, String> {
let subscription_id = params[param_names::SUBSCRIPTION_ID]
.as_str()
.ok_or("subscription_id is required")?;
let since = params["since"]
.as_str()
.ok_or("since is required (RFC3339)")?;
let caller = crate::identity::resolve_agent_id(None, mcp_client).map_err(|e| e.to_string())?;
let owner =
crate::subscriptions::get_owner(conn, subscription_id).map_err(|e| e.to_string())?;
let owner_matches = owner.as_deref() == Some(caller.as_str());
if !owner_matches {
return Ok(json!({
(field_names::SUBSCRIPTION_ID): subscription_id,
"since": since,
"count": 0,
"events": Vec::<Value>::new(),
}));
}
crate::subscriptions::memory_subscription_replay(conn, subscription_id, since)
.map_err(|e| e.to_string())
}
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct ListSubscriptionsRequest {}
#[allow(dead_code)]
pub struct ListSubscriptionsTool;
impl McpTool for ListSubscriptionsTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_LIST_SUBSCRIPTIONS
}
fn description() -> &'static str {
"List active webhook subscriptions."
}
fn docs() -> &'static str {
"List subscriptions. Secrets never returned."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<ListSubscriptionsRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Other.name()
}
}
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
#[allow(dead_code)]
pub struct SubscriptionReplayRequest {
pub subscription_id: String,
pub since: String,
}
#[allow(dead_code)]
pub struct SubscriptionReplayTool;
impl McpTool for SubscriptionReplayTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_SUBSCRIPTION_REPLAY
}
fn description() -> &'static str {
"Replay subscription_events since an RFC3339 timestamp."
}
fn docs() -> &'static str {
"K7: replay events ordered by delivered_at asc."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<SubscriptionReplayRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Power.name()
}
}
#[cfg(test)]
mod d1_5_986_tests {
use super::*;
use crate::mcp::parity_test_helpers::{
assert_descriptions_match, assert_property_set_parity, derived_props_for,
};
#[test]
fn list_subscriptions_parity_986() {
let derived = derived_props_for::<ListSubscriptionsRequest>();
assert_property_set_parity("memory_list_subscriptions", &derived);
assert_descriptions_match("memory_list_subscriptions", &derived);
}
#[test]
fn list_subscriptions_tool_metadata_986() {
assert_eq!(ListSubscriptionsTool::name(), "memory_list_subscriptions");
assert_eq!(ListSubscriptionsTool::family(), "other");
}
#[test]
fn subscription_replay_parity_986() {
let derived = derived_props_for::<SubscriptionReplayRequest>();
assert_property_set_parity("memory_subscription_replay", &derived);
assert_descriptions_match("memory_subscription_replay", &derived);
}
#[test]
fn subscription_replay_tool_metadata_986() {
assert_eq!(SubscriptionReplayTool::name(), "memory_subscription_replay");
assert_eq!(SubscriptionReplayTool::family(), "power");
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage as db;
use serde_json::json;
fn fresh_conn() -> rusqlite::Connection {
db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
}
fn register_agent(conn: &rusqlite::Connection) -> String {
let agent_id = crate::identity::resolve_agent_id(None, None).unwrap();
db::register_agent(conn, &agent_id, "test", &[]).expect("register");
agent_id
}
#[test]
fn no_secret_refuses_unsigned() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
let _ = register_agent(&conn);
let err = handle_subscribe(
&conn,
&json!({"url": "https://example.com/hook", "events": "*"}),
None,
)
.unwrap_err();
assert!(err.contains("HMAC secret required"), "got: {err}");
}
#[test]
fn per_subscription_secret_accepted() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
let _ = register_agent(&conn);
let resp = handle_subscribe(
&conn,
&json!({
"url": "https://example.com/hook",
"events": "memory_store",
"secret": "shared-secret-hex",
}),
None,
)
.expect("ok");
assert!(resp["id"].is_string());
assert_eq!(resp["url"].as_str(), Some("https://example.com/hook"));
assert_eq!(resp["events"].as_str(), Some("memory_store"));
}
#[test]
fn event_types_array_propagated() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
let _ = register_agent(&conn);
let resp = handle_subscribe(
&conn,
&json!({
"url": "https://example.com/hook",
"secret": "shared-secret-hex",
"event_types": ["memory_store", "memory_link_created"],
}),
None,
)
.expect("ok");
let arr = resp["event_types"].as_array().expect("array");
assert_eq!(arr.len(), 2);
}
#[test]
fn missing_url_errors() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
let _ = register_agent(&conn);
let err = handle_subscribe(&conn, &json!({"secret": "s"}), None).unwrap_err();
assert!(err.contains("url"), "got: {err}");
}
#[test]
fn unregistered_agent_refused() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
let err = handle_subscribe(
&conn,
&json!({"url": "https://example.com/hook", "secret": "s"}),
None,
)
.unwrap_err();
assert!(err.contains("not registered"), "got: {err}");
}
#[test]
fn invalid_url_rejected() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
let _ = register_agent(&conn);
let err =
handle_subscribe(&conn, &json!({"url": "not-a-url", "secret": "s"}), None).unwrap_err();
assert!(!err.is_empty());
}
#[test]
fn unsubscribe_unknown_id_returns_false() {
let conn = fresh_conn();
let resp = handle_unsubscribe(
&conn,
&json!({"id": "00000000-0000-0000-0000-000000000000"}),
None,
)
.expect("ok");
assert_eq!(resp["removed"], false);
}
#[test]
fn unsubscribe_missing_id_errors() {
let conn = fresh_conn();
let err = handle_unsubscribe(&conn, &json!({}), None).unwrap_err();
assert!(err.contains("id"), "got: {err}");
}
#[test]
fn list_subscriptions_empty() {
let conn = fresh_conn();
let resp = handle_list_subscriptions(&conn, None).expect("ok");
assert_eq!(resp["count"].as_u64(), Some(0));
}
#[test]
fn subscription_replay_missing_id_errors() {
let conn = fresh_conn();
let err =
handle_subscription_replay(&conn, &json!({"since": "2026-01-01T00:00:00Z"}), None)
.unwrap_err();
assert!(err.contains("subscription_id"), "got: {err}");
}
#[test]
fn subscription_replay_missing_since_errors() {
let conn = fresh_conn();
let err = handle_subscription_replay(&conn, &json!({"subscription_id": "sub-1"}), None)
.unwrap_err();
assert!(err.contains("since"), "got: {err}");
}
#[test]
fn subscription_replay_cross_tenant_returns_not_found_1115() {
crate::config::set_active_hooks_hmac_secret(None);
let conn = fresh_conn();
db::register_agent(&conn, "ai:alice", "test", &[]).expect("register alice");
let sid = crate::subscriptions::insert(
&conn,
&crate::subscriptions::NewSubscription {
url: "https://example.com/alice",
events: "memory_store",
secret: Some("sek-alice"),
namespace_filter: None,
agent_filter: None,
created_by: Some("ai:alice"),
event_types: None,
},
)
.expect("insert alice sub");
let resp = handle_subscription_replay(
&conn,
&json!({"subscription_id": sid, "since": "1970-01-01T00:00:00Z"}),
Some("ai:bob-client"),
)
.expect("ok");
assert_eq!(resp["count"].as_u64(), Some(0), "bob must see empty");
assert!(resp["events"].as_array().unwrap().is_empty());
}
}
#[cfg(test)]
mod d1_4_985_tests {
use super::*;
use crate::mcp::d1_4_985_helpers::{
assert_descriptions_match, assert_property_set_parity, derived_props_for,
};
#[test]
fn memory_subscribe_parity_985() {
let derived = derived_props_for::<SubscribeRequest>();
assert_property_set_parity("memory_subscribe", &derived);
assert_descriptions_match("memory_subscribe", &derived);
}
#[test]
fn memory_subscribe_tool_metadata_985() {
assert_eq!(SubscribeTool::name(), "memory_subscribe");
assert_eq!(SubscribeTool::family(), "governance");
}
#[test]
fn memory_unsubscribe_parity_985() {
let derived = derived_props_for::<UnsubscribeRequest>();
assert_property_set_parity("memory_unsubscribe", &derived);
assert_descriptions_match("memory_unsubscribe", &derived);
}
#[test]
fn memory_unsubscribe_tool_metadata_985() {
assert_eq!(UnsubscribeTool::name(), "memory_unsubscribe");
assert_eq!(UnsubscribeTool::family(), "governance");
}
}