Skip to main content

ai_memory/handlers/
federation_receive.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4use axum::{
5    Json,
6    body::Bytes,
7    extract::State,
8    http::{HeaderMap, StatusCode},
9    response::{IntoResponse, Response},
10};
11use serde::Deserialize;
12use serde_json::json;
13
14use crate::db;
15use crate::federation::peer_attestation::{
16    self, AttestError, PEER_ID_HEADER, PeerAttestationConfig,
17};
18use crate::models::{Memory, MemoryLink};
19use crate::validate;
20
21use super::AppState;
22#[cfg(feature = "sal")]
23use super::StorageBackend;
24#[cfg(feature = "sal")]
25use super::federation_signing_check::sync_push_via_store;
26use super::federation_signing_check::verify_signature_or_reject;
27
/// Tracing target for receive-side peer-attestation checks
/// (#1558 tracing-target SSOT).
///
/// Passed as `target:` on the `tracing::warn!` events that `sync_push`
/// emits for the #1056 allowlist refusal, the #238 attestation failure,
/// and the legacy-bypass notice, so all three share one filterable target.
const ATTESTATION_TRACE_TARGET: &str = "federation::attestation";
31
32/// v0.7.0 federation security — extract the peer's self-claimed
33/// `x-peer-id` header. Lowercase form per HTTP/2 wire convention;
34/// axum's `HeaderMap` lookup is case-insensitive so callers can send
35/// the canonical `X-Peer-Id`.
36///
37/// v0.7.0 #1049 (Agent-5 #9) — validates the header value through
38/// `validate::validate_agent_id` before returning so the raw header
39/// content cannot inject CRLF/terminal-escape sequences into
40/// downstream tracing log files or be smuggled into the
41/// `FederationNonceCache` key (where exotic bytes would create
42/// per-peer cache fragmentation an attacker could weaponise to
43/// flood-evict legitimate peer entries). Returns `None` for any
44/// header that fails the agent_id shape check — same observable
45/// outcome as the header being absent.
46pub(super) fn extract_peer_id(headers: &HeaderMap) -> Option<&str> {
47    let raw = headers.get(PEER_ID_HEADER).and_then(|v| v.to_str().ok())?;
48    // Reject anything that fails the agent_id shape per CLAUDE.md
49    // §"Agent Identity": `^[A-Za-z0-9_\-:@./]{1,128}$`. The strict
50    // shape is the load-bearing property — no whitespace, no nulls,
51    // no control chars (CRLF), no shell metacharacters.
52    if crate::validate::validate_agent_id(raw).is_err() {
53        tracing::warn!(
54            target: "federation::peer_id",
55            "extract_peer_id: dropped malformed X-Peer-Id header (#1049 validation gate)"
56        );
57        return None;
58    }
59    Some(raw)
60}
61
62/// v0.7.0 #238 — render a 403 envelope when the body-claimed
63/// `sender_agent_id` does not attest to the wire-level `x-peer-id`
64/// header. Surfaces both values so the operator can diff exactly
65/// what the peer claimed against what the substrate expected.
66fn attestation_refusal_response(err: &AttestError) -> Response {
67    let (claimed, peer_header) = match err {
68        AttestError::HeaderMissing => (String::new(), String::new()),
69        AttestError::Mismatch {
70            claimed,
71            peer_header,
72        } => (claimed.clone(), peer_header.clone()),
73    };
74    (
75        StatusCode::FORBIDDEN,
76        Json(json!({
77            "error": err.tag(),
78            "claimed": claimed,
79            "peer_header": peer_header,
80            "note": "set AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1 to opt out (legacy peers); \
81                     pre-v0.7.0 federation peers must be upgraded to send `x-peer-id`.",
82        })),
83    )
84        .into_response()
85}
86
87// ---------------------------------------------------------------------------
88// Phase 3 foundation (issue #224) — HTTP sync endpoints.
89//
90// These ship in v0.6.0 GA as SKELETONS running today's timestamp-aware merge
91// (`db::insert_if_newer`). Field-level CRDT-lite merge rules, streaming,
92// resume-on-interrupt, and per-peer auth tokens are v0.8.0 targets.
93// ---------------------------------------------------------------------------
94
95/// v0.7.0 S6-LOW2 — log a warning when the sender's claimed wall-clock
96/// is more than this many seconds ahead of the receiver's. Threshold is
97/// deliberately permissive: ~1 minute of skew is normal for hosts with
98/// NTP drift after a sleep cycle. Anything beyond that is operator-
99/// signal that the cluster's clocks need attention.
100const CLOCK_SKEW_WARN_THRESHOLD_SECS: i64 = 60;
101
102/// v0.7.0 S6-LOW2 — observability-only clock-skew check. Compares the
103/// sender's reported wall-clock (or the highest entry in
104/// `sender_clock.entries` when the wall-clock field is absent) against
105/// the receiver's `chrono::Utc::now()`. When the delta exceeds
106/// [`CLOCK_SKEW_WARN_THRESHOLD_SECS`] in either direction, emits a
107/// `tracing::warn!` so operators can spot a misconfigured peer. NEVER
108/// rejects the push — federation must be tolerant of clock drift; the
109/// log is the entire enforcement surface.
110pub(super) fn check_sender_clock_skew(sender_agent_id: &str, body: &SyncPushBody) {
111    let sender_ts_str: Option<&str> = body
112        .sender_wall_clock
113        .as_deref()
114        .or_else(|| body.sender_clock.entries.values().max().map(String::as_str));
115    let Some(ts_str) = sender_ts_str else {
116        return; // No clock signal at all → nothing to compare.
117    };
118    let Ok(sender_at) = chrono::DateTime::parse_from_rfc3339(ts_str) else {
119        tracing::debug!(
120            sender = %sender_agent_id,
121            sender_ts = %ts_str,
122            "sync_push: sender clock not RFC3339; skipping skew check"
123        );
124        return;
125    };
126    let now = chrono::Utc::now();
127    let skew_secs = sender_at
128        .with_timezone(&chrono::Utc)
129        .signed_duration_since(now)
130        .num_seconds();
131    if skew_secs.abs() > CLOCK_SKEW_WARN_THRESHOLD_SECS {
132        tracing::warn!(
133            target: "federation::clock_skew",
134            sender = %sender_agent_id,
135            skew_secs,
136            sender_ts = %ts_str,
137            receiver_ts = %now.to_rfc3339(),
138            "sync_push: sender_clock skew exceeds {CLOCK_SKEW_WARN_THRESHOLD_SECS}s threshold \
139             (observability-only; push accepted)",
140        );
141    }
142}
143
144/// v0.7.0 S6-M2 — per-agent quota gate for federation receive. Closes
145/// the F7 gap (#639) where mTLS-authenticated peers could push past
146/// the local `agent_quotas` storage caps that would have blocked an
147/// equivalent HTTP `POST /memories` from the same identity.
148///
149/// `attribute_agent` is the identity the substrate will charge for the
150/// row. Resolution precedence (mTLS-attested first; falls back to the
151/// claim chain when no cert peeking is available):
152///   1. `mem.metadata.agent_id` — the original author of the row
153///      (NHI provenance preserved across federation). This is what
154///      `quota_status` reports against, so charging this id makes the
155///      receiver-side quota a true mirror of the originator's daily
156///      budget. A misbehaving peer cannot substitute another agent's
157///      id without crashing the upstream signature check (H3).
158///   2. `sender_agent_id` — substrate identity of the peer that
159///      delivered the row. Used when the row carries no
160///      `metadata.agent_id` (legacy / unauthored federation push).
161///
162/// Returns `Ok(())` on a clean check + record (counters incremented),
163/// `Err(QuotaError)` on a refusal. The caller renders the refusal as
164/// `429 Too Many Requests` with an `X-Quota-Reset-At` header.
165pub(super) fn attribute_agent_for_quota(sender_agent_id: &str, mem: &Memory) -> String {
166    mem.metadata
167        .get("agent_id")
168        .and_then(serde_json::Value::as_str)
169        .map(str::to_string)
170        .unwrap_or_else(|| sender_agent_id.to_string())
171}
172
173/// v0.7.0 S6-M2 — compute the next UTC midnight in RFC3339, used as
174/// the `X-Quota-Reset-At` header value when a federation receive is
175/// refused for hitting `memories_per_day` or `links_per_day`. Storage
176/// caps reset on midnight UTC via `quotas::reset_daily`. The header
177/// matches the HTTP POST refusal surface so clients have one timer
178/// to consult regardless of which entry point hit the cap.
179pub(super) fn next_utc_midnight() -> String {
180    use chrono::{Duration, Timelike};
181    let now = chrono::Utc::now();
182    let next = now
183        .with_hour(0)
184        .and_then(|t| t.with_minute(0))
185        .and_then(|t| t.with_second(0))
186        .and_then(|t| t.with_nanosecond(0))
187        .map(|midnight_today| midnight_today + Duration::days(1))
188        .unwrap_or_else(|| now + Duration::days(1));
189    next.to_rfc3339()
190}
191
192/// #1566 / #1579 B1 — deferred receive-side embedding refresh.
193///
194/// Spawns a detached task that embeds each `(memory_id,
195/// embedding_document)` pair OFF the request path: the embed itself
196/// runs on the blocking pool (the embedder is CPU-/network-heavy —
197/// ~1s/row via ollama), the DB lock is held only for the per-row
198/// `set_embedding` UPDATE, and the HNSW index is touched last (its
199/// own mutex, never overlapping the DB lock). Errors are logged and
200/// the row is left for the boot-time embed backfill
201/// (`db::get_unembedded_ids` selects `embedding IS NULL`, which covers
202/// federation-applied rows) — same best-effort posture as the
203/// pre-#1566 inline loop, minus the quorum-window coupling.
204///
205/// No-op when `rows` is empty or the receiver runs keyword-only (no
206/// embedder): rows stay FTS-recallable, matching the pre-#1566
207/// behaviour where the embed loop was gated on `app.embedder`.
208fn spawn_deferred_embedding_refresh(app: &AppState, rows: Vec<(String, String)>) {
209    if rows.is_empty() || app.embedder.as_ref().as_ref().is_none() {
210        return;
211    }
212    let db = app.db.clone();
213    let embedder = app.embedder.clone();
214    let vector_index = app.vector_index.clone();
215    tokio::spawn(async move {
216        for (id, text) in rows {
217            let emb = embedder.clone();
218            let embed_res =
219                tokio::task::spawn_blocking(move || emb.as_ref().as_ref().map(|e| e.embed(&text)))
220                    .await;
221            let vec = match embed_res {
222                Ok(Some(Ok(v))) => v,
223                Ok(Some(Err(e))) => {
224                    tracing::warn!("sync_push: deferred embed failed for {id}: {e}");
225                    continue;
226                }
227                // Embedder vanished (impossible — checked above and the
228                // Arc is immutable) — nothing left to do for any row.
229                Ok(None) => return,
230                Err(e) => {
231                    tracing::warn!("sync_push: deferred embed join error for {id}: {e}");
232                    continue;
233                }
234            };
235            {
236                let lock = db.lock().await;
237                if let Err(e) = db::set_embedding(&lock.0, &id, &vec) {
238                    tracing::warn!("sync_push: set_embedding failed for {id}: {e}");
239                    continue;
240                }
241            }
242            let mut idx_lock = vector_index.lock().await;
243            if let Some(idx) = idx_lock.as_mut() {
244                idx.remove(&id);
245                idx.insert(id.clone(), vec);
246            }
247        }
248    });
249}
250
/// Request body for `POST /api/v1/sync/push`.
///
/// Only `sender_agent_id` and `memories` are required on the wire;
/// every other field is `#[serde(default)]`, so a push that omits it
/// decodes with an empty collection / `None` / `false`.
#[derive(Deserialize)]
pub struct SyncPushBody {
    /// Claimed `agent_id` of the peer pushing data. Recorded in
    /// `sync_state` for vector clock advancement.
    ///
    /// v0.7.0 #238 — this body field is now ATTESTED against the
    /// wire-level `x-peer-id` HTTP header before any substrate write
    /// fires. See `src/federation/peer_attestation.rs` for the
    /// decision matrix, env bypass, and operator runbook. Pre-v0.7.0
    /// federation clients that don't send `x-peer-id` are accepted
    /// only when the operator opts in via
    /// `AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1`.
    pub sender_agent_id: String,
    /// Vector clock the sender had at push time. v0.7.0 S6-LOW2: now
    /// consulted for observability-only clock-skew detection — when
    /// `sender_wall_clock` is absent, the receiver logs a
    /// `tracing::warn!` if the highest entry here differs from the
    /// receiver's wall clock by more than 60s in either direction.
    /// Full clock reconciliation (CRDT-lite merge) lands with Task 3a.1.
    #[serde(default)]
    pub sender_clock: crate::models::VectorClock,
    /// v0.7.0 S6-LOW2 — sender's wall-clock RFC3339 timestamp at push
    /// time. Optional: when absent, skew detection falls back to the
    /// highest timestamp in `sender_clock.entries`. Observability-only;
    /// never enforced.
    #[serde(default)]
    pub sender_wall_clock: Option<String>,
    /// Memories the sender is offering. Applied via the existing
    /// timestamp-aware merge (`insert_if_newer`).
    pub memories: Vec<Memory>,
    /// #1566 / #1579 B1 — source-side embedding vectors for the rows
    /// in `memories` (embed-once-replicate-vector). Inside the
    /// Ed25519-signed body bytes, so vector integrity is covered by
    /// the same `X-Memory-Sig` + nonce replay protection as the rows.
    /// `#[serde(default)]` keeps decode TOLERANT of absence: pushes
    /// from pre-#1566 peers parse identically (empty vec), and the
    /// receive path falls back to the deferred background-embed for
    /// any applied row without a dim-matching shipped vector.
    #[serde(default)]
    pub embeddings: Vec<crate::federation::ShippedEmbedding>,
    /// Memory IDs the sender has deleted and wants propagated. Applied
    /// via `db::delete`. v0.6.0.1: simple remove (no tombstone row); a
    /// concurrent newer `insert_if_newer` from another peer could revive
    /// the row — a Last-Writer-Wins quirk we live with until v0.7's
    /// CRDT-lite tombstone table lands. In the common 4-node mesh, the
    /// same delete reaches every peer well before any revival window.
    #[serde(default)]
    pub deletions: Vec<String>,
    /// v0.6.2 (S29): memory IDs the sender has explicitly archived and
    /// wants propagated. Applied via `db::archive_memory` — a soft move
    /// from `memories` to `archived_memories`. Missing-on-peer IDs no-op.
    /// Distinct from `deletions`, which is a hard DELETE.
    #[serde(default)]
    pub archives: Vec<String>,
    /// v0.6.2 (S29): memory IDs the sender has restored from archive and
    /// wants propagated. Applied via `db::restore_archived` — moves the
    /// row from `archived_memories` back into `memories`. The inverse of
    /// `archives`. Missing-on-peer IDs (no row in the peer's archive
    /// table, or a live row already exists) no-op so replays are safe.
    #[serde(default)]
    pub restores: Vec<String>,
    /// v0.6.2 (#325): memory links the sender wants propagated. Applied
    /// via `db::create_link` on each peer. Duplicates are a no-op thanks
    /// to the unique `(source_id, target_id, relation)` constraint on
    /// `memory_links`.
    #[serde(default)]
    pub links: Vec<MemoryLink>,
    /// v0.6.2 (S34): pending-action rows the sender wants propagated.
    /// Applied via `db::upsert_pending_action` — preserves the originator's
    /// id + status + approvals so the cluster agrees on pending state.
    /// Without this, `POST /api/v1/pending/{id}/approve` on a peer 404s
    /// because the row only exists on the originator.
    #[serde(default)]
    pub pendings: Vec<crate::models::PendingAction>,
    /// v0.6.2 (S34): pending-action decisions the sender wants propagated
    /// so approve/reject on any node lands consistently. Applied via
    /// `db::decide_pending_action` — already-decided rows no-op, replay-safe.
    #[serde(default)]
    pub pending_decisions: Vec<crate::models::PendingDecision>,
    /// v0.6.2 (S35): namespace-standard meta rows the sender wants
    /// propagated. Applied via `db::set_namespace_standard(conn, ns,
    /// standard_id, parent.as_deref())` so the peer's inheritance-chain
    /// walk uses the originator's explicit parent (not a locally
    /// auto-detected one).
    #[serde(default)]
    pub namespace_meta: Vec<crate::models::NamespaceMetaEntry>,
    /// v0.6.2 (S35 follow-up): namespaces whose standard the sender has
    /// *cleared* and wants propagated. Applied via `db::clear_namespace_standard`
    /// — missing-on-peer namespaces no-op so replays are safe. Without
    /// this, alice clearing a standard on node-1 left the row visible on
    /// node-2's peer, breaking cross-peer rule-lifecycle assertions.
    #[serde(default)]
    pub namespace_meta_clears: Vec<String>,
    /// Preview mode — classify and count, do not write. Defaults to
    /// `false` when omitted.
    #[serde(default)]
    pub dry_run: bool,
}
348
/// Query parameters for the incremental "sync since" pull. All three
/// fields are optional. (The handler that consumes this struct is not
/// part of this section of the file.)
#[derive(Deserialize)]
pub struct SyncSinceQuery {
    /// Return memories with `updated_at > since`. Absent = full snapshot.
    pub since: Option<String>,
    /// Pagination cap. Defaults to 500.
    pub limit: Option<usize>,
    /// Caller's claimed `agent_id`; optional but recorded in `sync_state`
    /// so the caller can later push incremental updates.
    pub peer: Option<String>,
}
359
360/// v0.7.0 Wave-3 Continuation 2 — postgres-backed federation push.
361///
362/// Dispatches each `Memory` row through `app.store.apply_remote_memory`
363/// (idempotent insert-if-newer) and each link / deletion through the
364/// matching trait method. Other subcollections (pendings, archives,
365/// restores, namespace_meta, pending_decisions) are governance- /
366/// archive-state-machine concerns whose write paths live on tables
367/// not yet trait-covered; they surface as skipped with a structured
368/// `unsupported_on_postgres` count in the response envelope so a
369/// heterogeneous (sqlite ↔ postgres) federation degrades gracefully
370/// without silent drops.
371///
372/// Heterogeneous federation contract: a sqlite peer's push of N
373/// memories + M links + K deletions reaches steady-state on the
374/// postgres receiver via the trait calls. Audit emission for every
375/// accepted federation push fires through `audit::emit` regardless
376/// of backend (Phase 9).
377pub async fn sync_push(
378    State(app): State<AppState>,
379    headers: HeaderMap,
380    body_bytes: Bytes,
381) -> impl IntoResponse {
382    // v0.7.0 #791 — verify the per-message signature BEFORE
383    // deserialising the body. Keeps the verifier's input identical
384    // to the wire bytes (signer + verifier MUST agree byte-for-byte).
385    let peer_header_owned = extract_peer_id(&headers).map(str::to_string);
386
387    // v0.7.0 #1056 (Agent-2 #6) — TOFU spoofing guard. The
388    // (no sig, no enrolled key) arm of `verify_signature_or_reject`
389    // allows the request through with a WARN ("strict enforcement
390    // skipped") so an unenrolled federation pair stays operational.
391    // That permissive posture lets an attacker who knows a legitimate
392    // peer's id but has NOT yet been enrolled (heterogeneous rollout
393    // window — operator enrols half the mesh) impersonate the
394    // unenrolled half. Close the window by refusing any push whose
395    // claimed `x-peer-id` is NOT in the operator-configured peer
396    // allowlist (`AI_MEMORY_FED_PEER_ATTESTATION`). When NO allowlist
397    // is configured (the default zero-config state), this gate is a
398    // no-op and the legacy posture stands — so the security uplift
399    // only fires when the operator has explicitly enrolled peers.
400    if let Some(peer_id) = peer_header_owned.as_deref() {
401        let attest_cfg = peer_attestation::PeerAttestationConfig::from_env();
402        if attest_cfg.has_allowlist() && attest_cfg.scope_for(peer_id).is_none() {
403            tracing::warn!(
404                target: ATTESTATION_TRACE_TARGET,
405                peer_id = %peer_id,
406                "sync_push: x-peer-id is not in operator allowlist — refusing (#1056 TOFU guard)"
407            );
408            return (
409                StatusCode::UNAUTHORIZED,
410                Json(json!({
411                    "error": "x_peer_id_not_in_allowlist",
412                    "note": "#1056: x-peer-id is not in AI_MEMORY_FED_PEER_ATTESTATION; \
413                             enrol the peer or unset the env to restore zero-config posture.",
414                })),
415            )
416                .into_response();
417        }
418    }
419    // v0.7.0 #922 — chained nonce-freshness check after signature verifies.
420    if let Some(rejection) = verify_signature_or_reject(
421        &headers,
422        &body_bytes,
423        peer_header_owned.as_deref(),
424        &app.federation_nonce_cache,
425    ) {
426        return rejection;
427    }
428
429    // Deserialise the body now that the signature has been verified.
430    let body: SyncPushBody = match serde_json::from_slice(&body_bytes) {
431        Ok(b) => b,
432        Err(e) => {
433            return (
434                StatusCode::BAD_REQUEST,
435                Json(json!({"error": format!("malformed sync_push body: {e}")})),
436            )
437                .into_response();
438        }
439    };
440
441    let state = app.db.clone();
442
443    // v0.7.0 #238 — body-claimed sender_agent_id MUST attest against
444    // the wire-level `x-peer-id` header (or be the unauthored-push
445    // legacy shape). Backwards-compat via
446    // `AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1`. Runs BEFORE the
447    // postgres-dispatch branch so both backends share the same
448    // refusal posture. See `src/federation/peer_attestation.rs`.
449    // (peer_header_owned already extracted above for signature check)
450    let attest_cfg = PeerAttestationConfig::from_env();
451    if !peer_attestation::trust_body_agent_id_bypass() {
452        if let Err(e) = peer_attestation::attest_sender(
453            peer_header_owned.as_deref(),
454            Some(body.sender_agent_id.as_str()),
455            &attest_cfg,
456        ) {
457            tracing::warn!(
458                target: ATTESTATION_TRACE_TARGET,
459                tag = e.tag(),
460                claimed = %body.sender_agent_id,
461                peer_header = %peer_header_owned.as_deref().unwrap_or(""),
462                "sync_push: sender_agent_id failed attestation against x-peer-id header"
463            );
464            return attestation_refusal_response(&e);
465        }
466    } else {
467        // Bypass set — log once per request at WARN so the operator
468        // can see the legacy posture is in effect.
469        tracing::warn!(
470            target: ATTESTATION_TRACE_TARGET,
471            "sync_push: AI_MEMORY_FED_TRUST_BODY_AGENT_ID=1 — bypassing #238 \
472             sender_agent_id attestation (legacy compat)"
473        );
474    }
475
476    // v0.7.0 Wave-3 Continuation 2 — postgres-backed federation
477    // dispatches through the SAL trait for memories / deletions /
478    // links. Pendings / archives / restores / namespace_meta /
479    // pending_decisions remain sqlite-only (governance write paths
480    // and archive-state-machine state sit on tables not yet covered
481    // by the trait surface — those subcollections, when present in a
482    // push from a sqlite peer, surface in `skipped` with a structured
483    // note in the response envelope).
484    #[cfg(feature = "sal")]
485    if matches!(app.storage_backend, StorageBackend::Postgres) {
486        return sync_push_via_store(app, headers, body).await;
487    }
488
489    if let Err(e) = validate::validate_agent_id(&body.sender_agent_id) {
490        return (
491            StatusCode::BAD_REQUEST,
492            Json(json!({"error": format!("invalid sender_agent_id: {e}")})),
493        )
494            .into_response();
495    }
496    // Cap memories per push, matching the bulk-create limit. Without
497    // this a malicious peer with a valid mTLS cert could flood the
498    // receiver and bottleneck the shared SQLite Mutex (red-team #242).
499    if body.memories.len() > app.max_page_size {
500        return (
501            StatusCode::BAD_REQUEST,
502            Json(json!({
503                "error": format!("sync_push limited to {} memories per request", app.max_page_size)
504            })),
505        )
506            .into_response();
507    }
508    // #1566 / #1579 B1 — the shipped-vector array is bounded by the
509    // same cap as its sibling subcollections (red-team #242 posture:
510    // vectors are the LARGEST per-element payload on this surface, so
511    // an unbounded array would be the cheapest flood vector).
512    if body.embeddings.len() > app.max_page_size {
513        return (
514            StatusCode::BAD_REQUEST,
515            Json(json!({
516                "error": format!(
517                    "sync_push limited to {} embeddings per request",
518                    app.max_page_size
519                )
520            })),
521        )
522            .into_response();
523    }
524    if body.deletions.len() > app.max_page_size {
525        return (
526            StatusCode::BAD_REQUEST,
527            Json(json!({
528                "error": format!("sync_push limited to {} deletions per request", app.max_page_size)
529            })),
530        )
531            .into_response();
532    }
533    if body.archives.len() > app.max_page_size {
534        return (
535            StatusCode::BAD_REQUEST,
536            Json(json!({
537                "error": format!("sync_push limited to {} archives per request", app.max_page_size)
538            })),
539        )
540            .into_response();
541    }
542    if body.restores.len() > app.max_page_size {
543        return (
544            StatusCode::BAD_REQUEST,
545            Json(json!({
546                "error": format!("sync_push limited to {} restores per request", app.max_page_size)
547            })),
548        )
549            .into_response();
550    }
551    if body.pendings.len() > app.max_page_size {
552        return (
553            StatusCode::BAD_REQUEST,
554            Json(json!({
555                "error": format!("sync_push limited to {} pendings per request", app.max_page_size)
556            })),
557        )
558            .into_response();
559    }
560    if body.pending_decisions.len() > app.max_page_size {
561        return (
562            StatusCode::BAD_REQUEST,
563            Json(json!({
564                "error": format!(
565                    "sync_push limited to {} pending_decisions per request",
566                    app.max_page_size
567                )
568            })),
569        )
570            .into_response();
571    }
572    if body.namespace_meta.len() > app.max_page_size {
573        return (
574            StatusCode::BAD_REQUEST,
575            Json(json!({
576                "error": format!(
577                    "sync_push limited to {} namespace_meta per request",
578                    app.max_page_size
579                )
580            })),
581        )
582            .into_response();
583    }
584    if body.namespace_meta_clears.len() > app.max_page_size {
585        return (
586            StatusCode::BAD_REQUEST,
587            Json(json!({
588                "error": format!(
589                    "sync_push limited to {} namespace_meta_clears per request",
590                    app.max_page_size
591                )
592            })),
593        )
594            .into_response();
595    }
596    // #1556 — `links` was the sole subcollection missing this cap. The link
597    // loop below does a synchronous insert (and an Ed25519 verify when the
598    // link carries signature+observed_by) per element while holding the shared
599    // write Mutex; without a bound a peer could send ~15-20k links per 2 MiB
600    // body (15-20x every sibling cap) to saturate the lock — the red-team #242
601    // DoS the other caps exist to prevent. Checked pre-lock like its siblings.
602    if body.links.len() > app.max_page_size {
603        return (
604            StatusCode::BAD_REQUEST,
605            Json(json!({
606                "error": format!(
607                    "sync_push limited to {} links per request",
608                    app.max_page_size
609                )
610            })),
611        )
612            .into_response();
613    }
614    // Receiver's local identity — default to the caller-supplied header,
615    // fall back to the anonymous placeholder. Recorded in sync_state rows.
616    let header_agent_id = headers
617        .get(crate::HEADER_AGENT_ID)
618        .and_then(|v| v.to_str().ok());
619    let local_agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
620        Ok(id) => id,
621        Err(e) => {
622            return (
623                StatusCode::BAD_REQUEST,
624                Json(json!({"error": format!("invalid x-agent-id: {e}")})),
625            )
626                .into_response();
627        }
628    };
629
630    // v0.7.0 S6-LOW2 — observability-only sender_clock skew detection.
631    // Logs a warn when the sender's clock claim is >60s out from ours;
632    // does not gate the push. Federation must be tolerant of drift.
633    check_sender_clock_skew(&body.sender_agent_id, &body);
634
635    let lock = state.lock().await;
636    let mut applied = 0usize;
637    let mut noop = 0usize;
638    let mut skipped = 0usize;
639    let mut deleted = 0usize;
640    let mut archived = 0usize;
641    let mut restored = 0usize;
642    let mut latest_seen: Option<String> = None;
643    // v0.7.0 S6-M2 — federation quota refusals. Counted alongside
644    // `skipped` so the existing response envelope shape doesn't change,
645    // and surfaced as a distinct field so an operator can tell the
646    // difference between "peer pushed garbage" and "peer overran its
647    // daily cap". The first quota refusal also short-circuits the
648    // whole memory loop with a 429 response (matches the HTTP POST
649    // store refusal: callers MUST back off, not just skip the offender).
650    let mut quota_refused = 0usize;
651    let mut first_quota_refusal: Option<crate::quotas::QuotaError> = None;
652
653    // v0.6.0.1 (#322): peers that apply a synced memory must also refresh
654    // their embedding + HNSW index so downstream semantic recall surfaces
655    // the row. Without this, scenario-18 observed a2a-hermes r14 black-hole
656    // pattern: substrate CRUD fanout works, but semantic recall on peers
657    // silently misses propagated writes.
658    //
659    // #1566 / #1579 B1 (2026-06-10) — embed-once-replicate-vector +
660    // ack-after-commit. The pre-#1566 shape embedded every applied row
661    // synchronously (~1s/row via ollama) while STILL HOLDING the DB
662    // lock, inside the sender's quorum-ack window — the mechanism
663    // behind the deadline_exceeded → DLQ cascade (the 62k-row #1578
664    // event) and the up-to-9× duplicate embedding across the fleet.
665    // Now:
666    //   - a dim-matching shipped vector is stored directly under the
667    //     already-held lock (one cheap UPDATE, microseconds);
668    //   - everything else is DEFERRED to a detached background task
669    //     spawned after the response is decided (see
670    //     `spawn_deferred_embedding_refresh`), so the ack never waits
671    //     on the embedder. FTS keeps the rows keyword-recallable in
672    //     the gap, and the boot-time embed backfill
673    //     (`db::get_unembedded_ids` — `embedding IS NULL` covers
674    //     federation-applied rows) is the restart safety net.
675    let receiver_dim = app
676        .embedder
677        .as_ref()
678        .as_ref()
679        .map(crate::embeddings::Embedder::dim);
680    let shipped_by_id: std::collections::HashMap<&str, &crate::federation::ShippedEmbedding> = body
681        .embeddings
682        .iter()
683        .map(|se| (se.memory_id.as_str(), se))
684        .collect();
685    let mut deferred_embed: Vec<(String, String)> = Vec::new();
686    let mut hnsw_updates: Vec<(String, Vec<f32>)> = Vec::new();
687    for mem in &body.memories {
688        if let Err(e) = validate::RequestValidator::validate_memory(mem) {
689            tracing::warn!("sync_push: skipping memory {} ({}): {e}", mem.id, mem.title);
690            skipped += 1;
691            continue;
692        }
693        if latest_seen
694            .as_deref()
695            .is_none_or(|current| mem.updated_at.as_str() > current)
696        {
697            latest_seen = Some(mem.updated_at.clone());
698        }
699        if body.dry_run {
700            noop += 1;
701            continue;
702        }
703        // v0.7.0 S6-M2 — per-agent quota gate. F7 (#639) closed this
704        // on the HTTP POST store path but federation receive was a
705        // back-door bypass: an mTLS peer could push N memories per
706        // second past the local `agent_quotas.max_memories_per_day`
707        // ceiling because `insert_if_newer` is the substrate-level
708        // upsert and doesn't consult quotas. Charge each accepted
709        // memory against the original author's quota row so the cap
710        // is a true cluster-wide budget. On refusal: emit a signed
711        // refusal event (for the cryptographic audit chain) and
712        // short-circuit the loop with `quota_refused`; the outer
713        // handler renders 429 + X-Quota-Reset-At so callers back off.
714        let attribute_agent = attribute_agent_for_quota(&body.sender_agent_id, mem);
715        let bytes_estimate =
716            i64::try_from(mem.title.len() + mem.content.len() + mem.metadata.to_string().len())
717                .unwrap_or(i64::MAX);
718        // v0.7.0 #1156 — charge against the per-namespace accounting
719        // row. Federation peers can no longer drain an agent's cap by
720        // fanning across namespaces (the per-namespace dimension keeps
721        // each namespace's allotment intact).
722        match crate::quotas::check_and_record(
723            &lock.0,
724            &attribute_agent,
725            &mem.namespace,
726            crate::quotas::QuotaOp::Memory {
727                bytes: bytes_estimate,
728            },
729        ) {
730            Ok(()) => {}
731            Err(crate::quotas::QuotaCheckError::Quota(q)) => {
732                tracing::warn!(
733                    target: "federation::quota",
734                    peer = %body.sender_agent_id,
735                    attribute_agent = %attribute_agent,
736                    limit = q.limit.as_str(),
737                    current = q.current,
738                    max = q.max,
739                    "sync_push: per-agent quota exceeded; refusing federation push"
740                );
741                // Emit a signed audit event so the refusal lands in the
742                // tamper-evident chain alongside the F7-equivalent HTTP
743                // POST refusal. Best-effort: audit-write failure is
744                // logged but does not change the refusal control flow.
745                let _ = crate::signed_events::append_signed_event(
746                    &lock.0,
747                    // v0.7.0 #1099 (SR-1 #4, HIGH) — sign the quota-
748                    // refusal audit row with the daemon's installed
749                    // signing key when one is available. Pre-#1099 the
750                    // row always landed unsigned.
751                    &crate::signed_events::SignedEvent::with_daemon_signature(
752                        crate::signed_events::payload_hash(
753                            format!(
754                                "peer={} agent={} limit={} current={} max={}",
755                                body.sender_agent_id,
756                                attribute_agent,
757                                q.limit.as_str(),
758                                q.current,
759                                q.max,
760                            )
761                            .as_bytes(),
762                        ),
763                        attribute_agent.clone(),
764                        "federation.quota_refused".to_string(),
765                        chrono::Utc::now().to_rfc3339(),
766                    ),
767                );
768                quota_refused += 1;
769                if first_quota_refusal.is_none() {
770                    first_quota_refusal = Some(q);
771                }
772                // Short-circuit: any further memories in this push
773                // would only deepen the cap breach. The remainder of
774                // the loop posture (skipping the rest) matches the
775                // HTTP POST bulk-create refusal — first cap hit
776                // returns 429 with the remaining batch unprocessed.
777                break;
778            }
779            Err(crate::quotas::QuotaCheckError::Sql(e)) => {
780                tracing::warn!(
781                    "sync_push: quota substrate read failed for {}: {e}",
782                    attribute_agent
783                );
784                skipped += 1;
785                continue;
786            }
787        }
788        // v0.7.0 L2-2 (S6-M1) — stamp `metadata.reflection_origin` on
789        // inbound reflection rows before the insert. The stamped copy
790        // carries `peer_origin`, `original_depth`, and the receiver's
791        // local cap at arrival time; the substrate row preserves the
792        // original `reflection_depth` so derived-write cap enforcement
793        // (storage::reflect) sees the same value the source peer saw.
794        // Non-reflection rows (depth == 0) pass through unchanged.
795        //
796        // #961 (SAL-boundary cleanup): use the `db::` namespace alias
797        // (which re-exports `crate::storage` from `src/lib.rs:52`) so
798        // every sqlite-direct call in this branch reads as a single
799        // module surface — keeps the alias hygiene that the rest of
800        // this file already follows (`db::insert_if_newer`,
801        // `db::archive_memory`, etc.).
802        let cap_for_namespace = db::resolve_governance_policy(&lock.0, &mem.namespace)
803            .unwrap_or_else(crate::models::GovernancePolicy::default)
804            .effective_max_reflection_depth();
805        let to_insert = crate::federation::reflection_bookkeeping::stamp_reflection_origin(
806            mem,
807            &body.sender_agent_id,
808            cap_for_namespace,
809        );
810        match db::insert_if_newer(&lock.0, &to_insert) {
811            Ok(actual_id) => {
812                applied += 1;
813                // #1566 / #1579 B1 — store a dim-matching shipped
814                // vector directly (no local embed at all); anything
815                // else falls back to the deferred background embed.
816                // `se.vector.len() == se.dim` guards a malformed
817                // sender whose claimed dim disagrees with the payload.
818                // #1584 (SEC) — the dim gate is necessary but not
819                // sufficient: a shipped vector with NaN/±Inf components
820                // or a non-unit norm poisons cosine ranking.
821                // `sanitize_shipped_vector` rejects non-finite vectors
822                // and L2-normalizes the rest; `None` (or a dim mismatch)
823                // falls back to a local re-embed.
824                let clean_shipped = shipped_by_id
825                    .get(mem.id.as_str())
826                    .filter(|se| receiver_dim == Some(se.dim) && se.vector.len() == se.dim)
827                    .and_then(|se| {
828                        crate::federation::sanitize_shipped_vector(&se.vector)
829                            .map(|v| (v, se.model.clone()))
830                    });
831                match clean_shipped {
832                    Some((vector, model)) => {
833                        match db::set_embedding(&lock.0, &actual_id, &vector) {
834                            Ok(()) => hnsw_updates.push((actual_id, vector)),
835                            Err(e) => {
836                                tracing::warn!(
837                                    "sync_push: storing shipped embedding failed for \
838                                 {actual_id} (model {model}): {e} — deferring local embed",
839                                );
840                                deferred_embed.push((
841                                    actual_id,
842                                    crate::embeddings::embedding_document(&mem.title, &mem.content),
843                                ));
844                            }
845                        }
846                    }
847                    None => deferred_embed.push((
848                        actual_id,
849                        crate::embeddings::embedding_document(&mem.title, &mem.content),
850                    )),
851                }
852            }
853            Err(e) => {
854                // Best-effort refund so a downstream insert failure
855                // doesn't leak quota counters. `refund_op` saturates at
856                // zero so a buggy double-refund cannot poison the row.
857                // #1156 — refund on the same `(agent_id, namespace)`
858                // row the check_and_record above incremented.
859                let _ = crate::quotas::refund_op(
860                    &lock.0,
861                    &attribute_agent,
862                    &mem.namespace,
863                    crate::quotas::QuotaOp::Memory {
864                        bytes: bytes_estimate,
865                    },
866                );
867                tracing::warn!("sync_push: insert_if_newer failed for {}: {e}", mem.id);
868                skipped += 1;
869            }
870        }
871    }
872
873    // v0.7.0 S6-M2 — quota refusal short-circuit. The first refusal in
874    // the loop produces a 429 with X-Quota-Reset-At so callers back off
875    // (matches the HTTP POST store refusal envelope from F7 / #639).
876    if let Some(q) = first_quota_refusal.take() {
877        drop(lock);
878        // #1566 / #1579 B1 — rows applied BEFORE the refusal are
879        // committed and stay committed (the 429 covers the remainder
880        // of the batch); index their stored shipped vectors and defer
881        // the rest exactly like the success path, instead of leaving
882        // them for the next boot backfill.
883        if !hnsw_updates.is_empty() {
884            let mut idx_lock = app.vector_index.lock().await;
885            if let Some(idx) = idx_lock.as_mut() {
886                for (id, vec) in hnsw_updates {
887                    idx.remove(&id);
888                    idx.insert(id, vec);
889                }
890            }
891        }
892        spawn_deferred_embedding_refresh(&app, deferred_embed);
893        let reset_at = next_utc_midnight();
894        return (
895            StatusCode::TOO_MANY_REQUESTS,
896            [
897                ("x-quota-reset-at", reset_at.as_str()),
898                ("x-quota-limit", q.limit.as_str()),
899            ],
900            Json(json!({
901                "error": "QUOTA_EXCEEDED",
902                "limit": q.limit.as_str(),
903                "current": q.current,
904                "max": q.max,
905                "agent_id": q.agent_id,
906                "applied_before_refusal": applied,
907                (crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
908                "reset_at": reset_at,
909            })),
910        )
911            .into_response();
912    }
913
914    // Process deletions (v0.6.0.1 — scenario 10 fanout). Invalid ids are
915    // skipped silently; missing rows count as no-op. Peers that have
916    // already GC'd the row see identical post-state.
917    for del_id in &body.deletions {
918        if validate::validate_id(del_id).is_err() {
919            skipped += 1;
920            continue;
921        }
922        if body.dry_run {
923            noop += 1;
924            continue;
925        }
926        match db::delete(&lock.0, del_id) {
927            Ok(true) => deleted += 1,
928            Ok(false) => noop += 1,
929            Err(e) => {
930                tracing::warn!("sync_push: delete failed for {del_id}: {e}");
931                skipped += 1;
932            }
933        }
934    }
935
936    // v0.6.2 (S29): process explicit archives. Soft-move from `memories`
937    // to `archived_memories` — distinct from deletions which hard-delete.
938    // Missing rows count as no-op (peer may have already archived or
939    // never received the original write).
940    for arch_id in &body.archives {
941        if validate::validate_id(arch_id).is_err() {
942            skipped += 1;
943            continue;
944        }
945        if body.dry_run {
946            noop += 1;
947            continue;
948        }
949        match db::archive_memory(&lock.0, arch_id, Some("sync_push")) {
950            Ok(true) => archived += 1,
951            Ok(false) => noop += 1,
952            Err(e) => {
953                tracing::warn!("sync_push: archive_memory failed for {arch_id}: {e}");
954                skipped += 1;
955            }
956        }
957    }
958
959    // v0.6.2 (S29): process explicit restores — the inverse of archives.
960    // Move the row from `archived_memories` back into `memories`.
961    // No-op posture matches archives: missing rows (peer hasn't received
962    // the archive, or the row is already live) count as noop so replays
963    // and out-of-order restore/archive pairs don't error.
964    for res_id in &body.restores {
965        if validate::validate_id(res_id).is_err() {
966            skipped += 1;
967            continue;
968        }
969        if body.dry_run {
970            noop += 1;
971            continue;
972        }
973        match db::restore_archived(&lock.0, res_id) {
974            Ok(true) => restored += 1,
975            Ok(false) => noop += 1,
976            Err(e) => {
977                tracing::warn!("sync_push: restore_archived failed for {res_id}: {e}");
978                skipped += 1;
979            }
980        }
981    }
982
983    // v0.6.2 (#325): process incoming links. Duplicates are expected on
984    // retry / re-sync and collapse to a no-op via the unique index on
985    // (source_id, target_id, relation). Invalid ids are skipped silently
986    // — same posture as deletions.
987    //
988    // v0.7 H3: when a link arrives with a signature + observed_by claim,
989    // verify it against the public key associated with that claim before
990    // landing the row. Tampered signatures → reject with a warn log.
991    // Unknown observed_by (no enrolled key on this host) → accept-and-
992    // flag as `unsigned` so federation back-compat holds for peers that
993    // haven't enrolled yet. Successful verify → land with attest_level
994    // `peer_attested`.
995    let mut links_applied = 0usize;
996    for link in &body.links {
997        if validate::RequestValidator::validate_link_triple(
998            &link.source_id,
999            &link.target_id,
1000            link.relation.as_str(),
1001        )
1002        .is_err()
1003        {
1004            skipped += 1;
1005            continue;
1006        }
1007        if body.dry_run {
1008            noop += 1;
1009            continue;
1010        }
1011
1012        // Decide attest_level via the H3 verify path before insert.
1013        let attest_level = match (link.signature.as_deref(), link.observed_by.as_deref()) {
1014            (Some(sig_bytes), Some(observed_by)) => {
1015                match crate::identity::verify::lookup_peer_public_key(observed_by) {
1016                    Some(pubkey) => {
1017                        let signable = crate::identity::sign::SignableLink {
1018                            src_id: &link.source_id,
1019                            dst_id: &link.target_id,
1020                            relation: link.relation.as_str(),
1021                            observed_by: Some(observed_by),
1022                            valid_from: link.valid_from.as_deref(),
1023                            valid_until: link.valid_until.as_deref(),
1024                        };
1025                        match crate::identity::verify::verify(&pubkey, &signable, sig_bytes) {
1026                            Ok(()) => crate::models::AttestLevel::PeerAttested.as_str(),
1027                            Err(e) => {
1028                                // Tampered / malformed-sig: refuse to land
1029                                // the row. The receiver-side warn log is
1030                                // the operator's signal that a peer is
1031                                // misbehaving (or that a key rotation
1032                                // got out of sync).
1033                                tracing::warn!(
1034                                    "sync_push: signature rejected for link \
1035                                     ({} -> {} / {}) from observed_by={}: {e}",
1036                                    link.source_id,
1037                                    link.target_id,
1038                                    link.relation,
1039                                    observed_by
1040                                );
1041                                skipped += 1;
1042                                continue;
1043                            }
1044                        }
1045                    }
1046                    None => {
1047                        // No public key enrolled for this peer →
1048                        // accept-and-flag as unsigned. Operators can
1049                        // later enroll the key (`identity import`) and
1050                        // re-sync to upgrade the row's attest_level on
1051                        // a subsequent re-send.
1052                        crate::models::AttestLevel::Unsigned.as_str()
1053                    }
1054                }
1055            }
1056            // No signature on the wire (legacy v0.6.x peer) or no
1057            // observed_by claim → treat as unsigned. Same posture as
1058            // pre-H3 federation.
1059            _ => crate::models::AttestLevel::Unsigned.as_str(),
1060        };
1061
1062        match db::create_link_inbound(&lock.0, link, attest_level) {
1063            Ok(()) => links_applied += 1,
1064            Err(e) => {
1065                tracing::warn!(
1066                    "sync_push: create_link_inbound failed ({} -> {} / {}): {e}",
1067                    link.source_id,
1068                    link.target_id,
1069                    link.relation
1070                );
1071                skipped += 1;
1072            }
1073        }
1074    }
1075
1076    // v0.6.2 (S34): process incoming pending-action rows. Uses
1077    // `upsert_pending_action` so replays / races converge on the
1078    // originator's canonical row. Invalid ids skipped silently.
1079    let mut pendings_applied = 0usize;
1080    for pa in &body.pendings {
1081        if validate::validate_id(&pa.id).is_err() {
1082            skipped += 1;
1083            continue;
1084        }
1085        if body.dry_run {
1086            noop += 1;
1087            continue;
1088        }
1089        match db::upsert_pending_action(&lock.0, pa) {
1090            Ok(()) => {
1091                pendings_applied += 1;
1092                // v0.7.0 K4 — peer-originated pending rows fire the
1093                // `approval_requested` event on this peer too so local
1094                // approval-API subscribers get a uniform view of the
1095                // queue regardless of which node minted the row.
1096                // `upsert_*` is idempotent (`ON CONFLICT(id) DO UPDATE`)
1097                // — replays of the same row currently re-fire the
1098                // event; that's the documented K4 behaviour and matches
1099                // the existing `pending_action_expired` semantics. K7
1100                // (subscription reliability) layers DLQ + dedup on top.
1101                if pa.status == "pending" {
1102                    crate::subscriptions::dispatch_approval_requested(&lock.0, &pa.id, &lock.1);
1103                }
1104            }
1105            Err(e) => {
1106                tracing::warn!("sync_push: upsert_pending_action failed for {}: {e}", pa.id);
1107                skipped += 1;
1108            }
1109        }
1110    }
1111
1112    // v0.6.2 (S34): process incoming pending-action decisions. No-op on
1113    // already-decided rows; that's the steady-state when the originator
1114    // and this peer both saw the decision. Rejected decisions still
1115    // transition status so retries on either side see `status != 'pending'`.
1116    let mut pending_decisions_applied = 0usize;
1117    for dec in &body.pending_decisions {
1118        if validate::validate_id(&dec.id).is_err() {
1119            skipped += 1;
1120            continue;
1121        }
1122        if body.dry_run {
1123            noop += 1;
1124            continue;
1125        }
1126        match db::decide_pending_action(&lock.0, &dec.id, dec.approved, &dec.decider) {
1127            Ok(true) => {
1128                pending_decisions_applied += 1;
1129                // On approve, replay the pending payload so the target
1130                // write (store/delete/promote) actually lands on this
1131                // peer — matches the originator's post-approve state.
1132                if dec.approved {
1133                    match db::execute_pending_action(&lock.0, &dec.id) {
1134                        Ok(_) => {}
1135                        Err(e) => {
1136                            tracing::warn!(
1137                                "sync_push: execute_pending_action failed for {}: {e}",
1138                                dec.id
1139                            );
1140                        }
1141                    }
1142                }
1143            }
1144            Ok(false) => noop += 1, // already decided — converged state
1145            Err(e) => {
1146                tracing::warn!(
1147                    "sync_push: decide_pending_action failed for {}: {e}",
1148                    dec.id
1149                );
1150                skipped += 1;
1151            }
1152        }
1153    }
1154
1155    // v0.6.2 (S35): process incoming namespace_meta rows. Applies via
1156    // `set_namespace_standard` so the peer's inheritance-chain walk has
1157    // the originator's explicit parent link. The standard memory itself
1158    // rides on the same push via `memories` (or arrived earlier through
1159    // `broadcast_store_quorum`); the namespace-meta row closes the gap.
1160    let mut namespace_meta_applied = 0usize;
1161    for entry in &body.namespace_meta {
1162        if validate::validate_namespace(&entry.namespace).is_err()
1163            || validate::validate_id(&entry.standard_id).is_err()
1164        {
1165            skipped += 1;
1166            continue;
1167        }
1168        if body.dry_run {
1169            noop += 1;
1170            continue;
1171        }
1172        match db::set_namespace_standard(
1173            &lock.0,
1174            &entry.namespace,
1175            &entry.standard_id,
1176            entry.parent_namespace.as_deref(),
1177        ) {
1178            Ok(()) => namespace_meta_applied += 1,
1179            Err(e) => {
1180                tracing::warn!(
1181                    "sync_push: set_namespace_standard failed for {}: {e}",
1182                    entry.namespace
1183                );
1184                skipped += 1;
1185            }
1186        }
1187    }
1188
1189    // v0.6.2 (S35 follow-up): process incoming namespace_meta_clears. Applies
1190    // via `db::clear_namespace_standard` so the peer drops its meta row and
1191    // subsequent `get_standard` returns empty. Missing-on-peer namespaces
1192    // no-op (`changed == 0`) — replays are safe.
1193    let mut namespace_meta_cleared = 0usize;
1194    for ns in &body.namespace_meta_clears {
1195        if validate::validate_namespace(ns).is_err() {
1196            skipped += 1;
1197            continue;
1198        }
1199        if body.dry_run {
1200            noop += 1;
1201            continue;
1202        }
1203        match db::clear_namespace_standard(&lock.0, ns) {
1204            Ok(true) => namespace_meta_cleared += 1,
1205            Ok(false) => noop += 1,
1206            Err(e) => {
1207                tracing::warn!("sync_push: clear_namespace_standard failed for {ns}: {e}");
1208                skipped += 1;
1209            }
1210        }
1211    }
1212
1213    // Advance the vector clock with the highest `updated_at` we observed.
1214    // Skipped in dry-run mode since the caller is only previewing.
1215    if !body.dry_run
1216        && let Some(at) = latest_seen.as_deref()
1217        && let Err(e) = db::sync_state_observe(&lock.0, &local_agent_id, &body.sender_agent_id, at)
1218    {
1219        tracing::warn!("sync_push: sync_state_observe failed: {e}");
1220    }
1221
1222    // #1566 / #1579 B1 — the pre-#1566 synchronous embed loop lived
1223    // here (one `emb.embed()` per applied row, ~1s/row via ollama,
1224    // WHILE HOLDING the DB lock and inside the sender's quorum-ack
1225    // window). It is gone: dim-matching shipped vectors were stored
1226    // inline above (cheap UPDATE under the already-held lock), and
1227    // every other applied row is handed to the detached background
1228    // task spawned after the response is decided below.
1229
1230    // Receiver's current clock, returned so the sender can learn which
1231    // peers the receiver has seen. Phase 3 Task 3a.1 will use this to
1232    // short-circuit redundant pushes.
1233    let receiver_clock = db::sync_state_load(&lock.0, &local_agent_id)
1234        .unwrap_or_else(|_| crate::models::VectorClock::default());
1235
1236    // Release DB lock before touching the HNSW index — the vector index
1237    // has its own mutex and holding both serializes unrelated writers.
1238    drop(lock);
1239    if !hnsw_updates.is_empty() {
1240        let mut idx_lock = app.vector_index.lock().await;
1241        if let Some(idx) = idx_lock.as_mut() {
1242            for (id, vec) in hnsw_updates {
1243                idx.remove(&id);
1244                idx.insert(id, vec);
1245            }
1246        }
1247    }
1248
1249    // #1566 / #1579 B1 — ack-after-commit: hand the rows that still
1250    // need a locally-computed vector to the detached background task.
1251    // The HTTP response (the sender's quorum ack) returns immediately.
1252    spawn_deferred_embedding_refresh(&app, deferred_embed);
1253
1254    (
1255        StatusCode::OK,
1256        Json(json!({
1257            "applied": applied,
1258            "deleted": deleted,
1259            "archived": archived,
1260            "restored": restored,
1261            "links_applied": links_applied,
1262            "pendings_applied": pendings_applied,
1263            "pending_decisions_applied": pending_decisions_applied,
1264            "namespace_meta_applied": namespace_meta_applied,
1265            "namespace_meta_cleared": namespace_meta_cleared,
1266            "noop": noop,
1267            "skipped": skipped,
1268            (crate::handlers::QUOTA_REFUSED_FIELD): quota_refused,
1269            "dry_run": body.dry_run,
1270            "receiver_agent_id": local_agent_id,
1271            "receiver_clock": receiver_clock,
1272        })),
1273    )
1274        .into_response()
1275}
1276
#[cfg(test)]
mod tests {
    //! v0.7.0 #1049 (Agent-5 #9) — unit suite for `extract_peer_id`.
    //!
    //! The function only hands back a peer id that passes
    //! `crate::validate::validate_agent_id`. These tests pin the happy
    //! path (agent-id-shaped values are returned unchanged) plus
    //! representative rejection arms: embedded whitespace, a shell
    //! metacharacter, a value over the 128-byte cap, and an absent
    //! header.

