#![allow(clippy::too_many_lines)]
use crate::models::field_names;
use axum::{
Json,
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
#[cfg(feature = "sal")]
use chrono::Utc;
use serde::Deserialize;
use serde_json::json;
use crate::db;
use crate::identity::sentinels;
use crate::models::LinkBody;
#[cfg(feature = "sal")]
use crate::models::MemoryLink;
use crate::validate;
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;
/// JSON request body accepted by [`verify_link_handler`].
///
/// Every field is optional at the deserialization layer; the handler itself
/// rejects bodies that carry neither `source_id` nor `link_id`.
#[derive(Debug, Deserialize)]
pub struct VerifyLinkBody {
/// Id of the link's source memory. Checked with `validate::validate_id`
/// by the handler when present.
#[serde(default)]
pub source_id: Option<String>,
/// Id of the link's target memory. Checked with `validate::validate_id`
/// by the handler when present.
#[serde(default)]
pub target_id: Option<String>,
/// Id of the link itself. The handler forwards it to the store's verify
/// filter without running `validate::validate_id` on it.
#[serde(default)]
pub link_id: Option<String>,
/// Replay-protection nonce. The handler treats an empty string the same
/// as an absent value.
#[serde(default)]
pub verification_nonce: Option<String>,
}
pub async fn verify_link_handler(
State(app): State<AppState>,
Json(body): Json<VerifyLinkBody>,
) -> impl IntoResponse {
if body.source_id.is_none() && body.link_id.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": crate::errors::msg::VERIFY_LINK_ARGS_REQUIRED,
"fields": ["source_id", "link_id"],
})),
)
.into_response();
}
if let Some(s) = body.source_id.as_deref()
&& let Err(e) = validate::validate_id(s)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
)
.into_response();
}
if let Some(t) = body.target_id.as_deref()
&& let Err(e) = validate::validate_id(t)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid target_id: {e}")})),
)
.into_response();
}
let nonce_opt: Option<&str> = body.verification_nonce.as_deref().filter(|s| !s.is_empty());
match (nonce_opt, app.verify_require_nonce) {
(None, true) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": "verification_nonce is required when [verify] require_nonce = true",
"fields": ["verification_nonce"],
})),
)
.into_response();
}
(None, false) => {
tracing::warn!(
target: "ai_memory::verify",
"POST /api/v1/links/verify called without verification_nonce — \
replay protection is disabled for this request. Add a fresh \
UUID-v4 nonce to opt into H5 dedup; flip [verify] require_nonce = true \
to enforce."
);
}
(Some(_), _) => {
}
}
#[cfg(feature = "sal")]
{
let filter = crate::store::VerifyFilter {
source_id: body.source_id.clone(),
target_id: body.target_id.clone(),
link_id: body.link_id.clone(),
};
return match app.store.verify_link(filter).await {
Ok(report) => {
if let Some(nonce) = nonce_opt {
let canonical_id = format!(
"{}|{}|{}",
report.source_id, report.target_id, report.relation
);
let decision = app.replay_cache.record_and_check(&canonical_id, b"", nonce);
if matches!(decision, crate::identity::replay::ReplayDecision::Replay) {
return (
StatusCode::CONFLICT,
Json(json!({"error": "verification replay detected"})),
)
.into_response();
}
}
if crate::audit::is_enabled() {
crate::audit::emit(crate::audit::EventBuilder::new(
crate::audit::AuditAction::Link,
crate::audit::actor(sentinels::AI_HTTP, "http_body", None),
crate::audit::target_memory(
report.source_id.clone(),
String::new(),
Some(format!(
"verify -> {} {}",
report.target_id, report.relation
)),
None,
None,
),
));
}
Json(json!(report)).into_response()
}
Err(e) => store_err_to_response(e),
};
}
#[cfg(not(feature = "sal"))]
{
let _ = app;
let _ = body;
(
StatusCode::NOT_IMPLEMENTED,
Json(json!({"error": "verify_link requires --features sal"})),
)
.into_response()
}
}
/// Top-level JSON keys accepted in a link request body.
///
/// `unknown_link_body_fields` reports any key outside this list.
/// NOTE(review): `from` / `to` / `rel_type` look like aliases of
/// `source_id` / `target_id` / `relation` — confirm against `LinkBody`.
const ALLOWED_LINK_BODY_FIELDS: &[&str] = &[
"source_id",
"from",
"target_id",
"to",
"relation",
"rel_type",
];
/// Relation names listed in the `allowed` field of the `invalid_relation`
/// error returned by `create_link`.
///
/// The acceptance check itself is `MemoryLinkRelation::from_str`; this table
/// is only used to build the error payload.
const ALLOWED_LINK_RELATIONS: &[&str] = &[
crate::models::MemoryLinkRelation::RelatedTo.as_str(),
crate::models::MemoryLinkRelation::Supersedes.as_str(),
crate::models::MemoryLinkRelation::Contradicts.as_str(),
crate::models::MemoryLinkRelation::DerivedFrom.as_str(),
crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
];
/// Returns the sorted list of top-level keys in `raw` that are not listed in
/// `ALLOWED_LINK_BODY_FIELDS`.
///
/// Yields `None` when `raw` is not a JSON object or when every key is
/// allowed.
fn unknown_link_body_fields(raw: &serde_json::Value) -> Option<Vec<String>> {
    let fields = raw.as_object()?;

    let mut rejected: Vec<String> = Vec::new();
    for key in fields.keys() {
        if !ALLOWED_LINK_BODY_FIELDS.contains(&key.as_str()) {
            rejected.push(key.clone());
        }
    }

    if rejected.is_empty() {
        return None;
    }
    // Sort so the reported list does not depend on map iteration order.
    rejected.sort();
    Some(rejected)
}
/// Handles link creation (`POST /api/v1/links`, per the authorization log
/// message below).
///
/// Flow:
/// 1. Reject bodies with keys outside `ALLOWED_LINK_BODY_FIELDS`
///    (400 `unknown_field`).
/// 2. Deserialize into `LinkBody`, resolve the
///    `(source_id, target_id, relation)` triple, validate it, and require the
///    relation to parse as a `MemoryLinkRelation` (400 otherwise).
/// 3. With the `sal` feature and a Postgres backend: build a `MemoryLink`,
///    call `app.store.link_signed`, emit an audit event when auditing is
///    enabled, dispatch `MEMORY_LINK_CREATED` if the source memory can be
///    loaded, and answer 201. This branch always returns.
/// 4. Otherwise (local DB path): resolve the caller (400 on failure), load
///    the source memory (404 when missing, 500 on lookup error), enforce
///    ownership (403), run the quota check (429 when exceeded, 500 on SQL
///    error), create the signed link, dispatch the webhook event while the DB
///    lock is still held, release the lock, optionally broadcast to the
///    federation, then answer 201.
///
/// Ownership on the local path: the request is allowed when the source
/// memory has no (or an empty) `agent_id`, or when the caller equals the
/// source's `agent_id`, its target-agent field, or
/// `sentinels::DAEMON_PRINCIPAL`.
///
/// Note the 201 payload differs between the two paths: the Postgres branch
/// echoes `source_id` / `target_id` / `relation`, the local branch does not.
pub async fn create_link(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Json(raw): Json<serde_json::Value>,
) -> impl IntoResponse {
// Unknown keys are rejected before typed deserialization.
if let Some(unknown) = unknown_link_body_fields(&raw) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "unknown_field", "fields": unknown})),
)
.into_response();
}
let body: LinkBody = match serde_json::from_value(raw) {
Ok(b) => b,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
};
let (source_id, target_id, relation) = body.resolved();
if let Err(e) =
validate::RequestValidator::validate_link_triple(&source_id, &target_id, &relation)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
// The relation must map onto a known `MemoryLinkRelation`; the error lists
// the accepted names.
if crate::models::MemoryLinkRelation::from_str(&relation).is_none() {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": "invalid_relation",
"got": relation,
"allowed": ALLOWED_LINK_RELATIONS,
})),
)
.into_response();
}
// Postgres-backed path (only compiled with the `sal` feature). Every arm of
// the match below returns, so nothing after this `if` runs for Postgres.
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let now = Utc::now().to_rfc3339();
let relation_typed =
crate::models::MemoryLinkRelation::from_str(&relation).unwrap_or_default();
let link = MemoryLink {
source_id: source_id.clone(),
target_id: target_id.clone(),
relation: relation_typed,
created_at: now,
valid_from: None,
valid_until: None,
observed_by: None,
signature: None,
attest_level: None,
};
let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
return match app
.store
.link_signed(&ctx, &link, app.active_keypair.as_ref().as_ref())
.await
{
Ok(attest_level) => {
if crate::audit::is_enabled() {
crate::audit::emit(crate::audit::EventBuilder::new(
crate::audit::AuditAction::Link,
crate::audit::actor(sentinels::AI_HTTP, "http_body", None),
crate::audit::target_memory(
source_id.clone(),
String::new(),
Some(format!("{target_id} -> {relation}")),
None,
None,
),
));
}
// Load the source memory to learn the namespace/owner for the event.
// A failed lookup yields no namespace, which skips the dispatch below.
let (link_namespace, link_owner) = match app.store.get(&ctx, &source_id).await {
Ok(m) => {
let owner = m
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.map(str::to_string);
(Some(m.namespace), owner)
}
Err(_) => (None, None),
};
if let Some(ns) = link_namespace {
let details =
serde_json::to_value(crate::subscriptions::LinkCreatedEventDetails {
target_id: target_id.clone(),
relation: relation.clone(),
})
.ok();
super::dispatch_event_postgres(
&app,
crate::subscriptions::webhook_events::MEMORY_LINK_CREATED,
&source_id,
&ns,
link_owner.as_deref(),
details,
)
.await;
}
(
StatusCode::CREATED,
Json(json!({
"linked": true,
"source_id": source_id,
"target_id": target_id,
"relation": relation,
(field_names::ATTEST_LEVEL): attest_level,
})),
)
.into_response()
}
Err(e) => store_err_to_response(e),
};
}
// Local DB path from here on.
let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
Ok(c) => c,
Err(err) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
}
};
// The lock is held through the ownership check, quota check, link creation
// and webhook dispatch, and released explicitly before the federation step.
let lock = app.db.lock().await;
let source_mem = match db::get(&lock.0, &source_id) {
Ok(Some(m)) => m,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND, "source_id": source_id})),
)
.into_response();
}
Err(e) => {
tracing::error!("create_link: source lookup failed: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
};
// Missing or non-string metadata values collapse to the empty string.
let source_owner = source_mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("");
let source_target = source_mem
.metadata
.get(field_names::TARGET_AGENT_ID)
.and_then(|v| v.as_str())
.unwrap_or("");
// A source without an owner bypasses the ownership check entirely.
let is_unowned_legacy = source_owner.is_empty();
if !is_unowned_legacy
&& source_owner != caller
&& source_target != caller
&& caller != sentinels::DAEMON_PRINCIPAL
{
tracing::warn!(
target: super::AUTHZ_TRACE_TARGET,
"POST /api/v1/links 403: caller {caller} != source owner {source_owner} (source_id={source_id})"
);
return (
StatusCode::FORBIDDEN,
Json(json!({
"error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
"owner": source_owner,
"caller": caller,
"source_id": source_id,
})),
)
.into_response();
}
// The quota check is skipped for an empty caller id. It runs against the
// source memory's namespace, before the link is created.
if !caller.is_empty() {
if let Err(e) = crate::quotas::check_and_record(
&lock.0,
&caller,
&source_mem.namespace,
crate::quotas::QuotaOp::Link,
) {
return match e {
crate::quotas::QuotaCheckError::Quota(qe) => (
StatusCode::TOO_MANY_REQUESTS,
Json(json!({
"code": crate::errors::error_codes::QUOTA_EXCEEDED,
"error": qe.to_string(),
"limit": qe.limit.as_str(),
"current": qe.current,
"max": qe.max,
"agent_id": qe.agent_id,
})),
)
.into_response(),
crate::quotas::QuotaCheckError::Sql(se) => {
tracing::error!("create_link: quota substrate error: {se}");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "quota check failed"})),
)
.into_response()
}
};
}
}
let create_result = db::create_link_signed(
&lock.0,
&source_id,
&target_id,
&relation,
app.active_keypair.as_ref().as_ref(),
);
if create_result.is_ok() {
// The source memory is looked up again for the event's namespace/owner;
// when that lookup yields nothing the default namespace is used with no owner.
let (link_namespace, link_owner) = db::get(&lock.0, &source_id).ok().flatten().map_or_else(
|| (crate::DEFAULT_NAMESPACE.to_string(), None),
|m| {
let owner = m
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.map(str::to_string);
(m.namespace, owner)
},
);
let details = serde_json::to_value(crate::subscriptions::LinkCreatedEventDetails {
target_id: target_id.clone(),
relation: relation.clone(),
})
.ok();
crate::subscriptions::dispatch_event_with_details(
&lock.0,
crate::subscriptions::webhook_events::MEMORY_LINK_CREATED,
&source_id,
&link_namespace,
link_owner.as_deref(),
&lock.1,
details,
);
}
// Release the DB lock before awaiting the federation broadcast.
drop(lock);
match create_result {
Ok(attest_level) => {
if let Some(fed) = app.federation.as_ref() {
let relation_typed =
crate::models::MemoryLinkRelation::from_str(&relation).unwrap_or_default();
let link = crate::models::MemoryLink {
source_id: source_id.clone(),
target_id: target_id.clone(),
relation: relation_typed,
created_at: chrono::Utc::now().to_rfc3339(),
signature: None,
observed_by: None,
valid_from: None,
valid_until: None,
attest_level: None,
};
// A broadcast error is only logged (the local write already happened),
// whereas a quorum that fails to finalise replaces the 201 with the
// quorum-not-met response.
match crate::federation::broadcast_link_quorum(fed, &link).await {
Ok(tracker) => {
if let Err(err) = crate::federation::finalise_quorum(&tracker) {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return super::quorum_not_met_response(&payload);
}
}
Err(e) => {
tracing::warn!("link fanout error (local committed): {e:?}");
}
}
}
(
StatusCode::CREATED,
Json(json!({"linked": true, (field_names::ATTEST_LEVEL): attest_level})),
)
.into_response()
}
Err(e) => {
// Two storage errors get dedicated status codes; everything else
// falls through to the generic 500 handler.
if let Some(se) = e.downcast_ref::<db::StorageError>() {
match se {
db::StorageError::LinkReflectionCycle { .. } => {
return (StatusCode::CONFLICT, Json(json!({"error": se.to_string()})))
.into_response();
}
db::StorageError::LinkPermissionDenied { .. } => {
return (
StatusCode::FORBIDDEN,
Json(json!({"error": se.to_string()})),
)
.into_response();
}
_ => {}
}
}
crate::handlers::errors::handler_error_500(&e)
}
}
}
pub async fn delete_link(
State(app): State<AppState>,
headers: HeaderMap,
Json(raw): Json<serde_json::Value>,
) -> impl IntoResponse {
if let Some(unknown) = unknown_link_body_fields(&raw) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "unknown_field", "fields": unknown})),
)
.into_response();
}
let body: LinkBody = match serde_json::from_value(raw) {
Ok(b) => b,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
};
let (source_id, target_id, relation) = body.resolved();
if let Err(e) =
validate::RequestValidator::validate_link_triple(&source_id, &target_id, &relation)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
.unwrap_or_else(|_| sentinels::ANONYMOUS_INVALID.to_string());
crate::governance::audit::record_decision(
&caller,
"allow",
"link_delete",
"",
json!({
"source_id": source_id,
"target_id": target_id,
"relation": relation,
}),
);
let lock = app.db.lock().await;
let source_mem = match db::get(&lock.0, &source_id) {
Ok(Some(m)) => Some(m),
Ok(None) => None,
Err(e) => {
drop(lock);
tracing::error!("delete_link: source lookup failed: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
};
let source_mem = match source_mem {
Some(m) => m,
None => {
let delete_result = db::delete_link(&lock.0, &source_id, &target_id);
drop(lock);
return match delete_result {
Ok(removed) => Json(json!({"deleted": removed})).into_response(),
Err(e) => crate::handlers::errors::handler_error_500(&e),
};
}
};
let target_mem_owner = db::get(&lock.0, &target_id).ok().flatten().and_then(|m| {
m.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.map(str::to_string)
});
let source_owner = source_mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let source_target = source_mem
.metadata
.get(field_names::TARGET_AGENT_ID)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let is_unowned_legacy =
source_owner.is_empty() && target_mem_owner.as_deref().unwrap_or("").is_empty();
let owns_source = source_owner == caller || source_target == caller;
let owns_target = target_mem_owner.as_deref() == Some(caller.as_str());
if !is_unowned_legacy && !owns_source && !owns_target && caller != sentinels::DAEMON_PRINCIPAL {
drop(lock);
tracing::warn!(
target: super::AUTHZ_TRACE_TARGET,
"DELETE /api/v1/links 403: caller {caller} owns neither source {source_owner} nor target {} (source_id={source_id})",
target_mem_owner.as_deref().unwrap_or("")
);
return (
StatusCode::FORBIDDEN,
Json(json!({
"error": "caller does not own either endpoint of this link",
"source_owner": source_owner,
"target_owner": target_mem_owner.unwrap_or_default(),
"caller": caller,
"source_id": source_id,
"target_id": target_id,
})),
)
.into_response();
}
let delete_result = db::delete_link(&lock.0, &source_id, &target_id);
drop(lock);
match delete_result {
Ok(removed) => Json(json!({"deleted": removed})).into_response(),
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
pub async fn get_links(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Path(id): Path<String>,
) -> impl IntoResponse {
if let Err(e) = validate::validate_id(&id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let caller = {
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
crate::identity::resolve_http_agent_id(None, header_agent_id)
.unwrap_or_else(|_| crate::identity::anonymous_request_id())
};
let caller_is_admin = crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
return match app.store.get_links_for_anchor(&id).await {
Ok(edges) => {
let visible = if caller_is_admin {
edges
} else {
let ctx = crate::store::CallerContext::for_agent(&caller);
let mut keep: Vec<_> = Vec::with_capacity(edges.len());
for link in edges {
let src_ok = app
.store
.get(&ctx, &link.source_id)
.await
.map(|m| crate::visibility::is_visible_to_caller(&m, &caller))
.unwrap_or(false);
let tgt_ok = app
.store
.get(&ctx, &link.target_id)
.await
.map(|m| crate::visibility::is_visible_to_caller(&m, &caller))
.unwrap_or(false);
if src_ok && tgt_ok {
keep.push(link);
}
}
keep
};
Json(json!({"links": visible})).into_response()
}
Err(e) => store_err_to_response(e),
};
}
let lock = app.db.lock().await;
let links_for_anchor = db::get_links(&lock.0, &id);
drop(lock);
match links_for_anchor {
Ok(links) => {
let visible = if caller_is_admin {
links
} else {
#[cfg(feature = "sal")]
{
let ctx = crate::store::CallerContext::for_agent(&caller);
let mut keep: Vec<_> = Vec::with_capacity(links.len());
for link in links {
let src_ok = app
.store
.get(&ctx, &link.source_id)
.await
.map(|m| crate::visibility::is_visible_to_caller(&m, &caller))
.unwrap_or(false);
let tgt_ok = app
.store
.get(&ctx, &link.target_id)
.await
.map(|m| crate::visibility::is_visible_to_caller(&m, &caller))
.unwrap_or(false);
if src_ok && tgt_ok {
keep.push(link);
}
}
keep
}
#[cfg(not(feature = "sal"))]
{
let lock = app.db.lock().await;
links
.into_iter()
.filter(|link| {
let src_ok = db::get(&lock.0, &link.source_id)
.ok()
.flatten()
.as_ref()
.is_some_and(|m| {
crate::visibility::is_visible_to_caller(m, &caller)
});
let tgt_ok = db::get(&lock.0, &link.target_id)
.ok()
.flatten()
.as_ref()
.is_some_and(|m| {
crate::visibility::is_visible_to_caller(m, &caller)
});
src_ok && tgt_ok
})
.collect()
}
};
Json(json!({"links": visible})).into_response()
}
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}