use axum::{
Json,
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde_json::json;
use super::transport::AppState;
use crate::federation::QuorumNotMetPayload;
use crate::models::Memory;
use crate::validate;
/// Builds the HTTP response for a quorum-not-met condition.
///
/// The payload is converted through `super::to_value_or_500`. On success the
/// result is `503 Service Unavailable` carrying a `Retry-After: 2` header and
/// the converted payload as the JSON body. If the helper hands back an error
/// response instead, that response is returned unchanged.
pub(crate) fn quorum_not_met_response(payload: &QuorumNotMetPayload) -> axum::response::Response {
    match super::to_value_or_500("quorum_not_met_response", payload) {
        // The helper already produced a complete response; pass it through.
        Err(fallback) => fallback,
        Ok(value) => {
            let retry_after = [("Retry-After", "2")];
            (StatusCode::SERVICE_UNAVAILABLE, retry_after, Json(value)).into_response()
        }
    }
}
/// Broadcasts `mem` through the federation layer and turns a quorum failure
/// into a ready-to-send response.
///
/// Returns `None` in three situations:
/// - `app.federation` holds no value,
/// - the broadcast itself returned an error (logged at `warn`, not surfaced
///   to the caller),
/// - `finalise_quorum` succeeded.
///
/// Returns `Some(response)` — built by [`quorum_not_met_response`] — only
/// when `finalise_quorum` reports an error.
pub(crate) async fn fanout_or_503(
    app: &AppState,
    mem: &Memory,
) -> Option<axum::response::Response> {
    let fed = app.federation.as_ref().as_ref()?;

    let tracker = match crate::federation::broadcast_store_quorum(fed, mem).await {
        Ok(tracker) => tracker,
        Err(e) => {
            // A broadcast error is deliberately not reported to the caller.
            tracing::warn!("fanout error (local committed): {e:?}");
            return None;
        }
    };

    if let Err(err) = crate::federation::finalise_quorum(&tracker) {
        let payload = QuorumNotMetPayload::from_err(&err);
        Some(quorum_not_met_response(&payload))
    } else {
        None
    }
}
/// Resolves the caller's agent id from the request body and the agent-id
/// header, then cross-checks an optional query-string claim against it.
///
/// The header named by `crate::HEADER_AGENT_ID` is read only if its value
/// converts to a `&str`; otherwise it is treated as absent. Resolution itself
/// is delegated to `crate::identity::resolve_http_agent_id`.
///
/// A `query` of `None` or `Some("")` is ignored. A non-empty `query` must pass
/// `validate::validate_agent_id` and be equal to the resolved id.
///
/// # Errors
///
/// Returns an `Err(String)` when resolution fails, when the query claim fails
/// validation, or when the query claim differs from the resolved id (message
/// prefixed with `agent_id_query_header_mismatch`).
pub(crate) fn resolve_caller_agent_id(
    body: Option<&str>,
    headers: &HeaderMap,
    query: Option<&str>,
) -> Result<String, String> {
    let header_val = match headers.get(crate::HEADER_AGENT_ID) {
        Some(raw) => raw.to_str().ok(),
        None => None,
    };
    let resolved = crate::identity::resolve_http_agent_id(body, header_val)
        .map_err(|e| crate::errors::msg::invalid("agent_id", e))?;

    // No query claim (or an empty one) means there is nothing to cross-check.
    let Some(claim) = query.filter(|q| !q.is_empty()) else {
        return Ok(resolved);
    };

    validate::validate_agent_id(claim).map_err(|e| crate::errors::msg::invalid("agent_id", e))?;

    if claim == resolved {
        Ok(resolved)
    } else {
        Err(format!(
            "agent_id_query_header_mismatch: query-supplied agent_id {claim:?} disagrees \
             with authenticated header-resolved id {resolved:?}"
        ))
    }
}
/// Builds a `CallerContext` for an HTTP request from its headers and the
/// optional `agent_id` supplied in the body.
///
/// The agent id is resolved with [`resolve_caller_agent_id`] (no query claim).
/// This function never fails: if resolution returns an error, the error is
/// logged at `warn` under the `handlers::parity` target and the
/// `ANONYMOUS_INVALID` sentinel is used as the agent id instead.
#[cfg(feature = "sal")]
pub(crate) fn http_caller_ctx(
    headers: &axum::http::HeaderMap,
    body_agent_id: Option<&str>,
) -> crate::store::CallerContext {
    let agent_id = match resolve_caller_agent_id(body_agent_id, headers, None) {
        Ok(id) => id,
        Err(e) => {
            tracing::warn!(
                target: "handlers::parity",
                error = %e,
                "http_caller_ctx: invalid X-Agent-Id / body.agent_id, falling back to anonymous:invalid"
            );
            crate::identity::sentinels::ANONYMOUS_INVALID.to_string()
        }
    };
    crate::store::CallerContext::for_agent(agent_id)
}
/// Unit tests for the ownership gate `require_caller_owns_memory`.
#[cfg(test)]
mod require_caller_owns_memory_tests {
    use super::*;
    use crate::models::{ConfidenceSource, Memory, MemoryKind, Tier};
    use serde_json::json;

    /// Timestamp shared by the `created_at` / `updated_at` fixture fields.
    const FIXTURE_TIMESTAMP: &str = "2026-05-20T00:00:00Z";

    /// Builds a fixture `Memory` whose only varying field is `metadata`.
    fn mem_with(metadata: serde_json::Value) -> Memory {
        Memory {
            id: String::from("test-id"),
            tier: Tier::Long,
            namespace: String::from("test-ns"),
            title: String::from("test"),
            content: String::from("test"),
            tags: Vec::new(),
            priority: 5,
            confidence: 1.0,
            source: String::from("test"),
            access_count: 0,
            created_at: String::from(FIXTURE_TIMESTAMP),
            updated_at: String::from(FIXTURE_TIMESTAMP),
            last_accessed_at: None,
            expires_at: None,
            metadata,
            reflection_depth: 0,
            memory_kind: MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        }
    }

    /// Fixture owned by `alice` with no inbox target.
    fn alice_memory() -> Memory {
        mem_with(json!({"agent_id": "alice"}))
    }

    /// Fixture owned by `alice` and addressed to `target`.
    fn alice_memory_for(target: &str) -> Memory {
        mem_with(json!({
            "agent_id": "alice",
            "target_agent_id": target,
        }))
    }

    /// `true` when the gate lets the caller through (returns no response).
    fn gate_allows(mem: &Memory, caller: &str, allow_inbox: bool) -> bool {
        require_caller_owns_memory(mem, caller, allow_inbox).is_none()
    }

    #[test]
    fn owner_passes() {
        assert!(gate_allows(&alice_memory(), "alice", false));
    }

    #[test]
    fn non_owner_blocked() {
        assert!(!gate_allows(&alice_memory(), "bob", false));
    }

    #[test]
    fn legacy_unowned_passes() {
        // Both a missing and an empty `agent_id` count as unowned.
        for metadata in [json!({}), json!({"agent_id": ""})] {
            let mem = mem_with(metadata);
            assert!(gate_allows(&mem, "bob", false));
        }
    }

    #[test]
    fn daemon_passes() {
        assert!(gate_allows(&alice_memory(), "daemon", false));
    }

    #[test]
    fn inbox_target_passes_when_allowed() {
        assert!(gate_allows(&alice_memory_for("bob"), "bob", true));
    }

    #[test]
    fn inbox_target_blocked_when_disallowed() {
        assert!(!gate_allows(&alice_memory_for("bob"), "bob", false));
    }

    #[test]
    fn inbox_target_mismatch_blocked() {
        assert!(!gate_allows(&alice_memory_for("carol"), "bob", true));
    }
}
/// Ownership gate: decides whether `caller` may act on `mem`.
///
/// Returns `None` (access allowed) when any of the following holds:
/// - the memory's `metadata.agent_id` is missing, not a string, or empty;
/// - `metadata.agent_id` equals `caller`;
/// - `caller` equals `crate::identity::sentinels::DAEMON_PRINCIPAL`;
/// - `allow_inbox` is `true` and a non-empty `metadata.target_agent_id`
///   equals `caller`.
///
/// Otherwise a warning is logged under `super::AUTHZ_TRACE_TARGET` and
/// `Some(response)` is returned: `403 Forbidden` with a JSON body containing
/// `error`, `owner` and `caller`.
#[must_use]
pub fn require_caller_owns_memory(
    mem: &Memory,
    caller: &str,
    allow_inbox: bool,
) -> Option<axum::response::Response> {
    /// Reads a string entry from the memory's metadata; a missing or
    /// non-string entry reads as the empty string.
    fn metadata_str<'m>(mem: &'m Memory, key: &str) -> &'m str {
        mem.metadata.get(key).and_then(|v| v.as_str()).unwrap_or("")
    }

    let owner = metadata_str(mem, "agent_id");
    let unowned = owner.is_empty();
    let is_owner = owner == caller;
    let is_daemon = caller == crate::identity::sentinels::DAEMON_PRINCIPAL;
    if unowned || is_owner || is_daemon {
        return None;
    }

    if allow_inbox {
        let target = metadata_str(mem, "target_agent_id");
        let addressed_to_caller = !target.is_empty() && target == caller;
        if addressed_to_caller {
            return None;
        }
    }

    tracing::warn!(
        target: super::AUTHZ_TRACE_TARGET,
        "ownership-gate 403: caller {caller} != owner {owner} (id={})",
        mem.id
    );

    let body = json!({
        "error": "caller does not own this memory",
        "owner": owner,
        "caller": caller,
    });
    Some((StatusCode::FORBIDDEN, Json(body)).into_response())
}