    use super::*;
    use axum::http::HeaderValue;

    /// Wraps `value` in a `HeaderMap` under `PEER_ID_HEADER`.
    ///
    /// Yields `None` when `HeaderValue::from_bytes` refuses the byte
    /// sequence (e.g. ASCII control characters). Such a value cannot
    /// arrive over the wire as a header, so callers simply skip the
    /// case instead of probing the validator with it.
    fn build_headers(value: &str) -> Option<HeaderMap> {
        HeaderValue::from_bytes(value.as_bytes())
            .ok()
            .map(|header_value| {
                let mut headers = HeaderMap::new();
                headers.insert(PEER_ID_HEADER, header_value);
                headers
            })
    }

    /// A plain `scheme:name` agent id is returned as-is.
    #[test]
    fn extract_peer_id_accepts_legitimate_agent_id_1049() {
        let headers =
            build_headers("ai:peer-alpha").expect("legitimate value fits in HeaderValue");
        let peer = extract_peer_id(&headers);
        assert_eq!(peer, Some("ai:peer-alpha"));
    }

    /// The dotted-hostname / pid form of an agent id is returned as-is.
    #[test]
    fn extract_peer_id_accepts_hostname_form_1049() {
        let headers = build_headers("host:laptop.local:pid-42").expect("legitimate value fits");
        let peer = extract_peer_id(&headers);
        assert_eq!(peer, Some("host:laptop.local:pid-42"));
    }

