ai_memory/handlers/federation_receive.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4use axum::{
5 Json,
6 body::Bytes,
7 extract::State,
8 http::{HeaderMap, StatusCode},
9 response::{IntoResponse, Response},
10};
11use serde::Deserialize;
12use serde_json::json;
13
14use crate::db;
15use crate::federation::peer_attestation::{
16 self, AttestError, PEER_ID_HEADER, PeerAttestationConfig,
17};
18use crate::models::{Memory, MemoryLink};
19use crate::validate;
20
21use super::AppState;
22#[cfg(feature = "sal")]
23use super::StorageBackend;
24#[cfg(feature = "sal")]
25use super::federation_signing_check::sync_push_via_store;
26use super::federation_signing_check::verify_signature_or_reject;
27
/// Tracing target for receive-side peer-attestation checks
/// (#1558 tracing-target SSOT). `sync_push` logs under this target for
/// the #1056 `x-peer-id` allowlist refusal, the #238 `sender_agent_id`
/// attestation failure, and the `AI_MEMORY_FED_TRUST_BODY_AGENT_ID`
/// bypass warning, so operators can filter all attestation decisions
/// with a single target.
const ATTESTATION_TRACE_TARGET: &str = "federation::attestation";
31
32/// v0.7.0 federation security — extract the peer's self-claimed
33/// `x-peer-id` header. Lowercase form per HTTP/2 wire convention;
34/// axum's `HeaderMap` lookup is case-insensitive so callers can send
35/// the canonical `X-Peer-Id`.
36///
37/// v0.7.0 #1049 (Agent-5 #9) — validates the header value through
38/// `validate::validate_agent_id` before returning so the raw header
39/// content cannot inject CRLF/terminal-escape sequences into
40/// downstream tracing log files or be smuggled into the
41/// `FederationNonceCache` key (where exotic bytes would create
42/// per-peer cache fragmentation an attacker could weaponise to
43/// flood-evict legitimate peer entries). Returns `None` for any
44/// header that fails the agent_id shape check — same observable
45/// outcome as the header being absent.
46pub(super) fn extract_peer_id(headers: &HeaderMap) -> Option<&str> {
47 let raw = headers.get(PEER_ID_HEADER).and_then(|v| v.to_str().ok())?;
48 // Reject anything that fails the agent_id shape per CLAUDE.md
49 // §"Agent Identity": `^[A-Za-z0-9_\-:@./]{1,128}$`. The strict
50 // shape is the load-bearing property — no whitespace, no nulls,
51 // no control chars (CRLF), no shell metacharacters.
52 if crate::validate::validate_agent_id(raw).is_err() {
53 tracing::warn!(
54 target: "federation::peer_id",
55 "extract_peer_id: dropped malformed X-Peer-Id header (#1049 validation gate)"
56 );
57 return None;
58 }
59 Some(raw)
60}
61
62/// v0.7.0 #238 — render a 403 envelope when the body-claimed
63/// `sender_agent_id` does not attest to the wire-level `x-peer-id`
64/// header. Surfaces both values so the operator can diff exactly
65/// what the peer claimed against what the substrate expected.
66fn attestation_refusal_response(err: &AttestError) -> Response {
67 let (claimed, peer_header) = match err {
68 AttestError::HeaderMissing => (String::new(), String::new()),
69 AttestError::Mismatch {
70 claimed,
71 peer_header,
72 } => (claimed.clone(), peer_header.clone()),
73 };
74 (
75 StatusCode::FORBIDDEN,
76 Json(json!({
77 "error": err.tag(),
78 "claimed": claimed,
79 "peer_header": peer_header,
80 "note": "set AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1 to opt out (legacy peers); \
81 pre-v0.7.0 federation peers must be upgraded to send `x-peer-id`.",
82 })),
83 )
84 .into_response()
85}
86
87// ---------------------------------------------------------------------------
88// Phase 3 foundation (issue #224) — HTTP sync endpoints.
89//
90// These ship in v0.6.0 GA as SKELETONS running today's timestamp-aware merge
91// (`db::insert_if_newer`). Field-level CRDT-lite merge rules, streaming,
92// resume-on-interrupt, and per-peer auth tokens are v0.8.0 targets.
93// ---------------------------------------------------------------------------
94
/// v0.7.0 S6-LOW2 — log a warning when the sender's claimed clock differs
/// from the receiver's by more than this many seconds, in EITHER
/// direction (sender ahead or behind). The check compares the absolute
/// skew against this value.
///
/// The threshold is deliberately permissive: about a minute of skew is
/// normal for hosts with NTP drift after a sleep cycle. Anything beyond
/// that signals to the operator that the cluster's clocks need attention.
/// Observability-only: exceeding it never gates a push.
const CLOCK_SKEW_WARN_THRESHOLD_SECS: i64 = 60;
101
102/// v0.7.0 S6-LOW2 — observability-only clock-skew check. Compares the
103/// sender's reported wall-clock (or the highest entry in
104/// `sender_clock.entries` when the wall-clock field is absent) against
105/// the receiver's `chrono::Utc::now()`. When the delta exceeds
106/// [`CLOCK_SKEW_WARN_THRESHOLD_SECS`] in either direction, emits a
107/// `tracing::warn!` so operators can spot a misconfigured peer. NEVER
108/// rejects the push — federation must be tolerant of clock drift; the
109/// log is the entire enforcement surface.
110pub(super) fn check_sender_clock_skew(sender_agent_id: &str, body: &SyncPushBody) {
111 let sender_ts_str: Option<&str> = body
112 .sender_wall_clock
113 .as_deref()
114 .or_else(|| body.sender_clock.entries.values().max().map(String::as_str));
115 let Some(ts_str) = sender_ts_str else {
116 return; // No clock signal at all → nothing to compare.
117 };
118 let Ok(sender_at) = chrono::DateTime::parse_from_rfc3339(ts_str) else {
119 tracing::debug!(
120 sender = %sender_agent_id,
121 sender_ts = %ts_str,
122 "sync_push: sender clock not RFC3339; skipping skew check"
123 );
124 return;
125 };
126 let now = chrono::Utc::now();
127 let skew_secs = sender_at
128 .with_timezone(&chrono::Utc)
129 .signed_duration_since(now)
130 .num_seconds();
131 if skew_secs.abs() > CLOCK_SKEW_WARN_THRESHOLD_SECS {
132 tracing::warn!(
133 target: "federation::clock_skew",
134 sender = %sender_agent_id,
135 skew_secs,
136 sender_ts = %ts_str,
137 receiver_ts = %now.to_rfc3339(),
138 "sync_push: sender_clock skew exceeds {CLOCK_SKEW_WARN_THRESHOLD_SECS}s threshold \
139 (observability-only; push accepted)",
140 );
141 }
142}
143
144/// v0.7.0 S6-M2 — per-agent quota gate for federation receive. Closes
145/// the F7 gap (#639) where mTLS-authenticated peers could push past
146/// the local `agent_quotas` storage caps that would have blocked an
147/// equivalent HTTP `POST /memories` from the same identity.
148///
149/// `attribute_agent` is the identity the substrate will charge for the
150/// row. Resolution precedence (mTLS-attested first; falls back to the
151/// claim chain when no cert peeking is available):
152/// 1. `mem.metadata.agent_id` — the original author of the row
153/// (NHI provenance preserved across federation). This is what
154/// `quota_status` reports against, so charging this id makes the
155/// receiver-side quota a true mirror of the originator's daily
156/// budget. A misbehaving peer cannot substitute another agent's
157/// id without crashing the upstream signature check (H3).
158/// 2. `sender_agent_id` — substrate identity of the peer that
159/// delivered the row. Used when the row carries no
160/// `metadata.agent_id` (legacy / unauthored federation push).
161///
162/// Returns `Ok(())` on a clean check + record (counters incremented),
163/// `Err(QuotaError)` on a refusal. The caller renders the refusal as
164/// `429 Too Many Requests` with an `X-Quota-Reset-At` header.
165pub(super) fn attribute_agent_for_quota(sender_agent_id: &str, mem: &Memory) -> String {
166 mem.metadata
167 .get("agent_id")
168 .and_then(serde_json::Value::as_str)
169 .map(str::to_string)
170 .unwrap_or_else(|| sender_agent_id.to_string())
171}
172
173/// v0.7.0 S6-M2 — compute the next UTC midnight in RFC3339, used as
174/// the `X-Quota-Reset-At` header value when a federation receive is
175/// refused for hitting `memories_per_day` or `links_per_day`. Storage
176/// caps reset on midnight UTC via `quotas::reset_daily`. The header
177/// matches the HTTP POST refusal surface so clients have one timer
178/// to consult regardless of which entry point hit the cap.
179pub(super) fn next_utc_midnight() -> String {
180 use chrono::{Duration, Timelike};
181 let now = chrono::Utc::now();
182 let next = now
183 .with_hour(0)
184 .and_then(|t| t.with_minute(0))
185 .and_then(|t| t.with_second(0))
186 .and_then(|t| t.with_nanosecond(0))
187 .map(|midnight_today| midnight_today + Duration::days(1))
188 .unwrap_or_else(|| now + Duration::days(1));
189 next.to_rfc3339()
190}
191
192/// #1566 / #1579 B1 — deferred receive-side embedding refresh.
193///
194/// Spawns a detached task that embeds each `(memory_id,
195/// embedding_document)` pair OFF the request path: the embed itself
196/// runs on the blocking pool (the embedder is CPU-/network-heavy —
197/// ~1s/row via ollama), the DB lock is held only for the per-row
198/// `set_embedding` UPDATE, and the HNSW index is touched last (its
199/// own mutex, never overlapping the DB lock). Errors are logged and
200/// the row is left for the boot-time embed backfill
201/// (`db::get_unembedded_ids` selects `embedding IS NULL`, which covers
202/// federation-applied rows) — same best-effort posture as the
203/// pre-#1566 inline loop, minus the quorum-window coupling.
204///
205/// No-op when `rows` is empty or the receiver runs keyword-only (no
206/// embedder): rows stay FTS-recallable, matching the pre-#1566
207/// behaviour where the embed loop was gated on `app.embedder`.
208fn spawn_deferred_embedding_refresh(app: &AppState, rows: Vec<(String, String)>) {
209 if rows.is_empty() || app.embedder.as_ref().as_ref().is_none() {
210 return;
211 }
212 let db = app.db.clone();
213 let embedder = app.embedder.clone();
214 let vector_index = app.vector_index.clone();
215 tokio::spawn(async move {
216 for (id, text) in rows {
217 let emb = embedder.clone();
218 let embed_res =
219 tokio::task::spawn_blocking(move || emb.as_ref().as_ref().map(|e| e.embed(&text)))
220 .await;
221 let vec = match embed_res {
222 Ok(Some(Ok(v))) => v,
223 Ok(Some(Err(e))) => {
224 tracing::warn!("sync_push: deferred embed failed for {id}: {e}");
225 continue;
226 }
227 // Embedder vanished (impossible — checked above and the
228 // Arc is immutable) — nothing left to do for any row.
229 Ok(None) => return,
230 Err(e) => {
231 tracing::warn!("sync_push: deferred embed join error for {id}: {e}");
232 continue;
233 }
234 };
235 {
236 let lock = db.lock().await;
237 if let Err(e) = db::set_embedding(&lock.0, &id, &vec) {
238 tracing::warn!("sync_push: set_embedding failed for {id}: {e}");
239 continue;
240 }
241 }
242 let mut idx_lock = vector_index.lock().await;
243 if let Some(idx) = idx_lock.as_mut() {
244 idx.remove(&id);
245 idx.insert(id.clone(), vec);
246 }
247 }
248 });
249}
250
251/// Request body for `POST /api/v1/sync/push`.
252#[derive(Deserialize)]
253pub struct SyncPushBody {
254 /// Claimed `agent_id` of the peer pushing data. Recorded in
255 /// `sync_state` for vector clock advancement.
256 ///
257 /// v0.7.0 #238 — this body field is now ATTESTED against the
258 /// wire-level `x-peer-id` HTTP header before any substrate write
259 /// fires. See `src/federation/peer_attestation.rs` for the
260 /// decision matrix, env bypass, and operator runbook. Pre-v0.7.0
261 /// federation clients that don't send `x-peer-id` are accepted
262 /// only when the operator opts in via
263 /// `AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1`.
264 pub sender_agent_id: String,
265 /// Vector clock the sender had at push time. v0.7.0 S6-LOW2: now
266 /// consulted for observability-only clock-skew detection — the
267 /// receiver logs a `tracing::warn!` when the sender's latest
268 /// claimed observation is >60s ahead of the receiver's wall clock.
269 /// Full clock reconciliation (CRDT-lite merge) lands with Task 3a.1.
270 #[serde(default)]
271 pub sender_clock: crate::models::VectorClock,
272 /// v0.7.0 S6-LOW2 — sender's wall-clock RFC3339 timestamp at push
273 /// time. Optional: when absent, skew detection falls back to the
274 /// highest timestamp in `sender_clock.entries`. Observability-only;
275 /// never enforced.
276 #[serde(default)]
277 pub sender_wall_clock: Option<String>,
278 /// Memories the sender is offering. Applied via the existing
279 /// timestamp-aware merge (`insert_if_newer`).
280 pub memories: Vec<Memory>,
281 /// #1566 / #1579 B1 — source-side embedding vectors for the rows
282 /// in `memories` (embed-once-replicate-vector). Inside the
283 /// Ed25519-signed body bytes, so vector integrity is covered by
284 /// the same `X-Memory-Sig` + nonce replay protection as the rows.
285 /// `#[serde(default)]` keeps decode TOLERANT of absence: pushes
286 /// from pre-#1566 peers parse identically (empty vec), and the
287 /// receive path falls back to the deferred background-embed for
288 /// any applied row without a dim-matching shipped vector.
289 #[serde(default)]
290 pub embeddings: Vec<crate::federation::ShippedEmbedding>,
291 /// Memory IDs the sender has deleted and wants propagated. Applied
292 /// via `db::delete`. v0.6.0.1: simple remove (no tombstone row); a
293 /// concurrent newer `insert_if_newer` from another peer could revive
294 /// the row — a Last-Writer-Wins quirk we live with until v0.7's
295 /// CRDT-lite tombstone table lands. In the common 4-node mesh, the
296 /// same delete reaches every peer well before any revival window.
297 #[serde(default)]
298 pub deletions: Vec<String>,
299 /// v0.6.2 (S29): memory IDs the sender has explicitly archived and
300 /// wants propagated. Applied via `db::archive_memory` — a soft move
301 /// from `memories` to `archived_memories`. Missing-on-peer IDs no-op.
302 /// Distinct from `deletions`, which is a hard DELETE.
303 #[serde(default)]
304 pub archives: Vec<String>,
305 /// v0.6.2 (S29): memory IDs the sender has restored from archive and
306 /// wants propagated. Applied via `db::restore_archived` — moves the
307 /// row from `archived_memories` back into `memories`. The inverse of
308 /// `archives`. Missing-on-peer IDs (no row in the peer's archive
309 /// table, or a live row already exists) no-op so replays are safe.
310 #[serde(default)]
311 pub restores: Vec<String>,
312 /// v0.6.2 (#325): memory links the sender wants propagated. Applied
313 /// via `db::create_link` on each peer. Duplicates are a no-op thanks
314 /// to the unique `(source_id, target_id, relation)` constraint on
315 /// `memory_links`.
316 #[serde(default)]
317 pub links: Vec<MemoryLink>,
318 /// v0.6.2 (S34): pending-action rows the sender wants propagated.
319 /// Applied via `db::upsert_pending_action` — preserves the originator's
320 /// id + status + approvals so the cluster agrees on pending state.
321 /// Without this, `POST /api/v1/pending/{id}/approve` on a peer 404s
322 /// because the row only exists on the originator.
323 #[serde(default)]
324 pub pendings: Vec<crate::models::PendingAction>,
325 /// v0.6.2 (S34): pending-action decisions the sender wants propagated
326 /// so approve/reject on any node lands consistently. Applied via
327 /// `db::decide_pending_action` — already-decided rows no-op, replay-safe.
328 #[serde(default)]
329 pub pending_decisions: Vec<crate::models::PendingDecision>,
330 /// v0.6.2 (S35): namespace-standard meta rows the sender wants
331 /// propagated. Applied via `db::set_namespace_standard(conn, ns,
332 /// standard_id, parent.as_deref())` so the peer's inheritance-chain
333 /// walk uses the originator's explicit parent (not a locally
334 /// auto-detected one).
335 #[serde(default)]
336 pub namespace_meta: Vec<crate::models::NamespaceMetaEntry>,
337 /// v0.6.2 (S35 follow-up): namespaces whose standard the sender has
338 /// *cleared* and wants propagated. Applied via `db::clear_namespace_standard`
339 /// — missing-on-peer namespaces no-op so replays are safe. Without
340 /// this, alice clearing a standard on node-1 left the row visible on
341 /// node-2's peer, breaking cross-peer rule-lifecycle assertions.
342 #[serde(default)]
343 pub namespace_meta_clears: Vec<String>,
344 /// Preview mode — classify and count, do not write.
345 #[serde(default)]
346 pub dry_run: bool,
347}
348
349#[derive(Deserialize)]
350pub struct SyncSinceQuery {
351 /// Return memories with `updated_at > since`. Absent = full snapshot.
352 pub since: Option<String>,
353 /// Pagination cap. Defaults to 500.
354 pub limit: Option<usize>,
355 /// Caller's claimed `agent_id`; optional but recorded in `sync_state`
356 /// so the caller can later push incremental updates.
357 pub peer: Option<String>,
358}
359
/// `POST /api/v1/sync/push` — receive a federation push from a peer.
///
/// Gates run in this order before any write:
/// 1. the `x-peer-id` allowlist (#1056 TOFU guard);
/// 2. per-message signature and nonce freshness (#791 / #922), checked
///    over the raw body bytes;
/// 3. body deserialisation;
/// 4. `sender_agent_id` attestation (#238).
///
/// v0.7.0 Wave-3 Continuation 2 — on the postgres backend (`sal` feature)
/// the push is handed to `sync_push_via_store`. That path dispatches each
/// `Memory` row through `app.store.apply_remote_memory` (idempotent
/// insert-if-newer) and each link / deletion through the matching trait
/// method. Other subcollections (pendings, archives, restores,
/// namespace_meta, pending_decisions) are governance- or
/// archive-state-machine concerns whose write paths live on tables not
/// yet covered by the trait. They surface as skipped, with a structured
/// `unsupported_on_postgres` count in the response envelope, so a
/// heterogeneous (sqlite ↔ postgres) federation degrades gracefully
/// without silent drops. On sqlite, the push is applied directly in this
/// handler.
///
/// Heterogeneous federation contract: a sqlite peer's push of N
/// memories + M links + K deletions reaches steady state on the
/// postgres receiver via the trait calls. Audit emission for every
/// accepted federation push fires through `audit::emit` regardless
/// of backend (Phase 9).
377pub async fn sync_push(
378 State(app): State<AppState>,
379 headers: HeaderMap,
380 body_bytes: Bytes,
381) -> impl IntoResponse {
382 // v0.7.0 #791 — verify the per-message signature BEFORE
383 // deserialising the body. Keeps the verifier's input identical
384 // to the wire bytes (signer + verifier MUST agree byte-for-byte).
385 let peer_header_owned = extract_peer_id(&headers).map(str::to_string);
386
387 // v0.7.0 #1056 (Agent-2 #6) — TOFU spoofing guard. The
388 // (no sig, no enrolled key) arm of `verify_signature_or_reject`
389 // allows the request through with a WARN ("strict enforcement
390 // skipped") so an unenrolled federation pair stays operational.
391 // That permissive posture lets an attacker who knows a legitimate
392 // peer's id but has NOT yet been enrolled (heterogeneous rollout
393 // window — operator enrols half the mesh) impersonate the
394 // unenrolled half. Close the window by refusing any push whose
395 // claimed `x-peer-id` is NOT in the operator-configured peer
396 // allowlist (`AI_MEMORY_FED_PEER_ATTESTATION`). When NO allowlist
397 // is configured (the default zero-config state), this gate is a
398 // no-op and the legacy posture stands — so the security uplift
399 // only fires when the operator has explicitly enrolled peers.
400 if let Some(peer_id) = peer_header_owned.as_deref() {
401 let attest_cfg = peer_attestation::PeerAttestationConfig::from_env();
402 if attest_cfg.has_allowlist() && attest_cfg.scope_for(peer_id).is_none() {
403 tracing::warn!(
404 target: ATTESTATION_TRACE_TARGET,
405 peer_id = %peer_id,
406 "sync_push: x-peer-id is not in operator allowlist — refusing (#1056 TOFU guard)"
407 );
408 return (
409 StatusCode::UNAUTHORIZED,
410 Json(json!({
411 "error": "x_peer_id_not_in_allowlist",
412 "note": "#1056: x-peer-id is not in AI_MEMORY_FED_PEER_ATTESTATION; \
413 enrol the peer or unset the env to restore zero-config posture.",
414 })),
415 )
416 .into_response();
417 }
418 }
419 // v0.7.0 #922 — chained nonce-freshness check after signature verifies.
420 if let Some(rejection) = verify_signature_or_reject(
421 &headers,
422 &body_bytes,
423 peer_header_owned.as_deref(),
424 &app.federation_nonce_cache,
425 ) {
426 return rejection;
427 }
428
429 // Deserialise the body now that the signature has been verified.
430 let body: SyncPushBody = match serde_json::from_slice(&body_bytes) {
431 Ok(b) => b,
432 Err(e) => {
433 return (
434 StatusCode::BAD_REQUEST,
435 Json(json!({"error": format!("malformed sync_push body: {e}")})),
436 )
437 .into_response();
438 }
439 };
440
441 let state = app.db.clone();
442
443 // v0.7.0 #238 — body-claimed sender_agent_id MUST attest against
444 // the wire-level `x-peer-id` header (or be the unauthored-push
445 // legacy shape). Backwards-compat via
446 // `AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1`. Runs BEFORE the
447 // postgres-dispatch branch so both backends share the same
448 // refusal posture. See `src/federation/peer_attestation.rs`.
449 // (peer_header_owned already extracted above for signature check)
450 let attest_cfg = PeerAttestationConfig::from_env();
451 if !peer_attestation::trust_body_agent_id_bypass() {
452 if let Err(e) = peer_attestation::attest_sender(
453 peer_header_owned.as_deref(),
454 Some(body.sender_agent_id.as_str()),
455 &attest_cfg,
456 ) {
457 tracing::warn!(
458 target: ATTESTATION_TRACE_TARGET,
459 tag = e.tag(),
460 claimed = %body.sender_agent_id,
461 peer_header = %peer_header_owned.as_deref().unwrap_or(""),
462 "sync_push: sender_agent_id failed attestation against x-peer-id header"
463 );
464 return attestation_refusal_response(&e);
465 }
466 } else {
467 // Bypass set — log once per request at WARN so the operator
468 // can see the legacy posture is in effect.
469 tracing::warn!(
470 target: ATTESTATION_TRACE_TARGET,
471 "sync_push: AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1 — bypassing #238 \
472 sender_agent_id attestation (legacy compat)"
473 );
474 }
475
476 // v0.7.0 Wave-3 Continuation 2 — postgres-backed federation
477 // dispatches through the SAL trait for memories / deletions /
478 // links. Pendings / archives / restores / namespace_meta /
479 // pending_decisions remain sqlite-only (governance write paths
480 // and archive-state-machine state sit on tables not yet covered
481 // by the trait surface — those subcollections, when present in a
482 // push from a sqlite peer, surface in `skipped` with a structured
483 // note in the response envelope).
484 #[cfg(feature = "sal")]
485 if matches!(app.storage_backend, StorageBackend::Postgres) {
486 return sync_push_via_store(app, headers, body).await;
487 }
488
489 if let Err(e) = validate::validate_agent_id(&body.sender_agent_id) {
490 return (
491 StatusCode::BAD_REQUEST,
492 Json(json!({"error": format!("invalid sender_agent_id: {e}")})),
493 )
494 .into_response();
495 }
496 // Cap memories per push, matching the bulk-create limit. Without
497 // this a malicious peer with a valid mTLS cert could flood the
498 // receiver and bottleneck the shared SQLite Mutex (red-team #242).
499 if body.memories.len() > app.max_page_size {
500 return (
501 StatusCode::BAD_REQUEST,
502 Json(json!({
503 "error": format!("sync_push limited to {} memories per request", app.max_page_size)
504 })),
505 )
506 .into_response();
507 }
508 // #1566 / #1579 B1 — the shipped-vector array is bounded by the
509 // same cap as its sibling subcollections (red-team #242 posture:
510 // vectors are the LARGEST per-element payload on this surface, so
511 // an unbounded array would be the cheapest flood vector).
512 if body.embeddings.len() > app.max_page_size {
513 return (
514 StatusCode::BAD_REQUEST,
515 Json(json!({
516 "error": format!(
517 "sync_push limited to {} embeddings per request",
518 app.max_page_size
519 )
520 })),
521 )
522 .into_response();
523 }
524 if body.deletions.len() > app.max_page_size {
525 return (
526 StatusCode::BAD_REQUEST,
527 Json(json!({
528 "error": format!("sync_push limited to {} deletions per request", app.max_page_size)
529 })),
530 )
531 .into_response();
532 }
533 if body.archives.len() > app.max_page_size {
534 return (
535 StatusCode::BAD_REQUEST,
536 Json(json!({
537 "error": format!("sync_push limited to {} archives per request", app.max_page_size)
538 })),
539 )
540 .into_response();
541 }
542 if body.restores.len() > app.max_page_size {
543 return (
544 StatusCode::BAD_REQUEST,
545 Json(json!({
546 "error": format!("sync_push limited to {} restores per request", app.max_page_size)
547 })),
548 )
549 .into_response();
550 }
551 if body.pendings.len() > app.max_page_size {
552 return (
553 StatusCode::BAD_REQUEST,
554 Json(json!({
555 "error": format!("sync_push limited to {} pendings per request", app.max_page_size)
556 })),
557 )
558 .into_response();
559 }
560 if body.pending_decisions.len() > app.max_page_size {
561 return (
562 StatusCode::BAD_REQUEST,
563 Json(json!({
564 "error": format!(
565 "sync_push limited to {} pending_decisions per request",
566 app.max_page_size
567 )
568 })),
569 )
570 .into_response();
571 }
572 if body.namespace_meta.len() > app.max_page_size {
573 return (
574 StatusCode::BAD_REQUEST,
575 Json(json!({
576 "error": format!(
577 "sync_push limited to {} namespace_meta per request",
578 app.max_page_size
579 )
580 })),
581 )
582 .into_response();
583 }
584 if body.namespace_meta_clears.len() > app.max_page_size {
585 return (
586 StatusCode::BAD_REQUEST,
587 Json(json!({
588 "error": format!(
589 "sync_push limited to {} namespace_meta_clears per request",
590 app.max_page_size
591 )
592 })),
593 )
594 .into_response();
595 }
596 // #1556 — `links` was the sole subcollection missing this cap. The link
597 // loop below does a synchronous insert (and an Ed25519 verify when the
598 // link carries signature+observed_by) per element while holding the shared
599 // write Mutex; without a bound a peer could send ~15-20k links per 2 MiB
600 // body (15-20x every sibling cap) to saturate the lock — the red-team #242
601 // DoS the other caps exist to prevent. Checked pre-lock like its siblings.
602 if body.links.len() > app.max_page_size {
603 return (
604 StatusCode::BAD_REQUEST,
605 Json(json!({
606 "error": format!(
607 "sync_push limited to {} links per request",
608 app.max_page_size
609 )
610 })),
611 )
612 .into_response();
613 }
614 // Receiver's local identity — default to the caller-supplied header,
615 // fall back to the anonymous placeholder. Recorded in sync_state rows.
616 let header_agent_id = headers
617 .get(crate::HEADER_AGENT_ID)
618 .and_then(|v| v.to_str().ok());
619 let local_agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
620 Ok(id) => id,
621 Err(e) => {
622 return (
623 StatusCode::BAD_REQUEST,
624 Json(json!({"error": format!("invalid x-agent-id: {e}")})),
625 )
626 .into_response();
627 }
628 };
629
630 // v0.7.0 S6-LOW2 — observability-only sender_clock skew detection.
631 // Logs a warn when the sender's clock claim is >60s out from ours;
632 // does not gate the push. Federation must be tolerant of drift.
633 check_sender_clock_skew(&body.sender_agent_id, &body);
634
635 let lock = state.lock().await;
636 let mut applied = 0usize;
637 let mut noop = 0usize;
638 let mut skipped = 0usize;
639 let mut deleted = 0usize;
640 let mut archived = 0usize;
641 let mut restored = 0usize;
642 let mut latest_seen: Option<String> = None;
643 // v0.7.0 S6-M2 — federation quota refusals. Counted alongside
644 // `skipped` so the existing response envelope shape doesn't change,
645 // and surfaced as a distinct field so an operator can tell the
646 // difference between "peer pushed garbage" and "peer overran its
647 // daily cap". The first quota refusal also short-circuits the
648 // whole memory loop with a 429 response (matches the HTTP POST
649 // store refusal: callers MUST back off, not just skip the offender).
650 let mut quota_refused = 0usize;
651 let mut first_quota_refusal: Option<crate::quotas::QuotaError> = None;
652
653 // v0.6.0.1 (#322): peers that apply a synced memory must also refresh
654 // their embedding + HNSW index so downstream semantic recall surfaces
655 // the row. Without this, scenario-18 observed a2a-hermes r14 black-hole
656 // pattern: substrate CRUD fanout works, but semantic recall on peers
657 // silently misses propagated writes.
658 //
659 // #1566 / #1579 B1 (2026-06-10) — embed-once-replicate-vector +
660 // ack-after-commit. The pre-#1566 shape embedded every applied row
661 // synchronously (~1s/row via ollama) while STILL HOLDING the DB
662 // lock, inside the sender's quorum-ack window — the mechanism
663 // behind the deadline_exceeded → DLQ cascade (the 62k-row #1578
664 // event) and the up-to-9× duplicate embedding across the fleet.
665 // Now:
666 // - a dim-matching shipped vector is stored directly under the
667 // already-held lock (one cheap UPDATE, microseconds);
668 // - everything else is DEFERRED to a detached background task
669 // spawned after the response is decided (see
670 // `spawn_deferred_embedding_refresh`), so the ack never waits
671 // on the embedder. FTS keeps the rows keyword-recallable in
672 // the gap, and the boot-time embed backfill
673 // (`db::get_unembedded_ids` — `embedding IS NULL` covers
674 // federation-applied rows) is the restart safety net.
675 let receiver_dim = app
676 .embedder
677 .as_ref()
678 .as_ref()
679 .map(crate::embeddings::Embedder::dim);
680 let shipped_by_id: std::collections::HashMap<&str, &crate::federation::ShippedEmbedding> = body
681 .embeddings
682 .iter()
683 .map(|se| (se.memory_id.as_str(), se))
684 .collect();
685 let mut deferred_embed: Vec<(String, String)> = Vec::new();
686 let mut hnsw_updates: Vec<(String, Vec<f32>)> = Vec::new();
687 for mem in &body.memories {
688 if let Err(e) = validate::RequestValidator::validate_memory(mem) {
689 tracing::warn!("sync_push: skipping memory {} ({}): {e}", mem.id, mem.title);
690 skipped += 1;
691 continue;
692 }
693 if latest_seen
694 .as_deref()
695 .is_none_or(|current| mem.updated_at.as_str() > current)
696 {
697 latest_seen = Some(mem.updated_at.clone());
698 }
699 if body.dry_run {
700 noop += 1;
701 continue;
702 }
703 // v0.7.0 S6-M2 — per-agent quota gate. F7 (#639) closed this
704 // on the HTTP POST store path but federation receive was a
705 // back-door bypass: an mTLS peer could push N memories per
706 // second past the local `agent_quotas.max_memories_per_day`
707 // ceiling because `insert_if_newer` is the substrate-level
708 // upsert and doesn't consult quotas. Charge each accepted
709 // memory against the original author's quota row so the cap
710 // is a true cluster-wide budget. On refusal: emit a signed
711 // refusal event (for the cryptographic audit chain) and
712 // short-circuit the loop with `quota_refused`; the outer
713 // handler renders 429 + X-Quota-Reset-At so callers back off.
714 let attribute_agent = attribute_agent_for_quota(&body.sender_agent_id, mem);
715 let bytes_estimate =
716 i64::try_from(mem.title.len() + mem.content.len() + mem.metadata.to_string().len())
717 .unwrap_or(i64::MAX);
718 // v0.7.0 #1156 — charge against the per-namespace accounting
719 // row. Federation peers can no longer drain an agent's cap by
720 // fanning across namespaces (the per-namespace dimension keeps
721 // each namespace's allotment intact).
722 match crate::quotas::check_and_record(
723 &lock.0,
724 &attribute_agent,
725 &mem.namespace,
726 crate::quotas::QuotaOp::Memory {
727 bytes: bytes_estimate,
728 },
729 ) {
730 Ok(()) => {}
731 Err(crate::quotas::QuotaCheckError::Quota(q)) => {
732 tracing::warn!(
733 target: "federation::quota",
734 peer = %body.sender_agent_id,
735 attribute_agent = %attribute_agent,
736 limit = q.limit.as_str(),
737 current = q.current,
738 max = q.max,
739 "sync_push: per-agent quota exceeded; refusing federation push"
740 );
741 // Emit a signed audit event so the refusal lands in the
742 // tamper-evident chain alongside the F7-equivalent HTTP
743 // POST refusal. Best-effort: audit-write failure is
744 // logged but does not change the refusal control flow.
745 let _ = crate::signed_events::append_signed_event(
746 &lock.0,
747 // v0.7.0 #1099 (SR-1 #4, HIGH) — sign the quota-
748 // refusal audit row with the daemon's installed
749 // signing key when one is available. Pre-#1099 the
750 // row always landed unsigned.
751 &crate::signed_events::SignedEvent::with_daemon_signature(
752 crate::signed_events::payload_hash(
753 format!(
754 "peer={} agent={} limit={} current={} max={}",
755 body.sender_agent_id,
756 attribute_agent,
757 q.limit.as_str(),
758 q.current,
759 q.max,
760 )
761 .as_bytes(),
762 ),
763 attribute_agent.clone(),
764 "federation.quota_refused".to_string(),
765 chrono::Utc::now().to_rfc3339(),
766 ),
767 );
768 quota_refused += 1;
769 if first_quota_refusal.is_none() {
770 first_quota_refusal = Some(q);
771 }
772 // Short-circuit: any further memories in this push
773 // would only deepen the cap breach. The remainder of
774 // the loop posture (skipping the rest) matches the
775 // HTTP POST bulk-create refusal — first cap hit
776 // returns 429 with the remaining batch unprocessed.
777 break;
778 }
779 Err(crate::quotas::QuotaCheckError::Sql(e)) => {
780 tracing::warn!(
781 "sync_push: quota substrate read failed for {}: {e}",
782 attribute_agent
783 );
784 skipped += 1;
785 continue;
786 }
787 }
788 // v0.7.0 L2-2 (S6-M1) — stamp `metadata.reflection_origin` on
789 // inbound reflection rows before the insert. The stamped copy
790 // carries `peer_origin`, `original_depth`, and the receiver's
791 // local cap at arrival time; the substrate row preserves the
792 // original `reflection_depth` so derived-write cap enforcement
793 // (storage::reflect) sees the same value the source peer saw.
794 // Non-reflection rows (depth == 0) pass through unchanged.
795 //
796 // #961 (SAL-boundary cleanup): use the `db::` namespace alias
797 // (which re-exports `crate::storage` from `src/lib.rs:52`) so
798 // every sqlite-direct call in this branch reads as a single
799 // module surface — keeps the alias hygiene that the rest of
800 // this file already follows (`db::insert_if_newer`,
801 // `db::archive_memory`, etc.).
802 let cap_for_namespace = db::resolve_governance_policy(&lock.0, &mem.namespace)
803 .unwrap_or_else(crate::models::GovernancePolicy::default)
804 .effective_max_reflection_depth();
805 let to_insert = crate::federation::reflection_bookkeeping::stamp_reflection_origin(
806 mem,
807 &body.sender_agent_id,
808 cap_for_namespace,
809 );
810 match db::insert_if_newer(&lock.0, &to_insert) {
811 Ok(actual_id) => {
812 applied += 1;
813 // #1566 / #1579 B1 — store a dim-matching shipped
814 // vector directly (no local embed at all); anything
815 // else falls back to the deferred background embed.
816 // `se.vector.len() == se.dim` guards a malformed
817 // sender whose claimed dim disagrees with the payload.
818 // #1584 (SEC) — the dim gate is necessary but not
819 // sufficient: a shipped vector with NaN/±Inf components
820 // or a non-unit norm poisons cosine ranking.
821 // `sanitize_shipped_vector` rejects non-finite vectors
822 // and L2-normalizes the rest; `None` (or a dim mismatch)
823 // falls back to a local re-embed.
824 let clean_shipped = shipped_by_id
825 .get(mem.id.as_str())
826 .filter(|se| receiver_dim == Some(se.dim) && se.vector.len() == se.dim)
827 .and_then(|se| {
828 crate::federation::sanitize_shipped_vector(&se.vector)
829 .map(|v| (v, se.model.clone()))
830 });
831 match clean_shipped {
832 Some((vector, model)) => {
833 match db::set_embedding(&lock.0, &actual_id, &vector) {
834 Ok(()) => hnsw_updates.push((actual_id, vector)),
835 Err(e) => {
836 tracing::warn!(
837 "sync_push: storing shipped embedding failed for \
838 {actual_id} (model {model}): {e} — deferring local embed",
839 );
840 deferred_embed.push((
841 actual_id,
842 crate::embeddings::embedding_document(&mem.title, &mem.content),
843 ));
844 }
845 }
846 }
847 None => deferred_embed.push((
848 actual_id,
849 crate::embeddings::embedding_document(&mem.title, &mem.content),
850 )),
851 }
852 }
853 Err(e) => {
854 // Best-effort refund so a downstream insert failure
855 // doesn't leak quota counters. `refund_op` saturates at
856 // zero so a buggy double-refund cannot poison the row.
857 // #1156 — refund on the same `(agent_id, namespace)`
858 // row the check_and_record above incremented.
859 let _ = crate::quotas::refund_op(
860 &lock.0,
861 &attribute_agent,
862 &mem.namespace,
863 crate::quotas::QuotaOp::Memory {
864 bytes: bytes_estimate,
865 },
866 );
867 tracing::warn!("sync_push: insert_if_newer failed for {}: {e}", mem.id);
868 skipped += 1;
869 }
870 }
871 }
872
873 // v0.7.0 S6-M2 — quota refusal short-circuit. The first refusal in
874 // the loop produces a 429 with X-Quota-Reset-At so callers back off
875 // (matches the HTTP POST store refusal envelope from F7 / #639).
876 if let Some(q) = first_quota_refusal.take() {
877 drop(lock);
878 // #1566 / #1579 B1 — rows applied BEFORE the refusal are
879 // committed and stay committed (the 429 covers the remainder
880 // of the batch); index their stored shipped vectors and defer
881 // the rest exactly like the success path, instead of leaving
882 // them for the next boot backfill.
883 if !hnsw_updates.is_empty() {
884 let mut idx_lock = app.vector_index.lock().await;
885 if let Some(idx) = idx_lock.as_mut() {
886 for (id, vec) in hnsw_updates {
887 idx.remove(&id);
888 idx.insert(id, vec);
889 }
890 }
891 }
892 spawn_deferred_embedding_refresh(&app, deferred_embed);
893 let reset_at = next_utc_midnight();
894 return (
895 StatusCode::TOO_MANY_REQUESTS,
896 [
897 ("x-quota-reset-at", reset_at.as_str()),
898 ("x-quota-limit", q.limit.as_str()),
899 ],
900 Json(json!({
901 "error": "QUOTA_EXCEEDED",
902 "limit": q.limit.as_str(),
903 "current": q.current,
904 "max": q.max,
905 "agent_id": q.agent_id,
906 "applied_before_refusal": applied,
907 (crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
908 "reset_at": reset_at,
909 })),
910 )
911 .into_response();
912 }
913
914 // Process deletions (v0.6.0.1 — scenario 10 fanout). Invalid ids are
915 // skipped silently; missing rows count as no-op. Peers that have
916 // already GC'd the row see identical post-state.
917 for del_id in &body.deletions {
918 if validate::validate_id(del_id).is_err() {
919 skipped += 1;
920 continue;
921 }
922 if body.dry_run {
923 noop += 1;
924 continue;
925 }
926 match db::delete(&lock.0, del_id) {
927 Ok(true) => deleted += 1,
928 Ok(false) => noop += 1,
929 Err(e) => {
930 tracing::warn!("sync_push: delete failed for {del_id}: {e}");
931 skipped += 1;
932 }
933 }
934 }
935
936 // v0.6.2 (S29): process explicit archives. Soft-move from `memories`
937 // to `archived_memories` — distinct from deletions which hard-delete.
938 // Missing rows count as no-op (peer may have already archived or
939 // never received the original write).
940 for arch_id in &body.archives {
941 if validate::validate_id(arch_id).is_err() {
942 skipped += 1;
943 continue;
944 }
945 if body.dry_run {
946 noop += 1;
947 continue;
948 }
949 match db::archive_memory(&lock.0, arch_id, Some("sync_push")) {
950 Ok(true) => archived += 1,
951 Ok(false) => noop += 1,
952 Err(e) => {
953 tracing::warn!("sync_push: archive_memory failed for {arch_id}: {e}");
954 skipped += 1;
955 }
956 }
957 }
958
959 // v0.6.2 (S29): process explicit restores — the inverse of archives.
960 // Move the row from `archived_memories` back into `memories`.
961 // No-op posture matches archives: missing rows (peer hasn't received
962 // the archive, or the row is already live) count as noop so replays
963 // and out-of-order restore/archive pairs don't error.
964 for res_id in &body.restores {
965 if validate::validate_id(res_id).is_err() {
966 skipped += 1;
967 continue;
968 }
969 if body.dry_run {
970 noop += 1;
971 continue;
972 }
973 match db::restore_archived(&lock.0, res_id) {
974 Ok(true) => restored += 1,
975 Ok(false) => noop += 1,
976 Err(e) => {
977 tracing::warn!("sync_push: restore_archived failed for {res_id}: {e}");
978 skipped += 1;
979 }
980 }
981 }
982
983 // v0.6.2 (#325): process incoming links. Duplicates are expected on
984 // retry / re-sync and collapse to a no-op via the unique index on
985 // (source_id, target_id, relation). Invalid ids are skipped silently
986 // — same posture as deletions.
987 //
988 // v0.7 H3: when a link arrives with a signature + observed_by claim,
989 // verify it against the public key associated with that claim before
990 // landing the row. Tampered signatures → reject with a warn log.
991 // Unknown observed_by (no enrolled key on this host) → accept-and-
992 // flag as `unsigned` so federation back-compat holds for peers that
993 // haven't enrolled yet. Successful verify → land with attest_level
994 // `peer_attested`.
995 let mut links_applied = 0usize;
996 for link in &body.links {
997 if validate::RequestValidator::validate_link_triple(
998 &link.source_id,
999 &link.target_id,
1000 link.relation.as_str(),
1001 )
1002 .is_err()
1003 {
1004 skipped += 1;
1005 continue;
1006 }
1007 if body.dry_run {
1008 noop += 1;
1009 continue;
1010 }
1011
1012 // Decide attest_level via the H3 verify path before insert.
1013 let attest_level = match (link.signature.as_deref(), link.observed_by.as_deref()) {
1014 (Some(sig_bytes), Some(observed_by)) => {
1015 match crate::identity::verify::lookup_peer_public_key(observed_by) {
1016 Some(pubkey) => {
1017 let signable = crate::identity::sign::SignableLink {
1018 src_id: &link.source_id,
1019 dst_id: &link.target_id,
1020 relation: link.relation.as_str(),
1021 observed_by: Some(observed_by),
1022 valid_from: link.valid_from.as_deref(),
1023 valid_until: link.valid_until.as_deref(),
1024 };
1025 match crate::identity::verify::verify(&pubkey, &signable, sig_bytes) {
1026 Ok(()) => crate::models::AttestLevel::PeerAttested.as_str(),
1027 Err(e) => {
1028 // Tampered / malformed-sig: refuse to land
1029 // the row. The receiver-side warn log is
1030 // the operator's signal that a peer is
1031 // misbehaving (or that a key rotation
1032 // got out of sync).
1033 tracing::warn!(
1034 "sync_push: signature rejected for link \
1035 ({} -> {} / {}) from observed_by={}: {e}",
1036 link.source_id,
1037 link.target_id,
1038 link.relation,
1039 observed_by
1040 );
1041 skipped += 1;
1042 continue;
1043 }
1044 }
1045 }
1046 None => {
1047 // No public key enrolled for this peer →
1048 // accept-and-flag as unsigned. Operators can
1049 // later enroll the key (`identity import`) and
1050 // re-sync to upgrade the row's attest_level on
1051 // a subsequent re-send.
1052 crate::models::AttestLevel::Unsigned.as_str()
1053 }
1054 }
1055 }
1056 // No signature on the wire (legacy v0.6.x peer) or no
1057 // observed_by claim → treat as unsigned. Same posture as
1058 // pre-H3 federation.
1059 _ => crate::models::AttestLevel::Unsigned.as_str(),
1060 };
1061
1062 match db::create_link_inbound(&lock.0, link, attest_level) {
1063 Ok(()) => links_applied += 1,
1064 Err(e) => {
1065 tracing::warn!(
1066 "sync_push: create_link_inbound failed ({} -> {} / {}): {e}",
1067 link.source_id,
1068 link.target_id,
1069 link.relation
1070 );
1071 skipped += 1;
1072 }
1073 }
1074 }
1075
1076 // v0.6.2 (S34): process incoming pending-action rows. Uses
1077 // `upsert_pending_action` so replays / races converge on the
1078 // originator's canonical row. Invalid ids skipped silently.
1079 let mut pendings_applied = 0usize;
1080 for pa in &body.pendings {
1081 if validate::validate_id(&pa.id).is_err() {
1082 skipped += 1;
1083 continue;
1084 }
1085 if body.dry_run {
1086 noop += 1;
1087 continue;
1088 }
1089 match db::upsert_pending_action(&lock.0, pa) {
1090 Ok(()) => {
1091 pendings_applied += 1;
1092 // v0.7.0 K4 — peer-originated pending rows fire the
1093 // `approval_requested` event on this peer too so local
1094 // approval-API subscribers get a uniform view of the
1095 // queue regardless of which node minted the row.
1096 // `upsert_*` is idempotent (`ON CONFLICT(id) DO UPDATE`)
1097 // — replays of the same row currently re-fire the
1098 // event; that's the documented K4 behaviour and matches
1099 // the existing `pending_action_expired` semantics. K7
1100 // (subscription reliability) layers DLQ + dedup on top.
1101 if pa.status == "pending" {
1102 crate::subscriptions::dispatch_approval_requested(&lock.0, &pa.id, &lock.1);
1103 }
1104 }
1105 Err(e) => {
1106 tracing::warn!("sync_push: upsert_pending_action failed for {}: {e}", pa.id);
1107 skipped += 1;
1108 }
1109 }
1110 }
1111
1112 // v0.6.2 (S34): process incoming pending-action decisions. No-op on
1113 // already-decided rows; that's the steady-state when the originator
1114 // and this peer both saw the decision. Rejected decisions still
1115 // transition status so retries on either side see `status != 'pending'`.
1116 let mut pending_decisions_applied = 0usize;
1117 for dec in &body.pending_decisions {
1118 if validate::validate_id(&dec.id).is_err() {
1119 skipped += 1;
1120 continue;
1121 }
1122 if body.dry_run {
1123 noop += 1;
1124 continue;
1125 }
1126 match db::decide_pending_action(&lock.0, &dec.id, dec.approved, &dec.decider) {
1127 Ok(true) => {
1128 pending_decisions_applied += 1;
1129 // On approve, replay the pending payload so the target
1130 // write (store/delete/promote) actually lands on this
1131 // peer — matches the originator's post-approve state.
1132 if dec.approved {
1133 match db::execute_pending_action(&lock.0, &dec.id) {
1134 Ok(_) => {}
1135 Err(e) => {
1136 tracing::warn!(
1137 "sync_push: execute_pending_action failed for {}: {e}",
1138 dec.id
1139 );
1140 }
1141 }
1142 }
1143 }
1144 Ok(false) => noop += 1, // already decided — converged state
1145 Err(e) => {
1146 tracing::warn!(
1147 "sync_push: decide_pending_action failed for {}: {e}",
1148 dec.id
1149 );
1150 skipped += 1;
1151 }
1152 }
1153 }
1154
1155 // v0.6.2 (S35): process incoming namespace_meta rows. Applies via
1156 // `set_namespace_standard` so the peer's inheritance-chain walk has
1157 // the originator's explicit parent link. The standard memory itself
1158 // rides on the same push via `memories` (or arrived earlier through
1159 // `broadcast_store_quorum`); the namespace-meta row closes the gap.
1160 let mut namespace_meta_applied = 0usize;
1161 for entry in &body.namespace_meta {
1162 if validate::validate_namespace(&entry.namespace).is_err()
1163 || validate::validate_id(&entry.standard_id).is_err()
1164 {
1165 skipped += 1;
1166 continue;
1167 }
1168 if body.dry_run {
1169 noop += 1;
1170 continue;
1171 }
1172 match db::set_namespace_standard(
1173 &lock.0,
1174 &entry.namespace,
1175 &entry.standard_id,
1176 entry.parent_namespace.as_deref(),
1177 ) {
1178 Ok(()) => namespace_meta_applied += 1,
1179 Err(e) => {
1180 tracing::warn!(
1181 "sync_push: set_namespace_standard failed for {}: {e}",
1182 entry.namespace
1183 );
1184 skipped += 1;
1185 }
1186 }
1187 }
1188
1189 // v0.6.2 (S35 follow-up): process incoming namespace_meta_clears. Applies
1190 // via `db::clear_namespace_standard` so the peer drops its meta row and
1191 // subsequent `get_standard` returns empty. Missing-on-peer namespaces
1192 // no-op (`changed == 0`) — replays are safe.
1193 let mut namespace_meta_cleared = 0usize;
1194 for ns in &body.namespace_meta_clears {
1195 if validate::validate_namespace(ns).is_err() {
1196 skipped += 1;
1197 continue;
1198 }
1199 if body.dry_run {
1200 noop += 1;
1201 continue;
1202 }
1203 match db::clear_namespace_standard(&lock.0, ns) {
1204 Ok(true) => namespace_meta_cleared += 1,
1205 Ok(false) => noop += 1,
1206 Err(e) => {
1207 tracing::warn!("sync_push: clear_namespace_standard failed for {ns}: {e}");
1208 skipped += 1;
1209 }
1210 }
1211 }
1212
1213 // Advance the vector clock with the highest `updated_at` we observed.
1214 // Skipped in dry-run mode since the caller is only previewing.
1215 if !body.dry_run
1216 && let Some(at) = latest_seen.as_deref()
1217 && let Err(e) = db::sync_state_observe(&lock.0, &local_agent_id, &body.sender_agent_id, at)
1218 {
1219 tracing::warn!("sync_push: sync_state_observe failed: {e}");
1220 }
1221
1222 // #1566 / #1579 B1 — the pre-#1566 synchronous embed loop lived
1223 // here (one `emb.embed()` per applied row, ~1s/row via ollama,
1224 // WHILE HOLDING the DB lock and inside the sender's quorum-ack
1225 // window). It is gone: dim-matching shipped vectors were stored
1226 // inline above (cheap UPDATE under the already-held lock), and
1227 // every other applied row is handed to the detached background
1228 // task spawned after the response is decided below.
1229
1230 // Receiver's current clock, returned so the sender can learn which
1231 // peers the receiver has seen. Phase 3 Task 3a.1 will use this to
1232 // short-circuit redundant pushes.
1233 let receiver_clock = db::sync_state_load(&lock.0, &local_agent_id)
1234 .unwrap_or_else(|_| crate::models::VectorClock::default());
1235
1236 // Release DB lock before touching the HNSW index — the vector index
1237 // has its own mutex and holding both serializes unrelated writers.
1238 drop(lock);
1239 if !hnsw_updates.is_empty() {
1240 let mut idx_lock = app.vector_index.lock().await;
1241 if let Some(idx) = idx_lock.as_mut() {
1242 for (id, vec) in hnsw_updates {
1243 idx.remove(&id);
1244 idx.insert(id, vec);
1245 }
1246 }
1247 }
1248
1249 // #1566 / #1579 B1 — ack-after-commit: hand the rows that still
1250 // need a locally-computed vector to the detached background task.
1251 // The HTTP response (the sender's quorum ack) returns immediately.
1252 spawn_deferred_embedding_refresh(&app, deferred_embed);
1253
1254 (
1255 StatusCode::OK,
1256 Json(json!({
1257 "applied": applied,
1258 "deleted": deleted,
1259 "archived": archived,
1260 "restored": restored,
1261 "links_applied": links_applied,
1262 "pendings_applied": pendings_applied,
1263 "pending_decisions_applied": pending_decisions_applied,
1264 "namespace_meta_applied": namespace_meta_applied,
1265 "namespace_meta_cleared": namespace_meta_cleared,
1266 "noop": noop,
1267 "skipped": skipped,
1268 (crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
1269 "dry_run": body.dry_run,
1270 "receiver_agent_id": local_agent_id,
1271 "receiver_clock": receiver_clock,
1272 })),
1273 )
1274 .into_response()
1275}
1276
#[cfg(test)]
mod tests {
    //! v0.7.0 #1049 (Agent-5 #9) — `extract_peer_id` validates the
    //! header value through `crate::validate::validate_agent_id`
    //! before returning. The validator rejects whitespace, null
    //! bytes, control characters (CR/LF), shell metacharacters,
    //! and anything outside the 1..=128 byte length range. This unit
    //! suite pins the happy path (legitimate agent-id-shaped values
    //! pass), both length boundaries, and representative rejection
    //! arms that are actually reachable from the wire.

    use super::*;
    use axum::http::HeaderValue;

    /// Build a `HeaderMap` carrying `value` under `PEER_ID_HEADER`.
    ///
    /// Returns `None` when `HeaderValue::from_bytes` itself refuses the
    /// byte sequence (e.g. CR, LF, NUL and most other ASCII control
    /// characters). Such a value can never reach the handler from the
    /// wire, so callers treat that case as unreachable rather than
    /// using it to probe the validator.
    fn build_headers(value: &str) -> Option<HeaderMap> {
        let hv = HeaderValue::from_bytes(value.as_bytes()).ok()?;
        let mut h = HeaderMap::new();
        h.insert(PEER_ID_HEADER, hv);
        Some(h)
    }

    #[test]
    fn extract_peer_id_accepts_legitimate_agent_id_1049() {
        let h = build_headers("ai:peer-alpha").expect("legitimate value fits in HeaderValue");
        assert_eq!(extract_peer_id(&h), Some("ai:peer-alpha"));
    }

    #[test]
    fn extract_peer_id_accepts_hostname_form_1049() {
        let h = build_headers("host:laptop.local:pid-42").expect("legitimate value fits");
        assert_eq!(extract_peer_id(&h), Some("host:laptop.local:pid-42"));
    }

    #[test]
    fn extract_peer_id_accepts_max_length_value_1049() {
        // 128 bytes is the inclusive upper bound of the 1..=128 cap;
        // pin it so an off-by-one in the validator is caught.
        let max_len = "a".repeat(128);
        let h = build_headers(&max_len).expect("128-byte ASCII fits in HeaderValue");
        assert_eq!(
            extract_peer_id(&h),
            Some(max_len.as_str()),
            "#1049: a 128-byte peer-id sits exactly on the cap and MUST be accepted"
        );
    }

    #[test]
    fn extract_peer_id_rejects_empty_value_1049() {
        // An empty header value is legal HTTP but fails the 1..=128
        // length floor, so it must behave like an absent header.
        let h = build_headers("").expect("empty value fits in HeaderValue");
        assert_eq!(
            extract_peer_id(&h),
            None,
            "#1049: empty peer-id MUST be rejected"
        );
    }

    #[test]
    fn extract_peer_id_rejects_value_with_whitespace_1049() {
        let h = build_headers("peer one").expect("space fits in HeaderValue");
        assert_eq!(
            extract_peer_id(&h),
            None,
            "#1049: whitespace in peer-id MUST be rejected by the validator"
        );
    }

    #[test]
    fn extract_peer_id_rejects_value_with_tab_1049() {
        // Horizontal tab is the one control character `HeaderValue`
        // (and `HeaderValue::to_str`) accept, so it is the control-char
        // case that actually reaches the validator from the wire.
        let h = build_headers("peer\tone").expect("tab fits in HeaderValue");
        assert_eq!(
            extract_peer_id(&h),
            None,
            "#1049: tab characters in peer-id MUST be rejected by the validator"
        );
    }

    #[test]
    fn extract_peer_id_rejects_value_with_shell_metacharacters_1049() {
        let h = build_headers("peer$attacker").expect("$ fits in HeaderValue");
        assert_eq!(
            extract_peer_id(&h),
            None,
            "#1049: shell metacharacters in peer-id MUST be rejected"
        );
    }

    #[test]
    fn extract_peer_id_rejects_oversized_value_1049() {
        // 129-byte string exceeds the validator's 1..=128 length cap.
        let oversized = "a".repeat(129);
        let h = build_headers(&oversized).expect("129-byte ASCII fits in HeaderValue");
        assert_eq!(
            extract_peer_id(&h),
            None,
            "#1049: oversized peer-id (>128 bytes) MUST be rejected"
        );
    }

    #[test]
    fn extract_peer_id_absent_returns_none() {
        let h = HeaderMap::new();
        assert_eq!(extract_peer_id(&h), None);
    }
}