use axum::{
Json,
body::Bytes,
extract::State,
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
use serde::Deserialize;
use serde_json::json;
use crate::db;
use crate::federation::peer_attestation::{
self, AttestError, PEER_ID_HEADER, PeerAttestationConfig,
};
use crate::models::{Memory, MemoryLink};
use crate::validate;
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::federation_signing_check::sync_push_via_store;
use super::federation_signing_check::verify_signature_or_reject;
/// `tracing` target attached to the attestation-related warnings emitted by `sync_push`
/// (allowlist refusal, failed sender attestation, and the trust-body bypass notice).
const ATTESTATION_TRACE_TARGET: &str = "federation::attestation";
/// Returns the peer id carried in the `PEER_ID_HEADER` request header, if it is usable.
///
/// Yields `None` when the header is absent, when its value cannot be read as a string via
/// `to_str`, or when the value fails `validate_agent_id`. Only the last case is logged (as a
/// warning on the `federation::peer_id` target); the rejected value itself is not included in
/// the log line.
pub(super) fn extract_peer_id(headers: &HeaderMap) -> Option<&str> {
    let header_value = headers.get(PEER_ID_HEADER)?;
    let candidate = header_value.to_str().ok()?;
    match crate::validate::validate_agent_id(candidate) {
        Ok(_) => Some(candidate),
        Err(_) => {
            tracing::warn!(
                target: "federation::peer_id",
                "extract_peer_id: dropped malformed X-Peer-Id header (#1049 validation gate)"
            );
            None
        }
    }
}
/// Builds the `403 Forbidden` JSON response returned when sender attestation fails.
///
/// The body carries the error tag plus the `claimed` / `peer_header` values from a
/// `Mismatch` error (both are empty strings for `HeaderMissing`) and a fixed operator note.
fn attestation_refusal_response(err: &AttestError) -> Response {
    // Borrow the two identities straight out of the error; the JSON macro serializes them
    // as strings either way.
    let (claimed, peer_header) = match err {
        AttestError::Mismatch {
            claimed,
            peer_header,
        } => (claimed.as_str(), peer_header.as_str()),
        AttestError::HeaderMissing => ("", ""),
    };
    let payload = json!({
        "error": err.tag(),
        "claimed": claimed,
        "peer_header": peer_header,
        "note": "set AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1 to opt out (legacy peers); \
                 pre-v0.7.0 federation peers must be upgraded to send `x-peer-id`.",
    });
    (StatusCode::FORBIDDEN, Json(payload)).into_response()
}
/// Absolute difference, in seconds, between the sender's reported timestamp and the receiver's
/// current UTC time above which `check_sender_clock_skew` logs a warning.
const CLOCK_SKEW_WARN_THRESHOLD_SECS: i64 = 60;
/// Logs a warning when the sender's clock differs from ours by more than
/// `CLOCK_SKEW_WARN_THRESHOLD_SECS` seconds.
///
/// The sender timestamp is `body.sender_wall_clock` when present, otherwise the greatest
/// value (by string ordering) in `body.sender_clock.entries`. If neither exists the check is
/// skipped silently; if the timestamp is not RFC 3339 a debug line is logged and the check is
/// skipped. This function only logs — it returns nothing and never rejects the push.
pub(super) fn check_sender_clock_skew(sender_agent_id: &str, body: &SyncPushBody) {
    let ts_str: &str = match body.sender_wall_clock.as_deref() {
        Some(wall_clock) => wall_clock,
        None => match body.sender_clock.entries.values().max() {
            Some(latest_entry) => latest_entry.as_str(),
            None => return,
        },
    };

    let sender_at = match chrono::DateTime::parse_from_rfc3339(ts_str) {
        Ok(parsed) => parsed,
        Err(_) => {
            tracing::debug!(
                sender = %sender_agent_id,
                sender_ts = %ts_str,
                "sync_push: sender clock not RFC3339; skipping skew check"
            );
            return;
        }
    };

    let now = chrono::Utc::now();
    let sender_utc = sender_at.with_timezone(&chrono::Utc);
    // Positive when the sender is ahead of the receiver, negative when behind.
    let skew_secs = sender_utc.signed_duration_since(now).num_seconds();
    if skew_secs.abs() <= CLOCK_SKEW_WARN_THRESHOLD_SECS {
        return;
    }

    tracing::warn!(
        target: "federation::clock_skew",
        sender = %sender_agent_id,
        skew_secs,
        sender_ts = %ts_str,
        receiver_ts = %now.to_rfc3339(),
        "sync_push: sender_clock skew exceeds {CLOCK_SKEW_WARN_THRESHOLD_SECS}s threshold \
         (observability-only; push accepted)",
    );
}
/// Picks the agent id that a pushed memory's quota usage is charged to.
///
/// Uses the string value stored under `"agent_id"` in the memory's metadata when there is
/// one; otherwise (key missing or not a JSON string) falls back to `sender_agent_id`.
pub(super) fn attribute_agent_for_quota(sender_agent_id: &str, mem: &Memory) -> String {
    let declared_agent = mem
        .metadata
        .get("agent_id")
        .and_then(serde_json::Value::as_str);
    match declared_agent {
        Some(agent) => agent.to_string(),
        None => sender_agent_id.to_string(),
    }
}
/// Returns the next UTC midnight after the current instant, formatted as RFC 3339.
///
/// If truncating the current time to 00:00:00.0 fails for any reason, the result is simply
/// "now plus one day" instead.
pub(super) fn next_utc_midnight() -> String {
    use chrono::{Duration, Timelike};

    let now = chrono::Utc::now();
    let one_day = Duration::days(1);

    // Zero out each time-of-day component in turn; every step is fallible in chrono's API.
    let midnight_today = now
        .with_hour(0)
        .and_then(|t| t.with_minute(0))
        .and_then(|t| t.with_second(0))
        .and_then(|t| t.with_nanosecond(0));

    let next = match midnight_today {
        Some(midnight) => midnight + one_day,
        None => now + one_day,
    };
    next.to_rfc3339()
}
/// Computes and stores embeddings for `rows` on a detached background task.
///
/// Each row is a `(memory id, text to embed)` pair. Nothing is spawned when `rows` is empty or
/// no embedder is configured. For every row the embedder is invoked via `spawn_blocking`; on
/// success the vector is persisted with `db::set_embedding` and then replaces any entry for the
/// same id in the in-memory vector index (when an index exists). Failures are logged as
/// warnings and the row is skipped; nothing is reported back to the caller, and the spawned
/// task's handle is dropped.
fn spawn_deferred_embedding_refresh(app: &AppState, rows: Vec<(String, String)>) {
    if rows.is_empty() || app.embedder.as_ref().as_ref().is_none() {
        return;
    }
    let db = app.db.clone();
    let embedder = app.embedder.clone();
    let vector_index = app.vector_index.clone();
    tokio::spawn(async move {
        for (id, text) in rows {
            // Run the embedder on the blocking pool rather than on the async executor.
            let emb = embedder.clone();
            let embed_res =
                tokio::task::spawn_blocking(move || emb.as_ref().as_ref().map(|e| e.embed(&text)))
                    .await;
            let vec = match embed_res {
                Ok(Some(Ok(v))) => v,
                Ok(Some(Err(e))) => {
                    tracing::warn!("sync_push: deferred embed failed for {id}: {e}");
                    continue;
                }
                // No embedder available: no remaining row can succeed either, so stop.
                Ok(None) => return,
                Err(e) => {
                    tracing::warn!("sync_push: deferred embed join error for {id}: {e}");
                    continue;
                }
            };
            {
                // Scoped so the DB guard is released before the index lock is requested.
                let lock = db.lock().await;
                if let Err(e) = db::set_embedding(&lock.0, &id, &vec) {
                    tracing::warn!("sync_push: set_embedding failed for {id}: {e}");
                    continue;
                }
            }
            let mut idx_lock = vector_index.lock().await;
            if let Some(idx) = idx_lock.as_mut() {
                idx.remove(&id);
                // Last use of `id` in this iteration: move it instead of cloning.
                idx.insert(id, vec);
            }
        }
    });
}
/// JSON request body accepted by `sync_push`.
///
/// Only `sender_agent_id` and `memories` are required; every other field falls back to its
/// `Default` value when omitted from the payload.
#[derive(Deserialize)]
pub struct SyncPushBody {
/// Agent id the sender claims. `sync_push` attests it against the peer-id header and
/// validates it with `validate_agent_id`.
pub sender_agent_id: String,
/// Sender's vector clock. `check_sender_clock_skew` falls back to its greatest entry value
/// when `sender_wall_clock` is absent.
#[serde(default)]
pub sender_clock: crate::models::VectorClock,
/// Sender's timestamp used for the clock-skew check; parsed as RFC 3339.
#[serde(default)]
pub sender_wall_clock: Option<String>,
/// Memories to apply; each is validated and then passed to `db::insert_if_newer`.
pub memories: Vec<Memory>,
/// Pre-computed embeddings, matched to `memories` by `memory_id`.
#[serde(default)]
pub embeddings: Vec<crate::federation::ShippedEmbedding>,
/// Ids passed to `db::delete`.
#[serde(default)]
pub deletions: Vec<String>,
/// Ids passed to `db::archive_memory`.
#[serde(default)]
pub archives: Vec<String>,
/// Ids passed to `db::restore_archived`.
#[serde(default)]
pub restores: Vec<String>,
/// Links applied via `db::create_link_inbound`.
#[serde(default)]
pub links: Vec<MemoryLink>,
/// Pending actions upserted via `db::upsert_pending_action`.
#[serde(default)]
pub pendings: Vec<crate::models::PendingAction>,
/// Decisions applied via `db::decide_pending_action`.
#[serde(default)]
pub pending_decisions: Vec<crate::models::PendingDecision>,
/// Namespace standard assignments applied via `db::set_namespace_standard`.
#[serde(default)]
pub namespace_meta: Vec<crate::models::NamespaceMetaEntry>,
/// Namespaces whose standard is removed via `db::clear_namespace_standard`.
#[serde(default)]
pub namespace_meta_clears: Vec<String>,
/// When `true`, `sync_push` counts validated items as `noop` instead of applying them.
#[serde(default)]
pub dry_run: bool,
}
/// Deserializable set of optional `since` / `limit` / `peer` parameters.
///
/// NOTE(review): the consumer of this type is not visible in this part of the file; the
/// name suggests query-string parameters for a "sync since" endpoint — confirm at the caller.
#[derive(Deserialize)]
pub struct SyncSinceQuery {
// presumably a timestamp lower bound — TODO confirm format against the handler
pub since: Option<String>,
// presumably the maximum number of rows to return — TODO confirm
pub limit: Option<usize>,
// presumably the requesting peer's id — TODO confirm
pub peer: Option<String>,
}
/// HTTP handler that applies a federation `sync_push` payload from a peer to the local store.
///
/// Processing order, as implemented below:
/// 1. If a valid peer-id header is present and the attestation config has an allowlist, the
///    request is refused with 401 when the peer has no scope in it.
/// 2. `verify_signature_or_reject` runs over the raw body bytes before the JSON is parsed.
/// 3. The body is deserialized into `SyncPushBody`; a parse failure returns 400.
/// 4. `sender_agent_id` is attested against the peer header unless the trust-body bypass is
///    enabled; a failure returns 403 via `attestation_refusal_response`.
/// 5. With the `sal` feature and a Postgres backend, the request is handed to
///    `sync_push_via_store` and nothing below runs.
/// 6. `sender_agent_id`, every per-request collection size, and the receiver's agent-id header
///    are validated (400 on failure).
/// 7. Under the DB lock: memories, deletions, archives, restores, links, pending actions,
///    pending decisions, namespace-meta assignments and namespace-meta clears are applied, in
///    that order.
/// 8. The lock is released, stored shipped embeddings are pushed into the in-memory vector
///    index, and memories without a usable shipped embedding are queued for background
///    embedding.
///
/// A quota refusal while applying memories stops the memory loop and returns 429 with
/// `x-quota-reset-at` / `x-quota-limit` headers; none of the later sections run in that case.
/// When `dry_run` is set, items that pass validation are counted as `noop` and the apply loops
/// write nothing. The success response is 200 with per-section counters, the receiver's agent
/// id and the receiver's clock.
pub async fn sync_push(
State(app): State<AppState>,
headers: HeaderMap,
body_bytes: Bytes,
) -> impl IntoResponse {
// Owned copy of the validated peer id (None when the header is missing or malformed).
let peer_header_owned = extract_peer_id(&headers).map(str::to_string);
// Allowlist gate: only evaluated when a peer id was supplied. A peer with no scope in a
// configured allowlist is refused with 401.
if let Some(peer_id) = peer_header_owned.as_deref() {
let attest_cfg = peer_attestation::PeerAttestationConfig::from_env();
if attest_cfg.has_allowlist() && attest_cfg.scope_for(peer_id).is_none() {
tracing::warn!(
target: ATTESTATION_TRACE_TARGET,
peer_id = %peer_id,
"sync_push: x-peer-id is not in operator allowlist — refusing (#1056 TOFU guard)"
);
return (
StatusCode::UNAUTHORIZED,
Json(json!({
"error": "x_peer_id_not_in_allowlist",
"note": "#1056: x-peer-id is not in AI_MEMORY_FED_PEER_ATTESTATION; \
enrol the peer or unset the env to restore zero-config posture.",
})),
)
.into_response();
}
}
// Signature check over the raw bytes, before any JSON parsing. A `Some` result is the
// ready-made rejection response.
if let Some(rejection) = verify_signature_or_reject(
&headers,
&body_bytes,
peer_header_owned.as_deref(),
&app.federation_nonce_cache,
) {
return rejection;
}
// Parse the body only after the checks above have passed.
let body: SyncPushBody = match serde_json::from_slice(&body_bytes) {
Ok(b) => b,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("malformed sync_push body: {e}")})),
)
.into_response();
}
};
let state = app.db.clone();
let attest_cfg = PeerAttestationConfig::from_env();
// Attest the body's sender_agent_id against the peer header, unless the bypass is on (in
// which case only a warning is logged).
if !peer_attestation::trust_body_agent_id_bypass() {
if let Err(e) = peer_attestation::attest_sender(
peer_header_owned.as_deref(),
Some(body.sender_agent_id.as_str()),
&attest_cfg,
) {
tracing::warn!(
target: ATTESTATION_TRACE_TARGET,
tag = e.tag(),
claimed = %body.sender_agent_id,
peer_header = %peer_header_owned.as_deref().unwrap_or(""),
"sync_push: sender_agent_id failed attestation against x-peer-id header"
);
return attestation_refusal_response(&e);
}
} else {
tracing::warn!(
target: ATTESTATION_TRACE_TARGET,
"sync_push: AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1 — bypassing #238 \
sender_agent_id attestation (legacy compat)"
);
}
// Postgres backend (`sal` feature only): delegate the rest of the request. Note that the
// validation below is not reached on this path.
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
return sync_push_via_store(app, headers, body).await;
}
// Input validation for the non-delegated path. Each collection in the body is capped at
// `app.max_page_size` entries; the first violation returns 400.
if let Err(e) = validate::validate_agent_id(&body.sender_agent_id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid sender_agent_id: {e}")})),
)
.into_response();
}
if body.memories.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("sync_push limited to {} memories per request", app.max_page_size)
})),
)
.into_response();
}
if body.embeddings.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!(
"sync_push limited to {} embeddings per request",
app.max_page_size
)
})),
)
.into_response();
}
if body.deletions.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("sync_push limited to {} deletions per request", app.max_page_size)
})),
)
.into_response();
}
if body.archives.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("sync_push limited to {} archives per request", app.max_page_size)
})),
)
.into_response();
}
if body.restores.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("sync_push limited to {} restores per request", app.max_page_size)
})),
)
.into_response();
}
if body.pendings.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!("sync_push limited to {} pendings per request", app.max_page_size)
})),
)
.into_response();
}
if body.pending_decisions.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!(
"sync_push limited to {} pending_decisions per request",
app.max_page_size
)
})),
)
.into_response();
}
if body.namespace_meta.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!(
"sync_push limited to {} namespace_meta per request",
app.max_page_size
)
})),
)
.into_response();
}
if body.namespace_meta_clears.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!(
"sync_push limited to {} namespace_meta_clears per request",
app.max_page_size
)
})),
)
.into_response();
}
if body.links.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": format!(
"sync_push limited to {} links per request",
app.max_page_size
)
})),
)
.into_response();
}
// Resolve the receiver's own agent id from the agent-id request header; it is echoed back
// as `receiver_agent_id` and keys the sync-state bookkeeping below.
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let local_agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
Ok(id) => id,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid x-agent-id: {e}")})),
)
.into_response();
}
};
// Logging only; this never rejects the push.
check_sender_clock_skew(&body.sender_agent_id, &body);
// Everything from here to the explicit `drop(lock)` runs while holding the DB lock.
let lock = state.lock().await;
// Per-request counters reported in the response.
let mut applied = 0usize;
let mut noop = 0usize;
let mut skipped = 0usize;
let mut deleted = 0usize;
let mut archived = 0usize;
let mut restored = 0usize;
let mut latest_seen: Option<String> = None;
let mut quota_refused = 0usize;
let mut first_quota_refusal: Option<crate::quotas::QuotaError> = None;
// Dimension of the local embedder, if one is configured; shipped vectors must match it.
let receiver_dim = app
.embedder
.as_ref()
.as_ref()
.map(crate::embeddings::Embedder::dim);
// Shipped embeddings keyed by memory id for lookup inside the memory loop.
let shipped_by_id: std::collections::HashMap<&str, &crate::federation::ShippedEmbedding> = body
.embeddings
.iter()
.map(|se| (se.memory_id.as_str(), se))
.collect();
// `deferred_embed`: (id, text) rows to embed locally in the background.
// `hnsw_updates`: vectors already stored in the DB that still need to be put into the
// in-memory vector index once the DB lock is released.
let mut deferred_embed: Vec<(String, String)> = Vec::new();
let mut hnsw_updates: Vec<(String, Vec<f32>)> = Vec::new();
// ---- Memories ----
for mem in &body.memories {
if let Err(e) = validate::RequestValidator::validate_memory(mem) {
tracing::warn!("sync_push: skipping memory {} ({}): {e}", mem.id, mem.title);
skipped += 1;
continue;
}
// Track the greatest `updated_at` (string comparison) among valid memories; this happens
// before the dry-run check, so dry-run items count too.
if latest_seen
.as_deref()
.is_none_or(|current| mem.updated_at.as_str() > current)
{
latest_seen = Some(mem.updated_at.clone());
}
if body.dry_run {
noop += 1;
continue;
}
// Charge quota before inserting. The size estimate is title + content + serialized
// metadata lengths, saturating at i64::MAX.
let attribute_agent = attribute_agent_for_quota(&body.sender_agent_id, mem);
let bytes_estimate =
i64::try_from(mem.title.len() + mem.content.len() + mem.metadata.to_string().len())
.unwrap_or(i64::MAX);
match crate::quotas::check_and_record(
&lock.0,
&attribute_agent,
&mem.namespace,
crate::quotas::QuotaOp::Memory {
bytes: bytes_estimate,
},
) {
Ok(()) => {}
// Quota exceeded: log, append a signed audit event (its result is ignored), remember
// the first refusal and stop processing further memories.
Err(crate::quotas::QuotaCheckError::Quota(q)) => {
tracing::warn!(
target: "federation::quota",
peer = %body.sender_agent_id,
attribute_agent = %attribute_agent,
limit = q.limit.as_str(),
current = q.current,
max = q.max,
"sync_push: per-agent quota exceeded; refusing federation push"
);
let _ = crate::signed_events::append_signed_event(
&lock.0,
&crate::signed_events::SignedEvent::with_daemon_signature(
crate::signed_events::payload_hash(
format!(
"peer={} agent={} limit={} current={} max={}",
body.sender_agent_id,
attribute_agent,
q.limit.as_str(),
q.current,
q.max,
)
.as_bytes(),
),
attribute_agent.clone(),
"federation.quota_refused".to_string(),
chrono::Utc::now().to_rfc3339(),
),
);
quota_refused += 1;
if first_quota_refusal.is_none() {
first_quota_refusal = Some(q);
}
break;
}
// The quota check itself failed with a SQL error: skip just this memory.
Err(crate::quotas::QuotaCheckError::Sql(e)) => {
tracing::warn!(
"sync_push: quota substrate read failed for {}: {e}",
attribute_agent
);
skipped += 1;
continue;
}
}
// Reflection-depth cap from the namespace's governance policy (the default policy is used
// when none is resolved), then stamp the memory before insertion.
let cap_for_namespace = db::resolve_governance_policy(&lock.0, &mem.namespace)
.unwrap_or_else(crate::models::GovernancePolicy::default)
.effective_max_reflection_depth();
let to_insert = crate::federation::reflection_bookkeeping::stamp_reflection_origin(
mem,
&body.sender_agent_id,
cap_for_namespace,
);
match db::insert_if_newer(&lock.0, &to_insert) {
Ok(actual_id) => {
applied += 1;
// A shipped embedding is used only if its declared dim equals both the local embedder's
// dim and the vector's actual length, and it survives sanitization.
let clean_shipped = shipped_by_id
.get(mem.id.as_str())
.filter(|se| receiver_dim == Some(se.dim) && se.vector.len() == se.dim)
.and_then(|se| {
crate::federation::sanitize_shipped_vector(&se.vector)
.map(|v| (v, se.model.clone()))
});
match clean_shipped {
Some((vector, model)) => {
match db::set_embedding(&lock.0, &actual_id, &vector) {
Ok(()) => hnsw_updates.push((actual_id, vector)),
Err(e) => {
tracing::warn!(
"sync_push: storing shipped embedding failed for \
{actual_id} (model {model}): {e} — deferring local embed",
);
deferred_embed.push((
actual_id,
crate::embeddings::embedding_document(&mem.title, &mem.content),
));
}
}
}
// No usable shipped embedding: embed locally later.
None => deferred_embed.push((
actual_id,
crate::embeddings::embedding_document(&mem.title, &mem.content),
)),
}
}
Err(e) => {
// Insert failed: give back the quota charged above (refund result is ignored).
let _ = crate::quotas::refund_op(
&lock.0,
&attribute_agent,
&mem.namespace,
crate::quotas::QuotaOp::Memory {
bytes: bytes_estimate,
},
);
tracing::warn!("sync_push: insert_if_newer failed for {}: {e}", mem.id);
skipped += 1;
}
}
}
// Quota refusal path: release the DB lock, flush index/embedding work for the memories that
// were applied before the refusal, and answer 429. No later section runs.
if let Some(q) = first_quota_refusal.take() {
drop(lock);
if !hnsw_updates.is_empty() {
let mut idx_lock = app.vector_index.lock().await;
if let Some(idx) = idx_lock.as_mut() {
for (id, vec) in hnsw_updates {
idx.remove(&id);
idx.insert(id, vec);
}
}
}
spawn_deferred_embedding_refresh(&app, deferred_embed);
let reset_at = next_utc_midnight();
return (
StatusCode::TOO_MANY_REQUESTS,
[
("x-quota-reset-at", reset_at.as_str()),
("x-quota-limit", q.limit.as_str()),
],
Json(json!({
"error": "QUOTA_EXCEEDED",
"limit": q.limit.as_str(),
"current": q.current,
"max": q.max,
"agent_id": q.agent_id,
"applied_before_refusal": applied,
(crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
"reset_at": reset_at,
})),
)
.into_response();
}
// ---- Deletions ----
for del_id in &body.deletions {
if validate::validate_id(del_id).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::delete(&lock.0, del_id) {
Ok(true) => deleted += 1,
Ok(false) => noop += 1,
Err(e) => {
tracing::warn!("sync_push: delete failed for {del_id}: {e}");
skipped += 1;
}
}
}
// ---- Archives ----
for arch_id in &body.archives {
if validate::validate_id(arch_id).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::archive_memory(&lock.0, arch_id, Some("sync_push")) {
Ok(true) => archived += 1,
Ok(false) => noop += 1,
Err(e) => {
tracing::warn!("sync_push: archive_memory failed for {arch_id}: {e}");
skipped += 1;
}
}
}
// ---- Restores ----
for res_id in &body.restores {
if validate::validate_id(res_id).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::restore_archived(&lock.0, res_id) {
Ok(true) => restored += 1,
Ok(false) => noop += 1,
Err(e) => {
tracing::warn!("sync_push: restore_archived failed for {res_id}: {e}");
skipped += 1;
}
}
}
// ---- Links ----
let mut links_applied = 0usize;
for link in &body.links {
if validate::RequestValidator::validate_link_triple(
&link.source_id,
&link.target_id,
link.relation.as_str(),
)
.is_err()
{
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
// A link carrying both a signature and `observed_by` is verified against the observer's
// known public key: valid -> PeerAttested, invalid -> link skipped, no known key -> stored
// as Unsigned. Links missing either field are stored as Unsigned.
let attest_level = match (link.signature.as_deref(), link.observed_by.as_deref()) {
(Some(sig_bytes), Some(observed_by)) => {
match crate::identity::verify::lookup_peer_public_key(observed_by) {
Some(pubkey) => {
let signable = crate::identity::sign::SignableLink {
src_id: &link.source_id,
dst_id: &link.target_id,
relation: link.relation.as_str(),
observed_by: Some(observed_by),
valid_from: link.valid_from.as_deref(),
valid_until: link.valid_until.as_deref(),
};
match crate::identity::verify::verify(&pubkey, &signable, sig_bytes) {
Ok(()) => crate::models::AttestLevel::PeerAttested.as_str(),
Err(e) => {
tracing::warn!(
"sync_push: signature rejected for link \
({} -> {} / {}) from observed_by={}: {e}",
link.source_id,
link.target_id,
link.relation,
observed_by
);
skipped += 1;
continue;
}
}
}
None => {
crate::models::AttestLevel::Unsigned.as_str()
}
}
}
_ => crate::models::AttestLevel::Unsigned.as_str(),
};
match db::create_link_inbound(&lock.0, link, attest_level) {
Ok(()) => links_applied += 1,
Err(e) => {
tracing::warn!(
"sync_push: create_link_inbound failed ({} -> {} / {}): {e}",
link.source_id,
link.target_id,
link.relation
);
skipped += 1;
}
}
}
// ---- Pending actions ----
let mut pendings_applied = 0usize;
for pa in &body.pendings {
if validate::validate_id(&pa.id).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::upsert_pending_action(&lock.0, pa) {
Ok(()) => {
pendings_applied += 1;
// Only actions whose status is exactly "pending" trigger the approval-requested dispatch.
if pa.status == "pending" {
crate::subscriptions::dispatch_approval_requested(&lock.0, &pa.id, &lock.1);
}
}
Err(e) => {
tracing::warn!("sync_push: upsert_pending_action failed for {}: {e}", pa.id);
skipped += 1;
}
}
}
// ---- Pending decisions ----
let mut pending_decisions_applied = 0usize;
for dec in &body.pending_decisions {
if validate::validate_id(&dec.id).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::decide_pending_action(&lock.0, &dec.id, dec.approved, &dec.decider) {
Ok(true) => {
pending_decisions_applied += 1;
// An approved decision is executed immediately; an execution failure is logged only and
// does not change the counters.
if dec.approved {
match db::execute_pending_action(&lock.0, &dec.id) {
Ok(_) => {}
Err(e) => {
tracing::warn!(
"sync_push: execute_pending_action failed for {}: {e}",
dec.id
);
}
}
}
}
Ok(false) => noop += 1, Err(e) => {
tracing::warn!(
"sync_push: decide_pending_action failed for {}: {e}",
dec.id
);
skipped += 1;
}
}
}
// ---- Namespace-meta assignments ----
let mut namespace_meta_applied = 0usize;
for entry in &body.namespace_meta {
if validate::validate_namespace(&entry.namespace).is_err()
|| validate::validate_id(&entry.standard_id).is_err()
{
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::set_namespace_standard(
&lock.0,
&entry.namespace,
&entry.standard_id,
entry.parent_namespace.as_deref(),
) {
Ok(()) => namespace_meta_applied += 1,
Err(e) => {
tracing::warn!(
"sync_push: set_namespace_standard failed for {}: {e}",
entry.namespace
);
skipped += 1;
}
}
}
// ---- Namespace-meta clears ----
let mut namespace_meta_cleared = 0usize;
for ns in &body.namespace_meta_clears {
if validate::validate_namespace(ns).is_err() {
skipped += 1;
continue;
}
if body.dry_run {
noop += 1;
continue;
}
match db::clear_namespace_standard(&lock.0, ns) {
Ok(true) => namespace_meta_cleared += 1,
Ok(false) => noop += 1,
Err(e) => {
tracing::warn!("sync_push: clear_namespace_standard failed for {ns}: {e}");
skipped += 1;
}
}
}
// Record the newest `updated_at` seen from this sender (skipped on dry-run); a failure is
// logged but does not fail the request.
if !body.dry_run
&& let Some(at) = latest_seen.as_deref()
&& let Err(e) = db::sync_state_observe(&lock.0, &local_agent_id, &body.sender_agent_id, at)
{
tracing::warn!("sync_push: sync_state_observe failed: {e}");
}
// Receiver clock for the response; falls back to the default clock if loading fails.
let receiver_clock = db::sync_state_load(&lock.0, &local_agent_id)
.unwrap_or_else(|_| crate::models::VectorClock::default());
// Release the DB lock before taking the vector-index lock.
drop(lock);
if !hnsw_updates.is_empty() {
let mut idx_lock = app.vector_index.lock().await;
if let Some(idx) = idx_lock.as_mut() {
for (id, vec) in hnsw_updates {
idx.remove(&id);
idx.insert(id, vec);
}
}
}
spawn_deferred_embedding_refresh(&app, deferred_embed);
(
StatusCode::OK,
Json(json!({
"applied": applied,
"deleted": deleted,
"archived": archived,
"restored": restored,
"links_applied": links_applied,
"pendings_applied": pendings_applied,
"pending_decisions_applied": pending_decisions_applied,
"namespace_meta_applied": namespace_meta_applied,
"namespace_meta_cleared": namespace_meta_cleared,
"noop": noop,
"skipped": skipped,
(crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
"dry_run": body.dry_run,
"receiver_agent_id": local_agent_id,
"receiver_clock": receiver_clock,
})),
)
.into_response()
}
#[cfg(test)]
mod tests {
    use super::*;
    use axum::http::HeaderValue;

    /// Builds a header map whose only entry is `PEER_ID_HEADER: value`.
    ///
    /// Returns `None` when `value` cannot be represented as a `HeaderValue`.
    fn build_headers(value: &str) -> Option<HeaderMap> {
        HeaderValue::from_bytes(value.as_bytes())
            .ok()
            .map(|peer_value| {
                let mut map = HeaderMap::new();
                map.insert(PEER_ID_HEADER, peer_value);
                map
            })
    }

    /// A plain `scheme:name` agent id is returned unchanged.
    #[test]
    fn extract_peer_id_accepts_legitimate_agent_id_1049() {
        let headers =
            build_headers("ai:peer-alpha").expect("legitimate value fits in HeaderValue");
        let extracted = extract_peer_id(&headers);
        assert_eq!(extracted, Some("ai:peer-alpha"));
    }

    /// A multi-segment id containing dots and colons is returned unchanged.
    #[test]
    fn extract_peer_id_accepts_hostname_form_1049() {
        let headers = build_headers("host:laptop.local:pid-42").expect("legitimate value fits");
        let extracted = extract_peer_id(&headers);
        assert_eq!(extracted, Some("host:laptop.local:pid-42"));
    }

    /// A value containing a space is dropped.
    #[test]
    fn extract_peer_id_rejects_value_with_whitespace_1049() {
        let headers = build_headers("peer one").expect("space fits in HeaderValue");
        let extracted = extract_peer_id(&headers);
        assert_eq!(
            extracted, None,
            "#1049: whitespace in peer-id MUST be rejected by the validator"
        );
    }

    /// A value containing `$` is dropped.
    #[test]
    fn extract_peer_id_rejects_value_with_shell_metacharacters_1049() {
        let headers = build_headers("peer$attacker").expect("$ fits in HeaderValue");
        let extracted = extract_peer_id(&headers);
        assert_eq!(
            extracted, None,
            "#1049: shell metacharacters in peer-id MUST be rejected"
        );
    }

    /// A 129-byte value is dropped.
    #[test]
    fn extract_peer_id_rejects_oversized_value_1049() {
        let oversized = "a".repeat(129);
        let headers = build_headers(&oversized).expect("129-byte ASCII fits in HeaderValue");
        let extracted = extract_peer_id(&headers);
        assert_eq!(
            extracted, None,
            "#1049: oversized peer-id (>128 bytes) MUST be rejected"
        );
    }

    /// With no peer-id header at all the result is `None`.
    #[test]
    fn extract_peer_id_absent_returns_none() {
        let empty = HeaderMap::new();
        assert_eq!(extract_peer_id(&empty), None);
    }
}