#![allow(clippy::too_many_lines)]
#[cfg(feature = "sal")]
use crate::models::field_names;
use axum::{
Json,
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
use serde_json::json;
use std::sync::OnceLock;
use ed25519_dalek::VerifyingKey;
use crate::federation::identity::chain::{CHAIN_HEADER, CertChain, DEFAULT_MAX_CHAIN_DEPTH};
use crate::federation::identity::credential::{CREDENTIAL_HEADER, SignedCredential};
use crate::federation::identity::trust_bundle::TrustBundle;
use crate::federation::signing as fed_signing;
#[cfg(feature = "sal")]
use super::AppState;
#[cfg(feature = "sal")]
use super::federation_receive::{
SyncPushBody, attribute_agent_for_quota, check_sender_clock_skew, next_utc_midnight,
};
#[cfg(feature = "sal")]
use crate::validate;
/// Applies an inbound federation sync push through the store abstraction
/// (`app.store`) and returns a JSON summary response.
///
/// Flow, in order:
/// 1. Rejects the request with `400` when `sender_agent_id` fails validation
///    or any subcollection exceeds `app.max_page_size`.
/// 2. Walks `body.memories`: validates each row, checks/records quota under the
///    `app.db` lock, stamps reflection origin, applies the row via
///    `apply_remote_memory`, then either stores the shipped embedding or queues
///    the row for a deferred local embed.
/// 3. If a quota refusal occurred, returns `429` immediately (deletions and
///    links are not processed in that case).
/// 4. Applies `body.deletions` and `body.links` (links get an attest level
///    derived from optional signature verification).
/// 5. Counts archives / restores / pendings / pending decisions / namespace
///    meta entries as `unsupported_on_postgres` without applying them.
///
/// With `body.dry_run` set, valid memories, deletions and links are counted as
/// `noop` and nothing is written.
///
/// `_headers` is accepted but not read by this function.
#[cfg(feature = "sal")]
#[allow(clippy::too_many_lines)]
pub(super) async fn sync_push_via_store(
app: AppState,
_headers: HeaderMap,
body: SyncPushBody,
) -> Response {
// Refuse the whole push when the sender id is not a valid agent id.
if let Err(e) = validate::validate_agent_id(&body.sender_agent_id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid sender_agent_id: {e}")})),
)
.into_response();
}
// Every subcollection is individually bounded by the configured page size.
let cap = app.max_page_size;
if body.memories.len() > cap
|| body.deletions.len() > cap
|| body.archives.len() > cap
|| body.restores.len() > cap
|| body.pendings.len() > cap
|| body.pending_decisions.len() > cap
|| body.namespace_meta.len() > cap
|| body.namespace_meta_clears.len() > cap
|| body.links.len() > cap
|| body.embeddings.len() > cap
{
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("sync_push limited to {cap} entries per subcollection")
})),
)
.into_response();
}
// All store calls below run under a caller context built from the sender id.
let ctx = crate::store::CallerContext::for_agent(body.sender_agent_id.clone());
// Outcome counters reported in the response body.
let mut applied = 0usize;
let mut noop = 0usize;
let mut skipped = 0usize;
let mut deleted = 0usize;
let mut links_applied = 0usize;
// NOTE(review): `latest_seen` is updated in the memory loop but is not read
// anywhere else in this function — confirm whether it is still needed.
let mut latest_seen: Option<String> = None;
let mut unsupported_on_postgres = 0usize;
let mut quota_refused = 0usize;
let mut first_quota_refusal: Option<crate::quotas::QuotaError> = None;
// The return value (if any) of the clock-skew check is not used here.
check_sender_clock_skew(&body.sender_agent_id, &body);
// Dimension of the local embedder, or `None` when no embedder is configured.
let receiver_dim = app
.embedder
.as_ref()
.as_ref()
.map(crate::embeddings::Embedder::dim);
// Index shipped embeddings by memory id for lookup after each apply.
let shipped_by_id: std::collections::HashMap<&str, &crate::federation::ShippedEmbedding> = body
.embeddings
.iter()
.map(|se| (se.memory_id.as_str(), se))
.collect();
// (memory id, embedding document) pairs that still need a local embed.
let mut deferred_embed: Vec<(String, String)> = Vec::new();
for mem in &body.memories {
if let Err(e) = validate::RequestValidator::validate_memory(mem) {
tracing::warn!("sync_push: skipping memory {} ({}): {e}", mem.id, mem.title);
skipped += 1;
continue;
}
// Track the greatest `updated_at` seen so far (string comparison).
if latest_seen
.as_deref()
.is_none_or(|current| mem.updated_at.as_str() > current)
{
latest_seen = Some(mem.updated_at.clone());
}
// Dry runs stop here: no quota is recorded and nothing is written.
if body.dry_run {
noop += 1;
continue;
}
let attribute_agent = attribute_agent_for_quota(&body.sender_agent_id, mem);
// Byte estimate = title + content + serialized metadata, saturating at i64::MAX.
let bytes_estimate =
i64::try_from(mem.title.len() + mem.content.len() + mem.metadata.to_string().len())
.unwrap_or(i64::MAX);
// Scoped block so the db lock guard is released before the store awaits below.
{
let conn = app.db.lock().await;
match crate::quotas::check_and_record(
&conn.0,
&attribute_agent,
&mem.namespace,
crate::quotas::QuotaOp::Memory {
bytes: bytes_estimate,
},
) {
Ok(()) => {}
Err(crate::quotas::QuotaCheckError::Quota(q)) => {
tracing::warn!(
target: "federation::quota",
peer = %body.sender_agent_id,
attribute_agent = %attribute_agent,
limit = q.limit.as_str(),
current = q.current,
max = q.max,
"sync_push (postgres): per-agent quota exceeded"
);
// Best-effort signed event for the refusal; a failure to append is ignored.
let _ = crate::signed_events::append_signed_event(
&conn.0,
&crate::signed_events::SignedEvent::with_daemon_signature(
crate::signed_events::payload_hash(
format!(
"peer={} agent={} limit={} current={} max={}",
body.sender_agent_id,
attribute_agent,
q.limit.as_str(),
q.current,
q.max,
)
.as_bytes(),
),
attribute_agent.clone(),
"federation.quota_refused".to_string(),
chrono::Utc::now().to_rfc3339(),
),
);
quota_refused += 1;
if first_quota_refusal.is_none() {
first_quota_refusal = Some(q);
}
drop(conn);
// The first quota refusal stops processing of the remaining memories.
break;
}
Err(crate::quotas::QuotaCheckError::Sql(e)) => {
// A quota read failure skips this row instead of failing the whole push.
tracing::warn!(
"sync_push (postgres): quota substrate read failed for {}: {e}",
attribute_agent
);
skipped += 1;
continue;
}
}
}
// Reflection-depth cap from the namespace policy; lookup errors and a
// missing policy both fall back to `GovernancePolicy::default`.
let local_cap = app
.store
.resolve_governance_policy(&mem.namespace)
.await
.ok()
.flatten()
.unwrap_or_else(crate::models::GovernancePolicy::default)
.effective_max_reflection_depth();
let to_insert = crate::federation::reflection_bookkeeping::stamp_reflection_origin(
mem,
&body.sender_agent_id,
local_cap,
);
match app.store.apply_remote_memory(&ctx, &to_insert).await {
Ok(applied_id) => {
applied += 1;
// Use the shipped vector only when its declared dim matches both the
// local embedder and the vector length, and it passes sanitization.
let clean_shipped = shipped_by_id
.get(mem.id.as_str())
.filter(|se| receiver_dim == Some(se.dim) && se.vector.len() == se.dim)
.and_then(|se| {
crate::federation::sanitize_shipped_vector(&se.vector)
.map(|v| (v, se.model.clone()))
});
match clean_shipped {
Some((vector, model)) => {
if let Err(e) = app
.store
.update_embedding(&ctx, &applied_id, Some(&vector))
.await
{
tracing::warn!(
"sync_push (postgres): storing shipped embedding failed \
for {applied_id} (model {model}): {e} — deferring local embed",
);
deferred_embed.push((
applied_id.clone(),
crate::embeddings::embedding_document(&mem.title, &mem.content),
));
}
}
// No usable shipped vector: queue a local embed for later.
None => deferred_embed.push((
applied_id.clone(),
crate::embeddings::embedding_document(&mem.title, &mem.content),
)),
}
if crate::audit::is_enabled() {
// Audit actor is the memory's `metadata.agent_id` when it is a string,
// otherwise the sender.
let owner = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or(&body.sender_agent_id);
crate::audit::emit(
crate::audit::EventBuilder::new(
crate::audit::AuditAction::Store,
crate::audit::actor(owner, "federation_push", None),
crate::audit::target_memory(
mem.id.clone(),
mem.namespace.clone(),
Some(mem.title.clone()),
Some(mem.tier.as_str().to_string()),
None,
),
)
.outcome(crate::audit::AuditOutcome::Allow),
);
}
}
Err(e) => {
// The write failed after quota was recorded: refund it (result ignored).
{
let conn = app.db.lock().await;
let _ = crate::quotas::refund_op(
&conn.0,
&attribute_agent,
&mem.namespace,
crate::quotas::QuotaOp::Memory {
bytes: bytes_estimate,
},
);
}
tracing::warn!("sync_push: apply_remote_memory failed for {}: {e}", mem.id);
skipped += 1;
}
}
}
// A quota refusal ends the request with 429; embeds queued for rows applied
// before the refusal are still scheduled.
if let Some(q) = first_quota_refusal.take() {
spawn_deferred_embedding_refresh_via_store(&app, &ctx, deferred_embed);
let reset_at = next_utc_midnight();
return (
StatusCode::TOO_MANY_REQUESTS,
[
("x-quota-reset-at", reset_at.as_str()),
("x-quota-limit", q.limit.as_str()),
],
Json(json!({
"error": "QUOTA_EXCEEDED",
"limit": q.limit.as_str(),
"current": q.current,
"max": q.max,
"agent_id": q.agent_id,
"applied_before_refusal": applied,
(crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
"reset_at": reset_at,
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
// Deletions: invalid ids are skipped; `Ok(false)` from the store counts as a noop.
for del_id in &body.deletions {
if validate::validate_id(del_id).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match app.store.apply_remote_deletion(&ctx, del_id).await {
Ok(true) => deleted += 1,
Ok(false) => noop += 1,
Err(e) => {
tracing::warn!("sync_push: apply_remote_deletion failed for {del_id}: {e}");
skipped += 1;
}
}
}
for link in &body.links {
if validate::RequestValidator::validate_link_triple(
&link.source_id,
&link.target_id,
link.relation.as_str(),
)
.is_err()
{
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
// Attest level: a link carrying both a signature and `observed_by` is
// verified when a public key for `observed_by` is known. A bad signature
// skips the link; a missing key or missing fields yields `Unsigned`.
let attest_level = match (link.signature.as_deref(), link.observed_by.as_deref()) {
(Some(sig_bytes), Some(observed_by)) => {
match crate::identity::verify::lookup_peer_public_key(observed_by) {
Some(pubkey) => {
let signable = crate::identity::sign::SignableLink {
src_id: &link.source_id,
dst_id: &link.target_id,
relation: link.relation.as_str(),
observed_by: Some(observed_by),
valid_from: link.valid_from.as_deref(),
valid_until: link.valid_until.as_deref(),
};
match crate::identity::verify::verify(&pubkey, &signable, sig_bytes) {
Ok(()) => crate::models::AttestLevel::PeerAttested.as_str(),
Err(e) => {
tracing::warn!(
"sync_push: signature rejected for link \
({} -> {} / {}) from observed_by={}: {e}",
link.source_id,
link.target_id,
link.relation,
observed_by
);
skipped += 1;
continue;
}
}
}
None => crate::models::AttestLevel::Unsigned.as_str(),
}
}
_ => crate::models::AttestLevel::Unsigned.as_str(),
};
match app.store.apply_remote_link(&ctx, link, attest_level).await {
Ok(()) => links_applied += 1,
Err(e) => {
tracing::warn!(
"sync_push: apply_remote_link failed ({} -> {} / {}): {e}",
link.source_id,
link.target_id,
link.relation
);
skipped += 1;
}
}
}
// These subcollections are only counted, never applied, on this path.
unsupported_on_postgres += body.archives.len()
+ body.restores.len()
+ body.pendings.len()
+ body.pending_decisions.len()
+ body.namespace_meta.len()
+ body.namespace_meta_clears.len();
spawn_deferred_embedding_refresh_via_store(&app, &ctx, deferred_embed);
// NOTE(review): "receiver_agent_id" is populated from `body.sender_agent_id`
// — confirm this is the intended value for that field.
(
StatusCode::OK,
Json(json!({
"applied": applied,
"deleted": deleted,
"links_applied": links_applied,
"noop": noop,
"skipped": skipped,
(crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
"unsupported_on_postgres": unsupported_on_postgres,
"dry_run": body.dry_run,
"receiver_agent_id": body.sender_agent_id,
(field_names::STORAGE_BACKEND): "postgres",
"note": "pendings / archives / restores / namespace_meta are sqlite-only \
in v0.7.0; memories / deletions / links round-trip via the SAL trait",
})),
)
.into_response()
}
/// Schedules a detached background task that embeds each `(id, text)` row and
/// writes the resulting vector through `app.store.update_embedding`.
///
/// Does nothing when `rows` is empty or no embedder is configured. Each embed
/// call runs on the blocking thread pool. Per-row failures are logged and the
/// task moves on to the next row; if the embedder turns out to be absent inside
/// the task, the task stops.
#[cfg(feature = "sal")]
fn spawn_deferred_embedding_refresh_via_store(
    app: &AppState,
    ctx: &crate::store::CallerContext,
    rows: Vec<(String, String)>,
) {
    let embedder_missing = app.embedder.as_ref().as_ref().is_none();
    if rows.is_empty() || embedder_missing {
        return;
    }

    // Owned handles moved into the detached task.
    let (store, embedder, ctx) = (app.store.clone(), app.embedder.clone(), ctx.clone());

    tokio::spawn(async move {
        for (id, document) in rows {
            let worker_embedder = embedder.clone();
            let joined = tokio::task::spawn_blocking(move || {
                worker_embedder
                    .as_ref()
                    .as_ref()
                    .map(|e| e.embed(&document))
            })
            .await;

            let vector = match joined {
                Err(e) => {
                    tracing::warn!("sync_push (postgres): deferred embed join error for {id}: {e}");
                    continue;
                }
                // Embedder vanished: nothing further can be embedded.
                Ok(None) => return,
                Ok(Some(Err(e))) => {
                    tracing::warn!("sync_push (postgres): deferred embed failed for {id}: {e}");
                    continue;
                }
                Ok(Some(Ok(v))) => v,
            };

            if let Err(e) = store.update_embedding(&ctx, &id, Some(&vector)).await {
                tracing::warn!(
                    "sync_push (postgres): deferred update_embedding failed for {id}: {e}"
                );
            }
        }
    });
}
/// Returns the process-wide federation trust bundle, loading it from the
/// environment on first use.
///
/// A load failure is logged and replaced by an empty `TrustBundle::new()`; the
/// outcome of the first call is cached for all later calls.
fn cached_trust_bundle() -> &'static TrustBundle {
    static BUNDLE: OnceLock<TrustBundle> = OnceLock::new();
    BUNDLE.get_or_init(|| match TrustBundle::load_from_env() {
        Ok(loaded) => loaded,
        Err(e) => {
            tracing::warn!(
                target: crate::federation::SIGNING_TRACE_TARGET,
                error = %e,
                "failed to load federation trust bundle; continuing with legacy per-peer verify"
            );
            TrustBundle::new()
        }
    })
}
/// Resolves the Ed25519 key used to verify a peer's request signature.
///
/// Resolution order:
/// 1. When `bundle` is non-empty and the credential header is present and
///    readable as a string, the credential (plus optional chain header) is
///    verified; on success its subject key is returned.
/// 2. Otherwise, or when credential verification fails, the key enrolled for
///    `peer_id` is loaded via `load_daemon_verifying_key`; load errors and a
///    missing `peer_id` both yield `None`.
///
/// Metrics: credential-header presence is always recorded; the verification
/// outcome is recorded only when verification was attempted.
fn resolve_peer_verifying_key(
    headers: &HeaderMap,
    peer_id: Option<&str>,
    bundle: &TrustBundle,
    now_unix: i64,
) -> Option<VerifyingKey> {
    crate::metrics::record_federation_inbound_cred(headers.contains_key(CREDENTIAL_HEADER));

    // The credential is only considered when there is a bundle to check it against.
    let presented_cred = if bundle.is_empty() {
        None
    } else {
        headers
            .get(CREDENTIAL_HEADER)
            .and_then(|raw| raw.to_str().ok())
    };

    if let Some(cred_value) = presented_cred {
        let chain_value = headers.get(CHAIN_HEADER).and_then(|raw| raw.to_str().ok());
        let verified =
            verify_credential_pubkey(cred_value, chain_value, peer_id, bundle, now_unix);
        crate::metrics::record_federation_cred_verify(verified.is_some());
        if verified.is_some() {
            return verified;
        }
    }

    // Legacy path: per-peer enrolled key.
    let pid = peer_id?;
    crate::governance::audit::load_daemon_verifying_key(pid)
        .ok()
        .flatten()
}
/// Verifies a presented credential (and optional intermediate chain) against
/// `bundle` and returns the subject's verifying key.
///
/// Returns `None`, after logging a warning, when:
/// - the credential header value cannot be parsed,
/// - the chain header value is present but cannot be parsed,
/// - chain verification against `bundle` at `now_unix` fails,
/// - the credential subject differs from `peer_id` (including `peer_id == None`),
/// - the subject key cannot be decoded.
fn verify_credential_pubkey(
    cred_value: &str,
    chain_value: Option<&str>,
    peer_id: Option<&str>,
    bundle: &TrustBundle,
    now_unix: i64,
) -> Option<VerifyingKey> {
    let leaf = SignedCredential::from_header_value(cred_value)
        .map_err(|e| {
            tracing::warn!(
                target: crate::federation::SIGNING_TRACE_TARGET,
                tag = e.tag(),
                peer_id = %peer_id.unwrap_or(""),
                "sync: X-Memory-Cred malformed; falling back to legacy per-peer verify"
            );
        })
        .ok()?;

    // An absent chain header means a single-level credential (no intermediates).
    let intermediates = match chain_value {
        None => Vec::new(),
        Some(value) => CertChain::intermediates_from_header_value(value)
            .map_err(|e| {
                tracing::warn!(
                    target: crate::federation::SIGNING_TRACE_TARGET,
                    tag = e.tag(),
                    peer_id = %peer_id.unwrap_or(""),
                    "sync: X-Memory-Cred-Chain malformed; falling back to legacy"
                );
            })
            .ok()?,
    };

    let chain = CertChain::new(leaf, intermediates);
    let cred = chain
        .verify(bundle, now_unix, DEFAULT_MAX_CHAIN_DEPTH)
        .map_err(|e| {
            tracing::warn!(
                target: crate::federation::SIGNING_TRACE_TARGET,
                tag = e.tag(),
                peer_id = %peer_id.unwrap_or(""),
                "sync: X-Memory-Cred chain failed trust-bundle verification; falling back to legacy"
            );
        })
        .ok()?;

    // The credential must be bound to the peer id the caller claims.
    let subject_matches = peer_id == Some(cred.subject_agent_id.as_str());
    if !subject_matches {
        tracing::warn!(
            target: crate::federation::SIGNING_TRACE_TARGET,
            peer_id = %peer_id.unwrap_or(""),
            cred_subject = %cred.subject_agent_id,
            "sync: X-Memory-Cred subject does not match X-Peer-Id; refusing credential binding"
        );
        return None;
    }

    cred.subject_verifying_key()
        .map_err(|e| {
            tracing::warn!(
                target: crate::federation::SIGNING_TRACE_TARGET,
                tag = e.tag(),
                peer_id = %peer_id.unwrap_or(""),
                "sync: X-Memory-Cred subject key malformed; falling back to legacy"
            );
        })
        .ok()
}
/// Signature gate for a sync push body.
///
/// Returns `None` when the request may proceed and `Some(response)` carrying a
/// `401 Unauthorized` JSON body when it must be refused.
///
/// When `fed_signing::require_sig()` is false the gate is bypassed entirely.
/// Otherwise the decision depends on whether a signature header is present and
/// whether a verifying key can be resolved for the peer:
///
/// | signature | key  | outcome                                                        |
/// |-----------|------|----------------------------------------------------------------|
/// | yes       | yes  | verify signature, then apply nonce replay / presence rules     |
/// | yes       | no   | refuse (`x_memory_sig_no_enrolled_key`)                        |
/// | no        | yes  | refuse (enrolled peer omitted its signature)                   |
/// | no        | no   | refuse only when peer enrollment is required, otherwise allow  |
///
/// A header whose value `to_str` rejects is treated the same as a missing header.
pub(super) fn verify_signature_or_reject(
headers: &HeaderMap,
body_bytes: &[u8],
peer_id: Option<&str>,
federation_nonce_cache: &crate::identity::replay::FederationNonceCache,
) -> Option<Response> {
if !fed_signing::require_sig() {
return None;
}
let sig_header = headers
.get(fed_signing::SIGNATURE_HEADER)
.and_then(|v| v.to_str().ok());
let nonce_header = headers
.get(fed_signing::NONCE_HEADER)
.and_then(|v| v.to_str().ok());
// Key comes from a trusted credential when available, else per-peer enrollment.
let pubkey = resolve_peer_verifying_key(
headers,
peer_id,
cached_trust_bundle(),
chrono::Utc::now().timestamp(),
);
match (sig_header, pubkey.as_ref()) {
(Some(sig), Some(pk)) => {
// A present nonce header selects the nonce-aware verification call.
let verify_result = if let Some(nonce) = nonce_header {
fed_signing::verify_header_with_nonce(Some(sig), body_bytes, nonce, pk)
} else {
fed_signing::verify_header(Some(sig), body_bytes, pk)
};
if let Err(e) = verify_result {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
tag = e.tag(),
peer_id = %peer_id.unwrap_or(""),
"sync_push: X-Memory-Sig verification failed"
);
return Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": e.tag(),
"note": "AI_MEMORY_FED_REQUIRE_SIG=1 enforces per-message Ed25519 \
signatures on /sync/push; set =0 to revert to v0.6.x \
permissive",
})),
)
.into_response(),
);
}
// The nonce cache is only touched after the signature has verified.
let pid_for_cache = peer_id.unwrap_or("");
match nonce_header {
Some(nonce) if !nonce.is_empty() => {
match federation_nonce_cache.record_and_check(pid_for_cache, nonce) {
crate::identity::replay::ReplayDecision::Fresh => None,
crate::identity::replay::ReplayDecision::Replay => {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
tag = fed_signing::VerifyError::ReplayedNonce.tag(),
peer_id = %pid_for_cache,
"sync_push: X-Memory-Nonce replay detected"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": fed_signing::VerifyError::ReplayedNonce.tag(),
"note": "AI_MEMORY_FED_REQUIRE_NONCE=1 enforces per-message nonce freshness.",
})),
)
.into_response(),
)
}
}
}
// Nonce header missing or empty: refuse only when nonces are required.
_ => {
if fed_signing::require_nonce() {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
tag = fed_signing::VerifyError::NonceMissing.tag(),
peer_id = %pid_for_cache,
"sync_push: X-Memory-Nonce header absent — strict refusal"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": fed_signing::VerifyError::NonceMissing.tag(),
"note": "AI_MEMORY_FED_REQUIRE_NONCE=1 requires X-Memory-Nonce; set =0 to bypass.",
})),
)
.into_response(),
)
} else {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %pid_for_cache,
"sync_push: X-Memory-Nonce absent — permissive, accepting"
);
None
}
}
}
}
// Signature sent, but nothing to verify it against.
(Some(_), None) => {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_push: X-Memory-Sig present but no enrolled public key for peer-id"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": "x_memory_sig_no_enrolled_key",
"note": "AI_MEMORY_FED_REQUIRE_SIG=1 and the peer sent a signature, \
but no public key is enrolled for the peer-id; enrol via \
`ai-memory identity import` or set =0 to bypass.",
})),
)
.into_response(),
)
}
// A key is known for this peer, so an unsigned request is refused.
(None, Some(_)) => {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_push: enrolled peer omitted X-Memory-Sig header"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": fed_signing::VerifyError::Missing.tag(),
"note": "AI_MEMORY_FED_REQUIRE_SIG=1 enforces per-message Ed25519 \
signatures for enrolled peers; set =0 to revert to v0.6.x \
permissive.",
})),
)
.into_response(),
)
}
// Neither signature nor key: allowed unless peer enrollment is mandatory.
(None, None) => {
if require_peer_enrollment_enabled() {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_push: refusing unenrolled peer-id (AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT=1 #1088)"
);
return Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": "peer_not_enrolled",
"note": "AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT=1 refuses \
X-Peer-Id without an enrolled key (#1088). Enroll \
the peer's Ed25519 key via the operator workflow, \
or unset the env var to revert to permissive.",
})),
)
.into_response(),
);
}
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_push: unsigned (no enrolled key for peer-id) — strict enforcement skipped"
);
None
}
}
}
/// Reports whether `AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT` is switched on.
///
/// The flag is on when the variable equals `1` or matches `true`
/// ASCII-case-insensitively. An unset or unreadable variable, or any other
/// value, means off.
fn require_peer_enrollment_enabled() -> bool {
    match std::env::var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
/// Builds the byte string that is signed for a GET request: `method`, `path`
/// and `query` joined by single `\n` separators, with no trailing newline
/// after `query` (an empty query therefore leaves the output ending in `\n`).
#[must_use]
pub(super) fn canonical_get_bytes(method: &str, path: &str, query: &str) -> Vec<u8> {
    [method, path, query].join("\n").into_bytes()
}
/// Signature gate for a GET-style sync request (log messages refer to it as
/// `sync_since`).
///
/// The signed message is `canonical_get_bytes(method, path, query)` rather than
/// a request body. Returns `None` when the request may proceed and
/// `Some(response)` with `401 Unauthorized` when it must be refused.
///
/// When `fed_signing::require_sig()` is false the gate is bypassed entirely.
/// Otherwise the four (signature present?, key resolved?) combinations are
/// handled as follows:
/// - both: verify the signature, then apply nonce replay / presence rules;
/// - signature only: refuse (`x_memory_sig_no_enrolled_key`);
/// - key only: refuse (enrolled peer omitted its signature);
/// - neither: refuse only when peer enrollment is required, otherwise allow.
///
/// A header whose value `to_str` rejects is treated the same as a missing header.
pub(super) fn verify_get_signature_or_reject(
method: &str,
path: &str,
query: &str,
headers: &HeaderMap,
peer_id: Option<&str>,
federation_nonce_cache: &crate::identity::replay::FederationNonceCache,
) -> Option<Response> {
if !fed_signing::require_sig() {
return None;
}
let sig_header = headers
.get(fed_signing::SIGNATURE_HEADER)
.and_then(|v| v.to_str().ok());
let nonce_header = headers
.get(fed_signing::NONCE_HEADER)
.and_then(|v| v.to_str().ok());
// Key comes from a trusted credential when available, else per-peer enrollment.
let pubkey = resolve_peer_verifying_key(
headers,
peer_id,
cached_trust_bundle(),
chrono::Utc::now().timestamp(),
);
// Bytes the peer is expected to have signed for this request.
let canonical = canonical_get_bytes(method, path, query);
match (sig_header, pubkey.as_ref()) {
(Some(sig), Some(pk)) => {
// A present nonce header selects the nonce-aware verification call.
let verify_result = if let Some(nonce) = nonce_header {
fed_signing::verify_header_with_nonce(Some(sig), &canonical, nonce, pk)
} else {
fed_signing::verify_header(Some(sig), &canonical, pk)
};
if let Err(e) = verify_result {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
tag = e.tag(),
peer_id = %peer_id.unwrap_or(""),
"sync_since: X-Memory-Sig verification failed"
);
return Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": e.tag(),
"note": "AI_MEMORY_FED_REQUIRE_SIG=1 enforces per-message Ed25519 \
signatures on /sync/since; set =0 to revert to v0.6.x \
permissive (#1031)",
})),
)
.into_response(),
);
}
// The nonce cache is only touched after the signature has verified.
let pid_for_cache = peer_id.unwrap_or("");
match nonce_header {
Some(nonce) if !nonce.is_empty() => {
match federation_nonce_cache.record_and_check(pid_for_cache, nonce) {
crate::identity::replay::ReplayDecision::Fresh => None,
crate::identity::replay::ReplayDecision::Replay => {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
tag = fed_signing::VerifyError::ReplayedNonce.tag(),
peer_id = %pid_for_cache,
"sync_since: X-Memory-Nonce replay detected"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": fed_signing::VerifyError::ReplayedNonce.tag(),
"note": "AI_MEMORY_FED_REQUIRE_NONCE=1 enforces per-message nonce freshness on /sync/since (#1031).",
})),
)
.into_response(),
)
}
}
}
// Nonce header missing or empty: refuse only when nonces are required.
_ => {
if fed_signing::require_nonce() {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
tag = fed_signing::VerifyError::NonceMissing.tag(),
peer_id = %pid_for_cache,
"sync_since: X-Memory-Nonce header absent — strict refusal"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": fed_signing::VerifyError::NonceMissing.tag(),
"note": "AI_MEMORY_FED_REQUIRE_NONCE=1 requires X-Memory-Nonce on /sync/since (#1031); set =0 to bypass.",
})),
)
.into_response(),
)
} else {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %pid_for_cache,
"sync_since: X-Memory-Nonce absent — permissive, accepting"
);
None
}
}
}
}
// Signature sent, but nothing to verify it against.
(Some(_), None) => {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_since: X-Memory-Sig present but no enrolled public key for peer-id (#1031)"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": "x_memory_sig_no_enrolled_key",
"note": "AI_MEMORY_FED_REQUIRE_SIG=1 and the peer sent a signature, \
but no public key is enrolled for the peer-id; enrol via \
`ai-memory identity import` or set =0 to bypass (#1031).",
})),
)
.into_response(),
)
}
// A key is known for this peer, so an unsigned request is refused.
(None, Some(_)) => {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_since: enrolled peer omitted X-Memory-Sig header (#1031)"
);
Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": fed_signing::VerifyError::Missing.tag(),
"note": "AI_MEMORY_FED_REQUIRE_SIG=1 enforces per-message Ed25519 \
signatures on /sync/since for enrolled peers; set =0 to revert \
to v0.6.x permissive (#1031).",
})),
)
.into_response(),
)
}
// Neither signature nor key: allowed unless peer enrollment is mandatory.
(None, None) => {
if require_peer_enrollment_enabled() {
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_since: refusing unenrolled peer-id (AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT=1 #1088)"
);
return Some(
(
StatusCode::UNAUTHORIZED,
Json(json!({
"error": "peer_not_enrolled",
"note": "AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT=1 refuses \
X-Peer-Id without an enrolled key on /sync/since (#1088).",
})),
)
.into_response(),
);
}
tracing::warn!(
target: crate::federation::SIGNING_TRACE_TARGET,
peer_id = %peer_id.unwrap_or(""),
"sync_since: unsigned (no enrolled key for peer-id) — strict enforcement skipped"
);
None
}
}
}
/// Unit tests for the individual decision arms of
/// `verify_signature_or_reject` / `verify_get_signature_or_reject`, plus the
/// small helpers `canonical_get_bytes` and `require_peer_enrollment_enabled`.
///
/// Every test that mutates process environment variables first takes
/// `env_lock()` so those tests do not interleave with each other.
/// NOTE(review): the lock only covers tests in this module; tests elsewhere
/// that read or write the same variables are not serialized by it — confirm.
#[cfg(test)]
mod verify_arm_tests {
use super::{
canonical_get_bytes, require_peer_enrollment_enabled, verify_get_signature_or_reject,
verify_signature_or_reject,
};
use crate::federation::signing::{
ED25519_PREFIX, REQUIRE_NONCE_ENV, REQUIRE_SIG_ENV, SIGNATURE_HEADER,
};
use crate::identity::replay::FederationNonceCache;
use axum::http::{HeaderMap, HeaderValue};
use base64::Engine as _;
// Module-wide mutex guarding env-var mutation; a poisoned lock is recovered
// so one failing test does not cascade into the others.
fn env_lock() -> std::sync::MutexGuard<'static, ()> {
use std::sync::{Mutex, OnceLock};
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
// Signature header value: `ED25519_PREFIX` followed by the base64 of 64 zero
// bytes. It is only used on paths where no key is resolved for the peer.
fn sig_header_value() -> HeaderValue {
let b64 = base64::engine::general_purpose::STANDARD.encode([0u8; 64]);
HeaderValue::from_str(&format!("{ED25519_PREFIX}{b64}")).unwrap()
}
// Canonical form is method, path, query separated by '\n'; an empty query
// leaves a trailing newline.
#[test]
fn canonical_get_bytes_joins_with_newlines() {
let out = canonical_get_bytes("GET", "/api/v1/sync/since", "limit=10");
assert_eq!(out, b"GET\n/api/v1/sync/since\nlimit=10");
let out2 = canonical_get_bytes("GET", "/p", "");
assert_eq!(out2, b"GET\n/p\n");
}
// Unset → off; "1" and "true" → on. The variable is removed again at the end.
#[test]
fn require_peer_enrollment_env_arms() {
let _g = env_lock();
// SAFETY (applies to every env mutation in this module): mutations happen
// while holding `env_lock()`. NOTE(review): this assumes no other thread
// touches the process environment concurrently — confirm.
unsafe {
std::env::remove_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT");
}
assert!(!require_peer_enrollment_enabled(), "unset → permissive");
unsafe {
std::env::set_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT", "1");
}
assert!(require_peer_enrollment_enabled(), "1 → strict");
unsafe {
std::env::set_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT", "true");
}
assert!(require_peer_enrollment_enabled(), "true → strict");
unsafe {
std::env::remove_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT");
}
}
// Push path: with the signature requirement set to "0" the gate returns None.
#[test]
fn push_require_sig_off_bypasses() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "0");
}
let cache = FederationNonceCache::default();
let out = verify_signature_or_reject(&HeaderMap::new(), b"{}", Some("peer-x"), &cache);
// Env is restored before asserting so a failed assertion does not leak it.
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
}
assert!(out.is_none(), "REQUIRE_SIG=0 bypasses every branch");
}
// Push path, (signature, no key) arm: must answer 401.
#[test]
fn push_sig_present_no_enrolled_key_refuses() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "1");
}
let mut headers = HeaderMap::new();
headers.insert(SIGNATURE_HEADER, sig_header_value());
let cache = FederationNonceCache::default();
let out = verify_signature_or_reject(&headers, b"{}", Some("no-such-peer"), &cache);
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
}
let resp = out.expect("sig-present + no-key MUST refuse");
assert_eq!(resp.status(), axum::http::StatusCode::UNAUTHORIZED);
}
// Push path, (no signature, no key) arm with enrollment not required: allowed.
#[test]
fn push_none_none_permissive_when_not_strict() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "1");
std::env::remove_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT");
}
let cache = FederationNonceCache::default();
let out = verify_signature_or_reject(&HeaderMap::new(), b"{}", Some("unenrolled"), &cache);
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
}
assert!(out.is_none(), "(None,None) permissive arm must allow");
}
// Push path, (no signature, no key) arm with enrollment required: 401.
#[test]
fn push_none_none_strict_refuses_1088() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "1");
std::env::set_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT", "1");
}
let cache = FederationNonceCache::default();
let out = verify_signature_or_reject(&HeaderMap::new(), b"{}", Some("unenrolled"), &cache);
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
std::env::remove_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT");
}
let resp = out.expect("#1088 strict must refuse unenrolled peer");
assert_eq!(resp.status(), axum::http::StatusCode::UNAUTHORIZED);
}
// GET path: with the signature requirement set to "0" the gate returns None.
#[test]
fn get_require_sig_off_bypasses() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "0");
}
let cache = FederationNonceCache::default();
let out = verify_get_signature_or_reject(
"GET",
"/api/v1/sync/since",
"limit=10",
&HeaderMap::new(),
Some("peer-x"),
&cache,
);
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
}
assert!(out.is_none());
}
// GET path, (signature, no key) arm: must answer 401.
#[test]
fn get_sig_present_no_enrolled_key_refuses() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "1");
std::env::set_var(REQUIRE_NONCE_ENV, "0");
}
let mut headers = HeaderMap::new();
headers.insert(SIGNATURE_HEADER, sig_header_value());
let cache = FederationNonceCache::default();
let out = verify_get_signature_or_reject(
"GET",
"/api/v1/sync/since",
"limit=10",
&headers,
Some("no-such-peer"),
&cache,
);
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
std::env::remove_var(REQUIRE_NONCE_ENV);
}
let resp = out.expect("get sig-present + no-key MUST refuse");
assert_eq!(resp.status(), axum::http::StatusCode::UNAUTHORIZED);
}
// GET path, (no signature, no key) arm with enrollment required: 401.
#[test]
fn get_none_none_strict_refuses_1088() {
let _g = env_lock();
unsafe {
std::env::set_var(REQUIRE_SIG_ENV, "1");
std::env::set_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT", "1");
}
let cache = FederationNonceCache::default();
let out = verify_get_signature_or_reject(
"GET",
"/api/v1/sync/since",
"limit=10",
&HeaderMap::new(),
Some("unenrolled"),
&cache,
);
unsafe {
std::env::remove_var(REQUIRE_SIG_ENV);
std::env::remove_var("AI_MEMORY_FED_REQUIRE_PEER_ENROLLMENT");
}
let resp = out.expect("#1088 strict must refuse on GET path");
assert_eq!(resp.status(), axum::http::StatusCode::UNAUTHORIZED);
}
}
/// Boundary test for the governance-policy fallback used by the store-backed
/// sync push path: a store with no policy configured must fall back to the
/// compiled `GovernancePolicy::default()` reflection-depth cap.
#[cfg(all(test, feature = "sal"))]
mod sal_boundary_961_tests {
    use crate::models::GovernancePolicy;
    use crate::store::MemoryStore;