    /// A space is a legal header byte but not a legal agent-id byte.
    #[test]
    fn extract_peer_id_rejects_value_with_whitespace_1049() {
        let headers = build_headers("peer one").expect("space fits in HeaderValue");
        let peer = extract_peer_id(&headers);
        assert_eq!(
            peer, None,
            "#1049: whitespace in peer-id MUST be rejected by the validator"
        );
    }

    /// `$` is a legal header byte but not a legal agent-id byte.
    #[test]
    fn extract_peer_id_rejects_value_with_shell_metacharacters_1049() {
        let headers = build_headers("peer$attacker").expect("$ fits in HeaderValue");
        let peer = extract_peer_id(&headers);
        assert_eq!(
            peer, None,
            "#1049: shell metacharacters in peer-id MUST be rejected"
        );
    }

    /// One byte past the validator's 1..=128 length cap is refused.
    #[test]
    fn extract_peer_id_rejects_oversized_value_1049() {
        let too_long = "a".repeat(129);
        let headers =
            build_headers(&too_long).expect("129-byte ASCII fits in HeaderValue");
        let peer = extract_peer_id(&headers);
        assert_eq!(
            peer, None,
            "#1049: oversized peer-id (>128 bytes) MUST be rejected"
        );
    }

    /// With no peer-id header present there is nothing to extract.
    #[test]
    fn extract_peer_id_absent_returns_none() {
        let empty = HeaderMap::new();
        let peer = extract_peer_id(&empty);
        assert_eq!(peer, None);
    }
}