    /// A freshly opened SQLite store resolves no policy for an arbitrary
    /// namespace, so the `unwrap_or_else(GovernancePolicy::default)` fallback
    /// yields the default cap.
    #[tokio::test]
    async fn fresh_sqlite_store_resolve_governance_policy_returns_none_so_fallback_uses_default() {
        // Keep the temp-file guard alive for the whole test instead of
        // `mem::forget`-ing it: the path stays valid while the store is open,
        // and the file is removed when the guard drops at the end of the test
        // rather than being leaked on disk on every run. `store` is declared
        // after `tmp`, so it is dropped first.
        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
        let path = tmp.path().to_path_buf();
        let store = crate::store::sqlite::SqliteStore::open(&path).expect("open SqliteStore");

        let policy_opt = store
            .resolve_governance_policy("any/ns")
            .await
            .expect("resolve_governance_policy");
        assert!(policy_opt.is_none(), "fresh DB must have no policy");

        let local_cap = policy_opt
            .unwrap_or_else(GovernancePolicy::default)
            .effective_max_reflection_depth();
        assert_eq!(
            local_cap,
            GovernancePolicy::default().effective_max_reflection_depth(),
            "fallback cap must match the compiled default (parity with pre-#961 behavior)"
        );
    }
}
/// Tests for credential-based peer key resolution
/// (`resolve_peer_verifying_key` / `verify_credential_pubkey`).
///
/// All credentials are issued in-test from fixed-seed signing keys at the fixed
/// timestamp `TEST_NOW_UNIX`, so the tests do not depend on the wall clock.
#[cfg(test)]
mod fed_p2d_credential_resolution_tests {
    use super::{resolve_peer_verifying_key, verify_credential_pubkey};
    use crate::federation::identity::credential::CREDENTIAL_HEADER;
    use crate::federation::identity::issuer::{FederationIssuer, IssuerConfig};
    use crate::federation::identity::trust_bundle::TrustBundle;
    use axum::http::{HeaderMap, HeaderValue};
    use ed25519_dalek::SigningKey;

    const TEST_ISSUER_ID: &str = "ca:test-issuer";
    const TEST_TRUST_DOMAIN: &str = "test.federation.local";
    const TEST_PEER_ID: &str = "host:peer-a";
    const TEST_NOW_UNIX: i64 = 1_700_000_000;

    /// Deterministic signing key whose 32 secret bytes all equal `seed`.
    fn fixed_signing_key(seed: u8) -> SigningKey {
        SigningKey::from_bytes(&[seed; 32])
    }

    /// Issuer used by most tests: key seed 7 with the test issuer id / trust domain.
    fn test_issuer() -> FederationIssuer {
        FederationIssuer::new(
            fixed_signing_key(7),
            IssuerConfig::new(TEST_ISSUER_ID, TEST_TRUST_DOMAIN),
        )
    }

    /// Header map carrying only the credential header set to `value`.
    fn headers_with_cred(value: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            CREDENTIAL_HEADER,
            HeaderValue::from_str(value).expect("valid header value"),
        );
        headers
    }

    /// A credential from a trusted issuer whose subject matches the peer id
    /// resolves to the subject's public key.
    #[test]
    fn credential_path_returns_subject_key_when_issuer_trusted() {
        let issuer = test_issuer();
        let subject_signing = fixed_signing_key(42);
        let subject_pub = subject_signing.verifying_key();
        let signed = issuer
            .issue(TEST_PEER_ID, &subject_pub, TEST_NOW_UNIX)
            .expect("issue credential");
        let bundle = TrustBundle::new().with_issuer(TEST_ISSUER_ID, issuer.verifying_key());
        let resolved = verify_credential_pubkey(
            &signed.to_header_value().expect("encode credential"),
            None,
            Some(TEST_PEER_ID),
            &bundle,
            TEST_NOW_UNIX,
        );
        assert_eq!(
            resolved.map(|k| k.to_bytes()),
            Some(subject_pub.to_bytes()),
            "trusted credential must resolve to the subject's key"
        );
    }

    /// The header-level resolver returns the credential's subject key when the
    /// bundle trusts the issuer.
    #[test]
    fn resolve_uses_credential_when_bundle_trusts_issuer() {
        let issuer = test_issuer();
        let subject_signing = fixed_signing_key(99);
        let subject_pub = subject_signing.verifying_key();
        let signed = issuer
            .issue(TEST_PEER_ID, &subject_pub, TEST_NOW_UNIX)
            .expect("issue credential");
        let bundle = TrustBundle::new().with_issuer(TEST_ISSUER_ID, issuer.verifying_key());
        let headers = headers_with_cred(&signed.to_header_value().expect("encode"));
        let resolved =
            resolve_peer_verifying_key(&headers, Some(TEST_PEER_ID), &bundle, TEST_NOW_UNIX);
        assert_eq!(
            resolved.map(|k| k.to_bytes()),
            Some(subject_pub.to_bytes()),
            "resolver must prefer the trusted credential's subject key"
        );
    }

    /// With an empty bundle the credential is not consulted; the legacy lookup
    /// finds no enrolled key for the test peer, so the result is `None`.
    #[test]
    fn empty_bundle_falls_through_to_legacy_path() {
        let issuer = test_issuer();
        let subject_pub = fixed_signing_key(13).verifying_key();
        let signed = issuer
            .issue(TEST_PEER_ID, &subject_pub, TEST_NOW_UNIX)
            .expect("issue credential");
        let headers = headers_with_cred(&signed.to_header_value().expect("encode"));
        let resolved = resolve_peer_verifying_key(
            &headers,
            Some(TEST_PEER_ID),
            &TrustBundle::new(),
            TEST_NOW_UNIX,
        );
        assert!(
            resolved.is_none(),
            "empty bundle must defer to the legacy path (no enrolment → None)"
        );
    }

    /// Without a credential header the resolver goes straight to the legacy
    /// lookup, which yields `None` for the unenrolled test peer.
    #[test]
    fn absent_credential_falls_through_to_legacy_path() {
        let issuer = test_issuer();
        let bundle = TrustBundle::new().with_issuer(TEST_ISSUER_ID, issuer.verifying_key());
        let resolved = resolve_peer_verifying_key(
            &HeaderMap::new(),
            Some(TEST_PEER_ID),
            &bundle,
            TEST_NOW_UNIX,
        );
        assert!(
            resolved.is_none(),
            "no X-Memory-Cred header must defer to the legacy path"
        );
    }

    /// A valid credential issued for a different subject must not bind to the
    /// claimed peer id.
    #[test]
    fn wrong_subject_credential_is_refused() {
        let issuer = test_issuer();
        let other_subject_pub = fixed_signing_key(55).verifying_key();
        let signed = issuer
            .issue("host:peer-b", &other_subject_pub, TEST_NOW_UNIX)
            .expect("issue credential for peer-b");
        let bundle = TrustBundle::new().with_issuer(TEST_ISSUER_ID, issuer.verifying_key());
        let resolved = verify_credential_pubkey(
            &signed.to_header_value().expect("encode"),
            None,
            Some(TEST_PEER_ID),
            &bundle,
            TEST_NOW_UNIX,
        );
        assert!(
            resolved.is_none(),
            "credential whose subject != peer-id must be refused (no cross-binding)"
        );
    }

    /// A credential signed by an issuer that is not in the bundle is refused.
    #[test]
    fn untrusted_issuer_credential_is_refused() {
        let issuer = test_issuer();
        let subject_pub = fixed_signing_key(21).verifying_key();
        let signed = issuer
            .issue(TEST_PEER_ID, &subject_pub, TEST_NOW_UNIX)
            .expect("issue credential");
        let bundle = TrustBundle::new()
            .with_issuer("ca:other-issuer", fixed_signing_key(200).verifying_key());
        let resolved = verify_credential_pubkey(
            &signed.to_header_value().expect("encode"),
            None,
            Some(TEST_PEER_ID),
            &bundle,
            TEST_NOW_UNIX,
        );
        assert!(
            resolved.is_none(),
            "credential from an untrusted issuer must be refused"
        );
    }

    /// An unparseable credential header value yields `None` rather than a panic.
    #[test]
    fn malformed_credential_value_is_refused() {
        let issuer = test_issuer();
        let bundle = TrustBundle::new().with_issuer(TEST_ISSUER_ID, issuer.verifying_key());
        let resolved = verify_credential_pubkey(
            "v1=not-valid-base64-cbor!!!",
            None,
            Some(TEST_PEER_ID),
            &bundle,
            TEST_NOW_UNIX,
        );
        assert!(
            resolved.is_none(),
            "a malformed X-Memory-Cred value must be refused, not panic"
        );
    }

    /// Two-level chain (root → regional CA → leaf): a bundle that trusts only
    /// the root resolves the leaf key when the intermediate is presented in
    /// the chain header.
    #[test]
    fn hierarchical_chain_resolves_leaf_key_against_root_only_bundle() {
        use crate::federation::identity::chain::CHAIN_HEADER;
        let root = FederationIssuer::new(
            fixed_signing_key(70),
            IssuerConfig::new("ca:root", TEST_TRUST_DOMAIN),
        );
        let region = FederationIssuer::new(
            fixed_signing_key(71),
            IssuerConfig::new("region/nyc/ca", TEST_TRUST_DOMAIN),
        );
        let leaf_id = "region/nyc/peer-a";
        // The root vouches for the regional CA's key.
        let anchor = root
            .issue_intermediate(region.issuer_id(), &region.verifying_key(), TEST_NOW_UNIX)
            .expect("issue intermediate");
        let node_pub = fixed_signing_key(72).verifying_key();
        let chain = region
            .issue_chained(leaf_id, &node_pub, vec![anchor], TEST_NOW_UNIX)
            .expect("issue chained");
        let bundle = TrustBundle::new().with_issuer("ca:root", root.verifying_key());
        let mut headers = HeaderMap::new();
        headers.insert(
            CREDENTIAL_HEADER,
            HeaderValue::from_str(&chain.leaf.to_header_value().expect("encode leaf"))
                .expect("header"),
        );
        headers.insert(
            CHAIN_HEADER,
            HeaderValue::from_str(
                &chain
                    .intermediates_header_value()
                    .expect("encode chain")
                    .expect("two-level chain emits header"),
            )
            .expect("header"),
        );
        let resolved = resolve_peer_verifying_key(&headers, Some(leaf_id), &bundle, TEST_NOW_UNIX);
        assert_eq!(
            resolved.map(|k| k.to_bytes()),
            Some(node_pub.to_bytes()),
            "root-only bundle must resolve the leaf key through the presented chain"
        );
    }

    /// A chain anchored in a root that reuses the trusted root's id but signs
    /// with a different key must be refused.
    #[test]
    fn hierarchical_chain_with_untrusted_root_is_refused() {
        use crate::federation::identity::chain::CHAIN_HEADER;
        let rogue_root = FederationIssuer::new(
            fixed_signing_key(80),
            IssuerConfig::new("ca:root", TEST_TRUST_DOMAIN),
        );
        let region = FederationIssuer::new(
            fixed_signing_key(81),
            IssuerConfig::new("region/nyc/ca", TEST_TRUST_DOMAIN),
        );
        let anchor = rogue_root
            .issue_intermediate(region.issuer_id(), &region.verifying_key(), TEST_NOW_UNIX)
            .expect("issue intermediate");
        let node_pub = fixed_signing_key(82).verifying_key();
        let chain = region
            .issue_chained(TEST_PEER_ID, &node_pub, vec![anchor], TEST_NOW_UNIX)
            .expect("issue chained");
        // Same issuer id as the rogue root, different key: only this one is trusted.
        let genuine_root = FederationIssuer::new(
            fixed_signing_key(83),
            IssuerConfig::new("ca:root", TEST_TRUST_DOMAIN),
        );
        let bundle = TrustBundle::new().with_issuer("ca:root", genuine_root.verifying_key());
        let mut headers = HeaderMap::new();
        headers.insert(
            CREDENTIAL_HEADER,
            HeaderValue::from_str(&chain.leaf.to_header_value().expect("encode leaf"))
                .expect("header"),
        );
        headers.insert(
            CHAIN_HEADER,
            HeaderValue::from_str(
                &chain
                    .intermediates_header_value()
                    .expect("encode chain")
                    .expect("header present"),
            )
            .expect("header"),
        );
        let resolved =
            resolve_peer_verifying_key(&headers, Some(TEST_PEER_ID), &bundle, TEST_NOW_UNIX);
        assert!(
            resolved.is_none(),
            "a chain anchored in an untrusted root must be refused"
        );
    }
}