Skip to main content

ai_memory/federation/
sync.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Quorum-broadcast fan-out logic: post_once, post_and_classify,
5//! broadcast_*_quorum, bulk_catchup_push.
6
7use crate::models::field_names;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use tokio::sync::Mutex;
12use tokio::task::JoinSet;
13
14use crate::federation::identity::chain::CHAIN_HEADER;
15use crate::federation::identity::credential::CREDENTIAL_HEADER;
16use crate::federation::identity::outbound;
17use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
18use crate::replication::{AckTracker, QuorumError};
19
20use super::FederationConfig;
21
/// #1558 batch 5 wave 2 — `QuorumError::LocalWriteFailed` detail used
/// at every `Arc::try_unwrap(tracker)` finalise point in this file
/// (one per fanout variant). File-local const; byte-identical detail.
///
/// Keeping the text in one constant means every fan-out variant that
/// maps a failed unwrap to `LocalWriteFailed` (e.g. the store and
/// delete paths) reports exactly the same `detail` string.
const TRACKER_ARC_STILL_REFERENCED: &str = "tracker arc still referenced at finalise";
26
/// Classification of one peer's response to a single federation POST,
/// as produced by `post_once` / `post_and_classify`.
#[derive(Debug)]
pub(super) enum AckOutcome {
    /// The peer answered with a 2xx status and did not report a
    /// conflicting id list (an unparseable 2xx body also counts).
    Ack,
    /// The peer answered 2xx, but its response carried a non-empty
    /// `ids` array that does not contain the id the sender expected.
    IdDrift,
    /// The POST was not acknowledged. The payload is a human-readable
    /// reason: governance refusal, body serialisation error, non-2xx
    /// HTTP status, or a transport error.
    Fail(String),
}
33
34/// Single-attempt POST to a peer, classifying the response into an
35/// `AckOutcome`. No retries — callers that want retry-on-transient-fail
36/// should use [`post_and_classify`].
37///
38/// `api_key` (v0.7.0 fold-A2A1.4, #702) is the operator-configured
39/// `[api] api_key` from the local daemon's `AppConfig`. When `Some`,
40/// an `x-api-key: <value>` header is attached so peers that themselves
41/// run with api-key auth accept the outbound POST. When `None`, no
42/// header is attached — backwards-compatible with mTLS-only and
43/// no-auth deployments.
44pub(super) async fn post_once(
45    client: &reqwest::Client,
46    url: &str,
47    body: &serde_json::Value,
48    expected_id: &str,
49    idempotency_key: Option<&str>,
50    api_key: Option<&str>,
51    signing_key: Option<&ed25519_dalek::SigningKey>,
52) -> AckOutcome {
53    // Ultrareview #346: attach an idempotency key so peers can dedupe
54    // on retry. If a tokio::timeout fires locally but the HTTP POST
55    // already reached the peer, the peer applies the write once; a
56    // subsequent catchup sync carrying the same memory.id will be a
57    // no-op via `insert_if_newer`. The key is set from the outgoing
58    // memory id by default, which is stable across retries.
59    // v0.7.0 (issue #691 fold-1) — wire the NetworkRequest governance
60    // gate BEFORE the outbound HTTPS POST. A refuse rule
61    // (`{"host":"evil.example.com"}` etc.) short-circuits the fan-out
62    // for that peer with a typed `AckOutcome::Fail` carrying the
63    // refusal reason. The quorum combiner already treats `Fail` as
64    // "this peer did not ack", so a refusal counts as a peer-miss
65    // without crashing the broadcast (allowing the remaining peers to
66    // reach quorum). The audit chain records the refusal via the
67    // governance.check signed_events row emitted on the daemon side.
68    let host = reqwest::Url::parse(url)
69        .ok()
70        .and_then(|u| u.host_str().map(str::to_string))
71        .unwrap_or_else(|| url.to_string());
72    let scheme = reqwest::Url::parse(url)
73        .ok()
74        .map(|u| u.scheme().to_string())
75        .unwrap_or_default();
76    let net_action = crate::governance::agent_action::AgentAction::NetworkRequest {
77        host: host.clone(),
78        scheme,
79    };
80    if let Err(refusal) = crate::governance::wire_check::check(&net_action) {
81        return AckOutcome::Fail(format!(
82            "governance refused outbound to {host}: {}",
83            refusal.reason
84        ));
85    }
86    // v0.7.0 #791 — serialise the body ONCE so the signature input
87    // matches the wire bytes the receiver sees. Sending via
88    // `.body(bytes)` + explicit content-type bypasses reqwest's
89    // re-serialisation (which could perturb whitespace / key order
90    // across versions and break the signature).
91    let body_bytes = match serde_json::to_vec(body) {
92        Ok(b) => b,
93        Err(e) => {
94            return AckOutcome::Fail(format!("serialise body: {e}"));
95        }
96    };
97    let mut req = client
98        .post(url)
99        .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
100        .body(body_bytes.clone());
101    if let Some(key) = idempotency_key {
102        req = req.header("Idempotency-Key", key);
103    }
104    // v0.7.0 fold-A2A1.4 (#702) — forward the operator-configured
105    // `[api] api_key` on every outbound federation POST. Peers that
106    // themselves run with api-key auth otherwise reject with 401 and
107    // cross-host quorum can never converge. Backwards-compatible:
108    // `None` means no header attached.
109    if let Some(key) = api_key {
110        req = req.header(crate::HEADER_API_KEY, key);
111    }
112    // v0.7.0 #791 + #922 — Ed25519 signature header + nonce header
113    // bound into signature input so byte-for-byte replays are refused.
114    if let Some(sk) = signing_key {
115        let nonce = uuid::Uuid::new_v4().to_string();
116        let sig_header =
117            crate::federation::signing::sign_body_with_nonce_header(sk, &body_bytes, &nonce);
118        req = req
119            .header(crate::federation::signing::SIGNATURE_HEADER, sig_header)
120            .header(crate::federation::signing::NONCE_HEADER, nonce);
121    }
122    // v0.7.0 #238 — attach `x-peer-id` carrying the body's
123    // `sender_agent_id` so the receiver's attestation step can
124    // cross-check the body claim against an explicit wire-level
125    // peer-id. The body's `sender_agent_id` is the canonical source
126    // (already in the payload); the header just lifts it to a
127    // request-shape position the receiver reads BEFORE deserialising
128    // the JSON envelope. Backwards-compatible: a missing field
129    // results in no header attached + the receiver enforces via the
130    // body field alone.
131    if let Some(peer_id) = body
132        .get(field_names::SENDER_AGENT_ID)
133        .and_then(|v| v.as_str())
134        .filter(|s| !s.is_empty())
135    {
136        req = req.header(crate::federation::peer_attestation::PEER_ID_HEADER, peer_id);
137    }
138    // FED-P3a — attach this node's CA-issued credential so the receiver
139    // can verify our per-message signature against the trust bundle
140    // instead of a manually enrolled `.pub` (the sender half of the
141    // receiver swap in `federation_signing_check::resolve_peer_verifying_key`).
142    // Held-credential absence is the normal pre-enrollment state; a
143    // malformed-encode is logged and skipped, never fatal — the wire
144    // still carries the legacy signature, so this degrades to per-peer.
145    if let Some(cred) = outbound::current() {
146        match cred.to_header_value() {
147            Ok(value) => req = req.header(CREDENTIAL_HEADER, value),
148            Err(e) => {
149                tracing::warn!(target: super::SIGNING_TRACE_TARGET, error = %e,
150                    "failed to encode outbound federation credential header; omitting");
151            }
152        }
153        // FED-P4d — when the leaf is signed by an intermediate rather than a
154        // root, also attach the anchor-first intermediate chain so a peer
155        // holding only the root in its trust bundle can verify the full
156        // chain. A node holding no intermediates (the P2/P3 one-level case)
157        // emits no chain header, so the wire stays byte-identical to pre-P4.
158        let intermediates = outbound::current_intermediates();
159        match crate::federation::identity::chain::intermediates_to_header_value(&intermediates) {
160            Ok(Some(value)) => req = req.header(CHAIN_HEADER, value),
161            Ok(None) => {}
162            Err(e) => {
163                tracing::warn!(target: super::SIGNING_TRACE_TARGET, error = %e,
164                    "failed to encode outbound federation chain header; omitting");
165            }
166        }
167    }
168    match req.send().await {
169        Ok(resp) if resp.status().is_success() => {
170            match resp.json::<serde_json::Value>().await {
171                Ok(v) => {
172                    // sync_push responses don't echo per-memory ids; any
173                    // success on a 1-memory push is treated as an ack
174                    // unless the response carries an explicit `ids` array
175                    // whose content disagrees.
176                    if let Some(ids) = v.get("ids").and_then(|v| v.as_array())
177                        && !ids.is_empty()
178                        && !ids.iter().any(|x| x.as_str() == Some(expected_id))
179                    {
180                        return AckOutcome::IdDrift;
181                    }
182                    AckOutcome::Ack
183                }
184                Err(_) => AckOutcome::Ack, // body unparseable but 2xx = ack
185            }
186        }
187        Ok(resp) => {
188            // #1579 B5 — drain the error body before classifying the
189            // failure so hyper can return the pooled connection to the
190            // keep-alive pool. Dropping a `Response` with an unread
191            // body tears the connection down, and the next POST to the
192            // same peer pays a fresh mTLS handshake (~4.3×RTT measured
193            // on do-1461 — the mechanism behind the 0.3s/row serial
194            // DLQ-replay floor during error regimes like the #1578
195            // 429 era). The body is small (a JSON error envelope) and
196            // already in flight; reading it is microseconds.
197            let status = resp.status();
198            let _ = resp.bytes().await;
199            AckOutcome::Fail(format!("http {status}"))
200        }
201        Err(e) => AckOutcome::Fail(crate::errors::msg::network(e)),
202    }
203}
204
/// Backoff before the single retry attempt in [`post_and_classify`].
/// Short enough to fit both attempts inside the default 2s ack deadline
/// plus the per-request client timeout; long enough to let a transient
/// peer-side SQLite-mutex contention or network flap clear.
///
/// `post_and_classify` sleeps for this duration (via
/// `tokio::time::sleep`) between a failed first attempt and the retry.
pub(super) const FANOUT_RETRY_BACKOFF: Duration = Duration::from_millis(250);
210
211/// POST to a peer with a single retry on transient failure.
212///
213/// v0.6.2 Patch 2 (S40): v3r26 hermes-tls scenario-40 had node-2 see
214/// 499/500 bulk rows. Same scenario on ironclaw-tls passed 500/500/500.
215/// Root cause: under W=2/N=4 quorum the leader returns 200 once two peers
216/// ack. The third peer's POST runs in the post-quorum detach task. If
217/// that POST fails (transient network flap, peer 5xx under concurrent
218/// SQLite-mutex contention, TLS handshake reset), it was previously
219/// fire-and-forget — the row stayed permanently missing on that peer
220/// until a sync-daemon caught it up. The harness runs no sync daemon,
221/// so one missed POST = one permanently missing row.
222///
223/// Fix: retry once on `AckOutcome::Fail`. The Idempotency-Key header
224/// ensures a partial-apply race (peer received the first POST but the
225/// response was lost) deduplicates to a no-op on the peer side via
226/// `insert_if_newer`. `IdDrift` is NOT retried — it indicates the peer
227/// semantically disagreed about the id, not a transient failure, so
228/// retrying would just observe the same disagreement.
229///
230/// Quorum contract is unchanged: callers still observe a single
231/// `AckOutcome` per peer, now reflecting the best of two attempts.
232pub(super) async fn post_and_classify(
233    client: &reqwest::Client,
234    url: &str,
235    body: &serde_json::Value,
236    expected_id: &str,
237    idempotency_key: Option<&str>,
238    api_key: Option<&str>,
239    signing_key: Option<&ed25519_dalek::SigningKey>,
240) -> AckOutcome {
241    match post_once(
242        client,
243        url,
244        body,
245        expected_id,
246        idempotency_key,
247        api_key,
248        signing_key,
249    )
250    .await
251    {
252        AckOutcome::Ack => AckOutcome::Ack,
253        AckOutcome::IdDrift => AckOutcome::IdDrift,
254        AckOutcome::Fail(first_reason) => {
255            tokio::time::sleep(FANOUT_RETRY_BACKOFF).await;
256            match post_once(
257                client,
258                url,
259                body,
260                expected_id,
261                idempotency_key,
262                api_key,
263                signing_key,
264            )
265            .await
266            {
267                AckOutcome::Ack => {
268                    tracing::debug!(
269                        "federation: peer POST retry succeeded for {expected_id} (first attempt: {first_reason})"
270                    );
271                    crate::metrics::registry()
272                        .federation_fanout_retry_total
273                        .with_label_values(&["ok"])
274                        .inc();
275                    AckOutcome::Ack
276                }
277                AckOutcome::IdDrift => {
278                    crate::metrics::registry()
279                        .federation_fanout_retry_total
280                        .with_label_values(&["id_drift"])
281                        .inc();
282                    AckOutcome::IdDrift
283                }
284                AckOutcome::Fail(retry_reason) => {
285                    crate::metrics::registry()
286                        .federation_fanout_retry_total
287                        .with_label_values(&["fail"])
288                        .inc();
289                    AckOutcome::Fail(format!("first: {first_reason}; retry: {retry_reason}"))
290                }
291            }
292        }
293    }
294}
295
/// Fan out a just-committed memory to every configured peer. Returns
/// an `AckTracker` whose `finalise()` you then call against the
/// deadline to get the quorum outcome.
///
/// The local node's commit is recorded as soon as this function is
/// called — callers pass in a memory that has already been persisted
/// locally. Roll-back semantics on quorum failure are handled by the
/// caller (see `handlers::create_memory` for the HTTP path contract).
///
/// This is a thin wrapper: it delegates to
/// [`broadcast_store_quorum_with_embedding`] with no shipped embedding
/// (`None`).
///
/// # Errors
///
/// Propagates whatever [`broadcast_store_quorum_with_embedding`]
/// returns.
pub async fn broadcast_store_quorum(
    config: &FederationConfig,
    mem: &Memory,
) -> Result<AckTracker, QuorumError> {
    // No embedding to ship: the delegate omits the `embeddings` key.
    broadcast_store_quorum_with_embedding(config, mem, None).await
}
310
/// #1566 / #1579 B1 — [`broadcast_store_quorum`] variant that ships
/// the source-side embedding vector alongside the memory row
/// (embed-once-replicate-vector). When `shipped` is `Some`, the push
/// body carries an `embeddings: [ShippedEmbedding]` array INSIDE the
/// signed payload so dim-matching receivers store the vector directly
/// instead of re-embedding (~1s/row via ollama, paid up to 9× across
/// the fleet pre-#1566). `None` preserves the exact pre-#1566 wire
/// bytes (no `embeddings` key at all) — older peers and embedder-less
/// senders are unaffected.
///
/// Phases, in order:
/// 1. log the broadcast and record the local commit on a fresh tracker;
/// 2. build one JSON body and spawn one `post_and_classify` task per
///    configured peer;
/// 3. collect outcomes until the ack deadline passes, the task set is
///    drained, or the tracker reports quorum;
/// 4. detach any still-running tasks into a background drain;
/// 5. unwrap the tracker, warn on partial quorum, and (with the `sal`
///    feature and a configured sink) enqueue DLQ rows for every peer
///    that is not in the tracker's acked set.
///
/// # Errors
///
/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc
/// cannot be unwrapped (only occurs under a pathological detach race).
pub async fn broadcast_store_quorum_with_embedding(
    config: &FederationConfig,
    mem: &Memory,
    shipped: Option<&super::ShippedEmbedding>,
) -> Result<AckTracker, QuorumError> {
    // #931 (v0.7.0 Track D, 2026-05-20) — entry-line debug + info
    // logs so the silent-bypass case where this function is never
    // called (e.g. `app.federation` resolves to `None` on a path
    // where the handler thought it was `Some`) is immediately
    // distinguishable from "function called but every peer fails".
    // Pre-#931 NO log fired on the happy path at all (only at
    // tracing::warn on a per-peer failure), so the Track D Docker
    // probe couldn't tell whether the broadcast path was even
    // exercised. The wire wording `federation::broadcast: store
    // <mem-id> -> N peer(s)` is pinned by the regression test in
    // `tests/federation_x_api_key.rs::*`. Info-level (not debug) so
    // operators tailing `docker logs alice | grep federation` see
    // it without flipping `RUST_LOG=debug`.
    tracing::info!(
        target: super::SYNC_TRACE_TARGET,
        memory_id = %mem.id,
        namespace = %mem.namespace,
        peer_count = config.peers.len(),
        quorum_w = config.policy.w,
        "federation::broadcast: store {} -> {} peer(s) (quorum W={})",
        mem.id,
        config.peers.len(),
        config.policy.w,
    );
    // `now` is the single time origin: it seeds the tracker and the
    // deadline computed further down.
    let now = Instant::now();
    // NOTE(review): within this function the Arc is never cloned (the
    // spawned tasks do not capture it), so the `try_unwrap` below is
    // expected to succeed — confirm no other holder is introduced.
    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
    tracker.lock().await.record_local();

    let mut body = serde_json::json!({
        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
        "memories": [mem],
        "dry_run": false,
    });
    // #1566 / #1579 B1 — attach the shipped vector inside the signed
    // body. Conditional insertion keeps the wire bytes IDENTICAL to
    // the pre-#1566 shape when no vector is available.
    if let Some(se) = shipped {
        body[field_names::EMBEDDINGS] = serde_json::json!([se]);
    }

    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
    // v0.7.0 Track D #933 — collect the set of peer ids that were
    // dispatched so the DLQ landing pass at the bottom of this
    // function can compute "configured ∖ acked = silently-failed"
    // independent of whether each task reported a Fail outcome
    // (deadline-evicted tasks never report; pre-#933 they were
    // silently lost).
    #[cfg(feature = "sal")]
    let dispatched_peer_ids: Vec<String> = config.peers.iter().map(|p| p.id.clone()).collect();
    // One task per peer. Each task owns clones of everything it needs
    // so it can outlive this function if it is detached later. The
    // memory id doubles as the idempotency key.
    for peer in &config.peers {
        let client = config.client.clone();
        let url = peer.sync_push_url.clone();
        let id = peer.id.clone();
        let mem_id = mem.id.clone();
        let payload = body.clone();
        let api_key = config.api_key.clone();
        let signing_key = config.signing_key.clone();
        joins.spawn(async move {
            let outcome = post_and_classify(
                &client,
                &url,
                &payload,
                &mem_id,
                Some(&mem_id),
                api_key.as_deref(),
                signing_key.as_deref(),
            )
            .await;
            (id, outcome)
        });
    }

    // v0.7.0 Track D #933 — track per-peer outcomes observed inside
    // the deadline so the DLQ landing pass at the bottom can tell
    // (a) acked peers (skip), (b) explicit-fail peers (DLQ with the
    // failure reason), and (c) deadline-evicted peers (no outcome
    // observed; DLQ with "deadline" as the failure reason). Pre-#933
    // the (c) bucket was silently lost.
    #[cfg(feature = "sal")]
    let mut explicit_failures: Vec<(String, String)> = Vec::new();

    // Deadline is computed ONCE here and never re-derived inside the
    // loop. The tracker carries the same deadline internally — passing
    // a single `Instant` through avoids the few-millisecond disagreement
    // that previously caused `finalise()` to reject quorums met 1-2 ms
    // earlier. (#299 item 1.)
    let deadline = now + config.policy.ack_timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            break;
        }
        match tokio::time::timeout(remaining, joins.join_next()).await {
            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
                tracker.lock().await.record_peer_ack(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
                tracker.lock().await.record_id_drift(peer_id);
            }
            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
                tracing::warn!("federation: peer {peer_id} failed for {}: {reason}", mem.id);
                #[cfg(feature = "sal")]
                explicit_failures.push((peer_id.clone(), reason.clone()));
                #[cfg(not(feature = "sal"))]
                {
                    let _ = (peer_id, reason);
                }
            }
            // A JoinError carries no peer id, so the affected peer is
            // not recorded anywhere at this point.
            Ok(Some(Err(e))) => {
                tracing::warn!("federation: peer join error: {e}");
            }
            Ok(None) | Err(_) => break, // joinset drained or timed out
        }
        // Early-exit once the tracker says quorum is met — we don't
        // need to wait for stragglers.
        if tracker.lock().await.is_quorum_met(Instant::now()) {
            break;
        }
    }

    // v0.6.0 correctness fix: once quorum is met, DETACH the remaining
    // fanouts into a background task so they complete naturally rather
    // than being aborted mid-flight. Ship-gate run 14 showed each peer
    // receiving only ~50% of burst writes under W=2/N=3 — cause: when
    // peer-B won the ack race, `joins.shutdown().await` aborted the
    // in-flight POST to peer-C, which often reached reqwest's connect
    // phase but never delivered the memory. Net effect: every write
    // landed on leader + exactly one peer, leaving the other peer
    // permanently behind until a sync-daemon (not running in the phase-2
    // harness) caught it up.
    //
    // The spawned fanout tasks do NOT hold the tracker Arc (they only
    // capture client/url/payload/id), so letting them outlive this
    // function does not block the `Arc::try_unwrap` below. Errors inside
    // the detached tasks are logged but otherwise ignored — the caller
    // has already met quorum by the time we detach.
    //
    // NOTE(review): the loop above also exits on deadline expiry, so
    // this branch runs for deadline-evicted tasks too, not only after
    // quorum — the "post-quorum" wording in the log lines below is then
    // slightly misleading.
    if !joins.is_empty() {
        // Ultrareview #343: emit a metric on detach-task failures so
        // mesh divergence is observable. The detach task itself is
        // still fire-and-forget — a full shutdown-drain would require
        // plumbing a shared JoinSet into AppState; tracked separately.
        let mem_id = mem.id.clone();
        tokio::spawn(async move {
            while let Some(res) = joins.join_next().await {
                match res {
                    Ok((peer_id, AckOutcome::Ack)) => {
                        tracing::debug!("federation: post-quorum ack from {peer_id}");
                    }
                    Ok((peer_id, AckOutcome::IdDrift)) => {
                        tracing::warn!(
                            "federation: post-quorum id-drift from {peer_id} (peer rewrote id)"
                        );
                        crate::metrics::registry()
                            .federation_fanout_dropped_total
                            .with_label_values(&["id_drift"])
                            .inc();
                    }
                    Ok((peer_id, AckOutcome::Fail(reason))) => {
                        tracing::warn!(
                            "federation: post-quorum peer {peer_id} did not ack for {mem_id}: {reason}"
                        );
                        crate::metrics::registry()
                            .federation_fanout_dropped_total
                            .with_label_values(&["peer_fail"])
                            .inc();
                    }
                    Err(e) => {
                        tracing::warn!("federation: post-quorum join error for {mem_id}: {e}");
                        crate::metrics::registry()
                            .federation_fanout_dropped_total
                            .with_label_values(&["join_error"])
                            .inc();
                    }
                }
            }
        });
    }

    // Take the tracker back out of its Arc<Mutex<..>> so it can be
    // returned by value.
    let tracker = Arc::try_unwrap(tracker)
        .map_err(|_| QuorumError::LocalWriteFailed {
            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
        })?
        .into_inner();
    // H9 (v0.7.0 round-2) — partial-quorum WARN. When the leader returns
    // success (quorum met) but some configured peers never ack-ed inside
    // the deadline, operators need to see the gap in logs before a
    // follow-up sync cycle catches the lagging peer up. This is the
    // canonical observation point: the tracker is finalised, the peer
    // set is known, and the configured-vs-acked subtraction surfaces
    // exactly which urls fell behind.
    if tracker.finalise(Instant::now()).is_ok() {
        let acked = tracker.acked_peer_ids();
        let mut missing: Vec<String> = config
            .peers
            .iter()
            .filter(|p| !acked.contains(&p.id))
            .map(|p| p.sync_push_url.clone())
            .collect();
        if !missing.is_empty() {
            // Sorted so the log line is stable regardless of peer order.
            missing.sort();
            tracing::warn!(
                memory_id = %mem.id,
                n_missing = missing.len(),
                peer_urls = ?missing,
                "federation: quorum met but {} peer(s) did not ack: {:?}",
                missing.len(),
                missing,
            );
            crate::metrics::registry()
                .federation_partial_quorum_total
                .inc();
        }
    }

    // v0.7.0 Track D #933 — federation push DLQ landing. For every
    // configured peer that did NOT ack inside the deadline (including
    // explicit Fail outcomes AND deadline-evicted tasks whose outcome
    // was never observed), insert a `federation_push_dlq` row so the
    // `replay_federation_push_dlq` worker can re-attempt the push on
    // peer recovery. Pre-#933 these failures were silently lost — see
    // the issue body for the full RCA + reproduction.
    //
    // Best-effort: a sink error never propagates because the local
    // commit already succeeded and the quorum verdict is already
    // computed. Operators observe sink-side errors via the
    // tracing::warn line below + the gauge.
    //
    // Feature-gated to `--features sal` because the trait surface
    // requires `async-trait`. The default (sqlite-only) build path
    // never reaches this branch and pre-#933 behaviour is preserved.
    #[cfg(feature = "sal")]
    if let Some(sink) = config.dlq_sink.as_ref() {
        let acked = tracker.acked_peer_ids();
        let explicit_map: std::collections::HashMap<String, String> =
            explicit_failures.into_iter().collect();
        for peer_id in &dispatched_peer_ids {
            if acked.contains(peer_id) {
                continue;
            }
            // NOTE(review): only `Fail` outcomes are pushed into
            // `explicit_failures`. A peer that returned `IdDrift`, whose
            // task ended in a join error, or that was detached after an
            // early quorum exit has no entry here, so — unless
            // `acked_peer_ids()` already covers it — it is enqueued with
            // the "deadline_exceeded" reason. Confirm this is intended.
            let reason = explicit_map
                .get(peer_id)
                .cloned()
                .unwrap_or_else(|| "deadline_exceeded".to_string());
            if let Err(e) = sink
                .enqueue_push_failure(&mem.id, peer_id, &body, &reason)
                .await
            {
                tracing::warn!(
                    target: super::push_dlq::PUSH_DLQ_TRACE_TARGET,
                    memory_id = %mem.id,
                    peer_id = %peer_id,
                    "federation: failed to enqueue push-failure DLQ row \
                     for peer {peer_id} on memory {}: {e}",
                    mem.id,
                );
            } else {
                tracing::info!(
                    target: super::push_dlq::PUSH_DLQ_TRACE_TARGET,
                    memory_id = %mem.id,
                    peer_id = %peer_id,
                    reason = %reason,
                    "federation: enqueued push-failure DLQ row for peer {peer_id} \
                     on memory {} (reason: {reason})",
                    mem.id,
                );
            }
        }
    }
    Ok(tracker)
}
600
601/// Fan out a tombstone for `id` to every configured peer via the extended
602/// `sync_push` body (`deletions: [id]`). Same quorum contract as
603/// `broadcast_store_quorum`: local delete is recorded immediately, peer acks
604/// counted against `policy.write_quorum`, deadline enforced, stragglers
605/// detached.
606///
607/// # Errors
608///
609/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
610/// be unwrapped (only occurs under a pathological detach race).
611pub async fn broadcast_delete_quorum(
612    config: &FederationConfig,
613    id: &str,
614) -> Result<AckTracker, QuorumError> {
615    let now = Instant::now();
616    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
617    tracker.lock().await.record_local();
618
619    let body = serde_json::json!({
620        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
621        "memories": [],
622        "deletions": [id],
623        "dry_run": false,
624    });
625
626    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
627    for peer in &config.peers {
628        let client = config.client.clone();
629        let url = peer.sync_push_url.clone();
630        let peer_id = peer.id.clone();
631        let payload = body.clone();
632        let target_id = id.to_string();
633        let api_key = config.api_key.clone();
634        let signing_key = config.signing_key.clone();
635        joins.spawn(async move {
636            let outcome = post_and_classify(
637                &client,
638                &url,
639                &payload,
640                &target_id,
641                Some(&target_id),
642                api_key.as_deref(),
643                signing_key.as_deref(),
644            )
645            .await;
646            (peer_id, outcome)
647        });
648    }
649
650    let deadline = now + config.policy.ack_timeout;
651    loop {
652        let remaining = deadline.saturating_duration_since(Instant::now());
653        if remaining.is_zero() {
654            break;
655        }
656        match tokio::time::timeout(remaining, joins.join_next()).await {
657            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
658                tracker.lock().await.record_peer_ack(peer_id);
659            }
660            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
661                tracker.lock().await.record_id_drift(peer_id);
662            }
663            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
664                tracing::warn!("federation: delete peer {peer_id} failed for {id}: {reason}");
665            }
666            Ok(Some(Err(e))) => {
667                tracing::warn!("federation: delete peer join error: {e}");
668            }
669            Ok(None) | Err(_) => break,
670        }
671        if tracker.lock().await.is_quorum_met(Instant::now()) {
672            break;
673        }
674    }
675
676    if !joins.is_empty() {
677        tokio::spawn(async move {
678            while let Some(res) = joins.join_next().await {
679                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
680                    tracing::debug!(
681                        "federation: post-quorum delete peer {peer_id} did not ack: {reason}"
682                    );
683                }
684            }
685        });
686    }
687
688    let tracker = Arc::try_unwrap(tracker)
689        .map_err(|_| QuorumError::LocalWriteFailed {
690            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
691        })?
692        .into_inner();
693    Ok(tracker)
694}
695
696/// v0.6.2 (S29): fan out a just-archived memory id to every peer. Payload
697/// rides on `sync_push` via `archives: [id]`, mirroring the shape used
698/// by `broadcast_delete_quorum` for deletions. On the receiving peer,
699/// `sync_push` calls `db::archive_memory` to move the row into
700/// `archived_memories` — unlike the delete path this is a soft removal
701/// (the row remains queryable via `/api/v1/archive`).
702///
703/// Same quorum contract as `broadcast_store_quorum` / `broadcast_delete_quorum`.
704///
705/// # Errors
706///
707/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
708/// be unwrapped (only occurs under a pathological detach race).
709pub async fn broadcast_archive_quorum(
710    config: &FederationConfig,
711    id: &str,
712) -> Result<AckTracker, QuorumError> {
713    let now = Instant::now();
714    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
715    tracker.lock().await.record_local();
716
717    let body = serde_json::json!({
718        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
719        "memories": [],
720        "archives": [id],
721        "dry_run": false,
722    });
723
724    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
725    for peer in &config.peers {
726        let client = config.client.clone();
727        let url = peer.sync_push_url.clone();
728        let peer_id = peer.id.clone();
729        let payload = body.clone();
730        let target_id = id.to_string();
731        let api_key = config.api_key.clone();
732        let signing_key = config.signing_key.clone();
733        joins.spawn(async move {
734            let outcome = post_and_classify(
735                &client,
736                &url,
737                &payload,
738                &target_id,
739                Some(&target_id),
740                api_key.as_deref(),
741                signing_key.as_deref(),
742            )
743            .await;
744            (peer_id, outcome)
745        });
746    }
747
748    let deadline = now + config.policy.ack_timeout;
749    loop {
750        let remaining = deadline.saturating_duration_since(Instant::now());
751        if remaining.is_zero() {
752            break;
753        }
754        match tokio::time::timeout(remaining, joins.join_next()).await {
755            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
756                tracker.lock().await.record_peer_ack(peer_id);
757            }
758            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
759                tracker.lock().await.record_id_drift(peer_id);
760            }
761            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
762                tracing::warn!("federation: archive peer {peer_id} failed for {id}: {reason}");
763            }
764            Ok(Some(Err(e))) => {
765                tracing::warn!("federation: archive peer join error: {e}");
766            }
767            Ok(None) | Err(_) => break,
768        }
769        if tracker.lock().await.is_quorum_met(Instant::now()) {
770            break;
771        }
772    }
773
774    if !joins.is_empty() {
775        tokio::spawn(async move {
776            while let Some(res) = joins.join_next().await {
777                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
778                    tracing::debug!(
779                        "federation: post-quorum archive peer {peer_id} did not ack: {reason}"
780                    );
781                }
782            }
783        });
784    }
785
786    let tracker = Arc::try_unwrap(tracker)
787        .map_err(|_| QuorumError::LocalWriteFailed {
788            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
789        })?
790        .into_inner();
791    Ok(tracker)
792}
793
794/// v0.6.2 (S29): fan out a just-restored memory id to every peer. Payload
795/// rides on `sync_push` via `restores: [id]`, mirroring the shape used by
796/// `broadcast_archive_quorum`. On the receiving peer, `sync_push` moves
797/// the row from `archived_memories` back into `memories` via
798/// `db::restore_archived`. If the peer never saw the archive or the row
799/// isn't in its archive table, the sync call no-ops (same missing-on-peer
800/// posture used for archives and deletions).
801///
802/// Same quorum contract as `broadcast_store_quorum` / `broadcast_archive_quorum`.
803///
804/// # Errors
805///
806/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
807/// be unwrapped (only occurs under a pathological detach race).
808pub async fn broadcast_restore_quorum(
809    config: &FederationConfig,
810    id: &str,
811) -> Result<AckTracker, QuorumError> {
812    let now = Instant::now();
813    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
814    tracker.lock().await.record_local();
815
816    let body = serde_json::json!({
817        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
818        "memories": [],
819        "restores": [id],
820        "dry_run": false,
821    });
822
823    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
824    for peer in &config.peers {
825        let client = config.client.clone();
826        let url = peer.sync_push_url.clone();
827        let peer_id = peer.id.clone();
828        let payload = body.clone();
829        let target_id = id.to_string();
830        let api_key = config.api_key.clone();
831        let signing_key = config.signing_key.clone();
832        joins.spawn(async move {
833            let outcome = post_and_classify(
834                &client,
835                &url,
836                &payload,
837                &target_id,
838                Some(&target_id),
839                api_key.as_deref(),
840                signing_key.as_deref(),
841            )
842            .await;
843            (peer_id, outcome)
844        });
845    }
846
847    let deadline = now + config.policy.ack_timeout;
848    loop {
849        let remaining = deadline.saturating_duration_since(Instant::now());
850        if remaining.is_zero() {
851            break;
852        }
853        match tokio::time::timeout(remaining, joins.join_next()).await {
854            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
855                tracker.lock().await.record_peer_ack(peer_id);
856            }
857            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
858                tracker.lock().await.record_id_drift(peer_id);
859            }
860            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
861                tracing::warn!("federation: restore peer {peer_id} failed for {id}: {reason}");
862            }
863            Ok(Some(Err(e))) => {
864                tracing::warn!("federation: restore peer join error: {e}");
865            }
866            Ok(None) | Err(_) => break,
867        }
868        if tracker.lock().await.is_quorum_met(Instant::now()) {
869            break;
870        }
871    }
872
873    if !joins.is_empty() {
874        tokio::spawn(async move {
875            while let Some(res) = joins.join_next().await {
876                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
877                    tracing::debug!(
878                        "federation: post-quorum restore peer {peer_id} did not ack: {reason}"
879                    );
880                }
881            }
882        });
883    }
884
885    let tracker = Arc::try_unwrap(tracker)
886        .map_err(|_| QuorumError::LocalWriteFailed {
887            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
888        })?
889        .into_inner();
890    Ok(tracker)
891}
892
893/// v0.6.2 (#325): fan out a just-committed memory link to every peer.
894/// Payload rides on `sync_push` via `links: [link]`. Same quorum contract
895/// as `broadcast_store_quorum`.
896///
897/// # Errors
898///
899/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
900/// be unwrapped (only occurs under a pathological detach race).
901pub async fn broadcast_link_quorum(
902    config: &FederationConfig,
903    link: &MemoryLink,
904) -> Result<AckTracker, QuorumError> {
905    let now = Instant::now();
906    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
907    tracker.lock().await.record_local();
908
909    let body = serde_json::json!({
910        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
911        "memories": [],
912        "links": [link],
913        "dry_run": false,
914    });
915    let log_id = format!("{}→{}", link.source_id, link.target_id);
916
917    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
918    for peer in &config.peers {
919        let client = config.client.clone();
920        let url = peer.sync_push_url.clone();
921        let peer_id = peer.id.clone();
922        let payload = body.clone();
923        let log_id = log_id.clone();
924        let api_key = config.api_key.clone();
925        let signing_key = config.signing_key.clone();
926        joins.spawn(async move {
927            let outcome = post_and_classify(
928                &client,
929                &url,
930                &payload,
931                &log_id,
932                Some(&log_id),
933                api_key.as_deref(),
934                signing_key.as_deref(),
935            )
936            .await;
937            (peer_id, outcome)
938        });
939    }
940
941    let deadline = now + config.policy.ack_timeout;
942    loop {
943        let remaining = deadline.saturating_duration_since(Instant::now());
944        if remaining.is_zero() {
945            break;
946        }
947        match tokio::time::timeout(remaining, joins.join_next()).await {
948            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
949                tracker.lock().await.record_peer_ack(peer_id);
950            }
951            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
952                tracker.lock().await.record_id_drift(peer_id);
953            }
954            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
955                tracing::warn!("federation: link peer {peer_id} failed for {log_id}: {reason}");
956            }
957            Ok(Some(Err(e))) => {
958                tracing::warn!("federation: link peer join error: {e}");
959            }
960            Ok(None) | Err(_) => break,
961        }
962        if tracker.lock().await.is_quorum_met(Instant::now()) {
963            break;
964        }
965    }
966
967    if !joins.is_empty() {
968        tokio::spawn(async move {
969            while let Some(res) = joins.join_next().await {
970                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
971                    tracing::debug!(
972                        "federation: post-quorum link peer {peer_id} did not ack: {reason}"
973                    );
974                }
975            }
976        });
977    }
978
979    let tracker = Arc::try_unwrap(tracker)
980        .map_err(|_| QuorumError::LocalWriteFailed {
981            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
982        })?
983        .into_inner();
984    Ok(tracker)
985}
986
987/// v0.6.2 (#326): fan out a consolidation in a single `sync_push` — the new
988/// consolidated memory + the source ids being deleted. Mirrors the local
989/// semantics of `db::consolidate` (insert new + delete sources) so peers
990/// end up in the same terminal state as the originator.
991///
992/// # Errors
993///
994/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
995pub async fn broadcast_consolidate_quorum(
996    config: &FederationConfig,
997    new_mem: &Memory,
998    source_ids: &[String],
999) -> Result<AckTracker, QuorumError> {
1000    let now = Instant::now();
1001    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1002    tracker.lock().await.record_local();
1003
1004    let body = serde_json::json!({
1005        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1006        "memories": [new_mem],
1007        "deletions": source_ids,
1008        "dry_run": false,
1009    });
1010
1011    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1012    for peer in &config.peers {
1013        let client = config.client.clone();
1014        let url = peer.sync_push_url.clone();
1015        let peer_id = peer.id.clone();
1016        let payload = body.clone();
1017        let target_id = new_mem.id.clone();
1018        let api_key = config.api_key.clone();
1019        let signing_key = config.signing_key.clone();
1020        joins.spawn(async move {
1021            let outcome = post_and_classify(
1022                &client,
1023                &url,
1024                &payload,
1025                &target_id,
1026                Some(&target_id),
1027                api_key.as_deref(),
1028                signing_key.as_deref(),
1029            )
1030            .await;
1031            (peer_id, outcome)
1032        });
1033    }
1034
1035    let deadline = now + config.policy.ack_timeout;
1036    loop {
1037        let remaining = deadline.saturating_duration_since(Instant::now());
1038        if remaining.is_zero() {
1039            break;
1040        }
1041        match tokio::time::timeout(remaining, joins.join_next()).await {
1042            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1043                tracker.lock().await.record_peer_ack(peer_id);
1044            }
1045            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1046                tracker.lock().await.record_id_drift(peer_id);
1047            }
1048            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1049                tracing::warn!(
1050                    "federation: consolidate peer {peer_id} failed for {}: {reason}",
1051                    new_mem.id
1052                );
1053            }
1054            Ok(Some(Err(e))) => {
1055                tracing::warn!("federation: consolidate peer join error: {e}");
1056            }
1057            Ok(None) | Err(_) => break,
1058        }
1059        if tracker.lock().await.is_quorum_met(Instant::now()) {
1060            break;
1061        }
1062    }
1063
1064    if !joins.is_empty() {
1065        tokio::spawn(async move {
1066            while let Some(res) = joins.join_next().await {
1067                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1068                    tracing::debug!(
1069                        "federation: post-quorum consolidate peer {peer_id} did not ack: {reason}"
1070                    );
1071                }
1072            }
1073        });
1074    }
1075
1076    let tracker = Arc::try_unwrap(tracker)
1077        .map_err(|_| QuorumError::LocalWriteFailed {
1078            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1079        })?
1080        .into_inner();
1081    Ok(tracker)
1082}
1083
1084/// v0.6.2 (S34): fan out a just-created pending-action row to every peer
1085/// via `sync_push.pendings`. Callers pass the fully-hydrated `PendingAction`
1086/// read from their local `pending_actions` table so peers can upsert it
1087/// with the same id / status / approvals tuple the originator has. Mirrors
1088/// the quorum semantics of `broadcast_store_quorum` — local pending row
1089/// is already persisted at call time; peer acks are counted against
1090/// `policy.write_quorum`.
1091///
1092/// # Errors
1093///
1094/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1095pub async fn broadcast_pending_quorum(
1096    config: &FederationConfig,
1097    pending: &PendingAction,
1098) -> Result<AckTracker, QuorumError> {
1099    let now = Instant::now();
1100    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1101    tracker.lock().await.record_local();
1102
1103    let body = serde_json::json!({
1104        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1105        "memories": [],
1106        "pendings": [pending],
1107        "dry_run": false,
1108    });
1109
1110    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1111    for peer in &config.peers {
1112        let client = config.client.clone();
1113        let url = peer.sync_push_url.clone();
1114        let peer_id = peer.id.clone();
1115        let payload = body.clone();
1116        let target_id = pending.id.clone();
1117        let api_key = config.api_key.clone();
1118        let signing_key = config.signing_key.clone();
1119        joins.spawn(async move {
1120            let outcome = post_and_classify(
1121                &client,
1122                &url,
1123                &payload,
1124                &target_id,
1125                Some(&target_id),
1126                api_key.as_deref(),
1127                signing_key.as_deref(),
1128            )
1129            .await;
1130            (peer_id, outcome)
1131        });
1132    }
1133
1134    let deadline = now + config.policy.ack_timeout;
1135    loop {
1136        let remaining = deadline.saturating_duration_since(Instant::now());
1137        if remaining.is_zero() {
1138            break;
1139        }
1140        match tokio::time::timeout(remaining, joins.join_next()).await {
1141            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1142                tracker.lock().await.record_peer_ack(peer_id);
1143            }
1144            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1145                tracker.lock().await.record_id_drift(peer_id);
1146            }
1147            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1148                tracing::warn!(
1149                    "federation: pending peer {peer_id} failed for {}: {reason}",
1150                    pending.id
1151                );
1152            }
1153            Ok(Some(Err(e))) => {
1154                tracing::warn!("federation: pending peer join error: {e}");
1155            }
1156            Ok(None) | Err(_) => break,
1157        }
1158        if tracker.lock().await.is_quorum_met(Instant::now()) {
1159            break;
1160        }
1161    }
1162
1163    if !joins.is_empty() {
1164        tokio::spawn(async move {
1165            while let Some(res) = joins.join_next().await {
1166                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1167                    tracing::debug!(
1168                        "federation: post-quorum pending peer {peer_id} did not ack: {reason}"
1169                    );
1170                }
1171            }
1172        });
1173    }
1174
1175    let tracker = Arc::try_unwrap(tracker)
1176        .map_err(|_| QuorumError::LocalWriteFailed {
1177            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1178        })?
1179        .into_inner();
1180    Ok(tracker)
1181}
1182
1183/// v0.6.2 (S34): fan out a pending-action decision (approve/reject) to
1184/// peers via `sync_push.pending_decisions`. Without this, an approve on
1185/// node-2 leaves the row in `status='pending'` on node-1 and the caller
1186/// sees inconsistent governance state across the cluster. Peers apply
1187/// via `db::decide_pending_action` which is a no-op on already-decided
1188/// rows — replay-safe.
1189///
1190/// # Errors
1191///
1192/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1193pub async fn broadcast_pending_decision_quorum(
1194    config: &FederationConfig,
1195    decision: &PendingDecision,
1196) -> Result<AckTracker, QuorumError> {
1197    let now = Instant::now();
1198    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1199    tracker.lock().await.record_local();
1200
1201    let body = serde_json::json!({
1202        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1203        "memories": [],
1204        "pending_decisions": [decision],
1205        "dry_run": false,
1206    });
1207
1208    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1209    for peer in &config.peers {
1210        let client = config.client.clone();
1211        let url = peer.sync_push_url.clone();
1212        let peer_id = peer.id.clone();
1213        let payload = body.clone();
1214        let target_id = decision.id.clone();
1215        let api_key = config.api_key.clone();
1216        let signing_key = config.signing_key.clone();
1217        joins.spawn(async move {
1218            let outcome = post_and_classify(
1219                &client,
1220                &url,
1221                &payload,
1222                &target_id,
1223                Some(&target_id),
1224                api_key.as_deref(),
1225                signing_key.as_deref(),
1226            )
1227            .await;
1228            (peer_id, outcome)
1229        });
1230    }
1231
1232    let deadline = now + config.policy.ack_timeout;
1233    loop {
1234        let remaining = deadline.saturating_duration_since(Instant::now());
1235        if remaining.is_zero() {
1236            break;
1237        }
1238        match tokio::time::timeout(remaining, joins.join_next()).await {
1239            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1240                tracker.lock().await.record_peer_ack(peer_id);
1241            }
1242            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1243                tracker.lock().await.record_id_drift(peer_id);
1244            }
1245            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1246                tracing::warn!(
1247                    "federation: pending-decision peer {peer_id} failed for {}: {reason}",
1248                    decision.id
1249                );
1250            }
1251            Ok(Some(Err(e))) => {
1252                tracing::warn!("federation: pending-decision peer join error: {e}");
1253            }
1254            Ok(None) | Err(_) => break,
1255        }
1256        if tracker.lock().await.is_quorum_met(Instant::now()) {
1257            break;
1258        }
1259    }
1260
1261    if !joins.is_empty() {
1262        tokio::spawn(async move {
1263            while let Some(res) = joins.join_next().await {
1264                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1265                    tracing::debug!(
1266                        "federation: post-quorum pending-decision peer {peer_id} did not ack: {reason}"
1267                    );
1268                }
1269            }
1270        });
1271    }
1272
1273    let tracker = Arc::try_unwrap(tracker)
1274        .map_err(|_| QuorumError::LocalWriteFailed {
1275            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1276        })?
1277        .into_inner();
1278    Ok(tracker)
1279}
1280
1281/// v0.6.2 (S35): fan out a `namespace_meta` row (the `(namespace,
1282/// standard_id, parent_namespace)` tuple set by `set_namespace_standard`)
1283/// to peers via `sync_push.namespace_meta`. Without this, peers see the
1284/// standard memory (already fanned out via `broadcast_store_quorum`) but
1285/// not the meta row tying it to a namespace + parent — so the
1286/// parent-chain walk on the peer falls through to `auto_detect_parent`
1287/// and can return a different ancestor than the originator.
1288///
1289/// # Errors
1290///
1291/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1292pub async fn broadcast_namespace_meta_quorum(
1293    config: &FederationConfig,
1294    entry: &NamespaceMetaEntry,
1295) -> Result<AckTracker, QuorumError> {
1296    let now = Instant::now();
1297    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1298    tracker.lock().await.record_local();
1299
1300    let body = serde_json::json!({
1301        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1302        "memories": [],
1303        "namespace_meta": [entry],
1304        "dry_run": false,
1305    });
1306
1307    let target_id = entry.namespace.clone();
1308    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1309    for peer in &config.peers {
1310        let client = config.client.clone();
1311        let url = peer.sync_push_url.clone();
1312        let peer_id = peer.id.clone();
1313        let payload = body.clone();
1314        let target = target_id.clone();
1315        let api_key = config.api_key.clone();
1316        let signing_key = config.signing_key.clone();
1317        joins.spawn(async move {
1318            let outcome = post_and_classify(
1319                &client,
1320                &url,
1321                &payload,
1322                &target,
1323                Some(&target),
1324                api_key.as_deref(),
1325                signing_key.as_deref(),
1326            )
1327            .await;
1328            (peer_id, outcome)
1329        });
1330    }
1331
1332    let deadline = now + config.policy.ack_timeout;
1333    loop {
1334        let remaining = deadline.saturating_duration_since(Instant::now());
1335        if remaining.is_zero() {
1336            break;
1337        }
1338        match tokio::time::timeout(remaining, joins.join_next()).await {
1339            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1340                tracker.lock().await.record_peer_ack(peer_id);
1341            }
1342            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1343                tracker.lock().await.record_id_drift(peer_id);
1344            }
1345            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1346                tracing::warn!(
1347                    "federation: namespace_meta peer {peer_id} failed for {}: {reason}",
1348                    entry.namespace
1349                );
1350            }
1351            Ok(Some(Err(e))) => {
1352                tracing::warn!("federation: namespace_meta peer join error: {e}");
1353            }
1354            Ok(None) | Err(_) => break,
1355        }
1356        if tracker.lock().await.is_quorum_met(Instant::now()) {
1357            break;
1358        }
1359    }
1360
1361    if !joins.is_empty() {
1362        tokio::spawn(async move {
1363            while let Some(res) = joins.join_next().await {
1364                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1365                    tracing::debug!(
1366                        "federation: post-quorum namespace_meta peer {peer_id} did not ack: {reason}"
1367                    );
1368                }
1369            }
1370        });
1371    }
1372
1373    let tracker = Arc::try_unwrap(tracker)
1374        .map_err(|_| QuorumError::LocalWriteFailed {
1375            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1376        })?
1377        .into_inner();
1378    Ok(tracker)
1379}
1380
1381/// v0.6.2 (S35 follow-up): fan out a namespace-standard *clear* to peers
1382/// via `sync_push.namespace_meta_clears`. PR #363 shipped set-side fanout
1383/// via `broadcast_namespace_meta_quorum` but left the clear path local-only
1384/// — alice clearing on node-1 didn't propagate to bob on node-2, so the
1385/// scenario-35 cross-peer clear assertion failed.
1386///
1387/// Same quorum contract as the set broadcast: local-write pre-counted, one
1388/// POST per peer, `sync_push` bodies stuffed with the list of cleared
1389/// namespaces, first W-of-N acks win.
1390///
1391/// # Errors
1392///
1393/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1394pub async fn broadcast_namespace_meta_clear_quorum(
1395    config: &FederationConfig,
1396    namespaces: &[String],
1397) -> Result<AckTracker, QuorumError> {
1398    let now = Instant::now();
1399    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1400    tracker.lock().await.record_local();
1401
1402    let body = serde_json::json!({
1403        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1404        "memories": [],
1405        "namespace_meta_clears": namespaces,
1406        "dry_run": false,
1407    });
1408
1409    // Use the joined namespace list as the ack-classifier's `target_id` so
1410    // post-quorum logs carry enough context to trace back to the operation.
1411    let target_id = namespaces.join(",");
1412    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1413    for peer in &config.peers {
1414        let client = config.client.clone();
1415        let url = peer.sync_push_url.clone();
1416        let peer_id = peer.id.clone();
1417        let payload = body.clone();
1418        let target = target_id.clone();
1419        let api_key = config.api_key.clone();
1420        let signing_key = config.signing_key.clone();
1421        joins.spawn(async move {
1422            let outcome = post_and_classify(
1423                &client,
1424                &url,
1425                &payload,
1426                &target,
1427                Some(&target),
1428                api_key.as_deref(),
1429                signing_key.as_deref(),
1430            )
1431            .await;
1432            (peer_id, outcome)
1433        });
1434    }
1435
1436    let deadline = now + config.policy.ack_timeout;
1437    loop {
1438        let remaining = deadline.saturating_duration_since(Instant::now());
1439        if remaining.is_zero() {
1440            break;
1441        }
1442        match tokio::time::timeout(remaining, joins.join_next()).await {
1443            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1444                tracker.lock().await.record_peer_ack(peer_id);
1445            }
1446            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1447                tracker.lock().await.record_id_drift(peer_id);
1448            }
1449            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1450                tracing::warn!(
1451                    "federation: namespace_meta_clear peer {peer_id} failed for [{}]: {reason}",
1452                    target_id
1453                );
1454            }
1455            Ok(Some(Err(e))) => {
1456                tracing::warn!("federation: namespace_meta_clear peer join error: {e}");
1457            }
1458            Ok(None) | Err(_) => break,
1459        }
1460        if tracker.lock().await.is_quorum_met(Instant::now()) {
1461            break;
1462        }
1463    }
1464
1465    if !joins.is_empty() {
1466        tokio::spawn(async move {
1467            while let Some(res) = joins.join_next().await {
1468                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1469                    tracing::debug!(
1470                        "federation: post-quorum namespace_meta_clear peer {peer_id} did not ack: {reason}"
1471                    );
1472                }
1473            }
1474        });
1475    }
1476
1477    let tracker = Arc::try_unwrap(tracker)
1478        .map_err(|_| QuorumError::LocalWriteFailed {
1479            detail: TRACKER_ARC_STILL_REFERENCED.to_string(),
1480        })?
1481        .into_inner();
1482    Ok(tracker)
1483}
1484
1485/// v0.6.2 Patch 2 (S40): post-fanout catchup for `bulk_create`.
1486///
1487/// After the per-row `broadcast_store_quorum` fanouts complete, issue a
1488/// single batched `sync_push` per peer with *every* row the leader just
1489/// committed. Peer-side `insert_if_newer` is idempotent, so rows that
1490/// already landed via the per-row fanout are no-ops on the peer; rows
1491/// that a peer missed (post-quorum detach failure + retry both failed,
1492/// or post-quorum detach timed out on that peer) are applied.
1493///
1494/// ## Why a catchup batch in addition to retry-once?
1495///
1496/// v3r26 hermes-tls S40 and v3r27 ironclaw-off S40 both showed a
1497/// single row missing on one specific peer (499/500) despite the
1498/// retry-once fix in [`post_and_classify`]. Retry-once is a probability
1499/// improver, not a guarantee: a peer under sustained SQLite-mutex
1500/// contention can drop two consecutive POSTs inside the ~250ms retry
1501/// window. A terminal batched catchup closes that last gap at O(1)
1502/// extra POST per peer instead of O(N) retries per row.
1503///
1504/// ## Safety
1505///
1506/// - Idempotent: peer's `insert_if_newer` matches on `id` + `updated_at`
1507///   and no-ops on already-applied rows.
1508/// - Quorum contract unchanged: the catchup runs AFTER quorum has been
1509///   met and the HTTP response shape decided. It cannot weaken any
1510///   guarantee; it only strengthens eventual consistency.
1511/// - Non-blocking for caller semantics: errors are logged and returned
1512///   but the leader still returns 200 to the client. The `bulk_create`
1513///   HTTP contract only promises local commit + W-1 peer acks, and
1514///   those have already landed by the time this is called.
1515///
1516/// Returns a map of `peer_id -> error string` for peers where the
1517/// catchup POST itself failed (logged by the caller). A successful
1518/// catchup POST appears in the map as an empty string or is omitted.
1519pub async fn bulk_catchup_push(
1520    config: &FederationConfig,
1521    memories: &[Memory],
1522) -> Vec<(String, String)> {
1523    if memories.is_empty() || config.peers.is_empty() {
1524        return Vec::new();
1525    }
1526    let body = serde_json::json!({
1527        (field_names::SENDER_AGENT_ID): config.sender_agent_id,
1528        "memories": memories,
1529        "dry_run": false,
1530    });
1531    let mut joins: JoinSet<(String, Result<(), String>)> = JoinSet::new();
1532    for peer in &config.peers {
1533        let client = config.client.clone();
1534        let url = peer.sync_push_url.clone();
1535        let id = peer.id.clone();
1536        let payload = body.clone();
1537        let api_key = config.api_key.clone();
1538        let signing_key = config.signing_key.clone();
1539        joins.spawn(async move {
1540            // v0.7.0 #791 — serialise once so the X-Memory-Sig
1541            // signature matches the wire bytes the receiver sees.
1542            let body_bytes = match serde_json::to_vec(&payload) {
1543                Ok(b) => b,
1544                Err(e) => {
1545                    return (id, Err(format!("serialise body: {e}")));
1546                }
1547            };
1548            let mut req = client
1549                .post(&url)
1550                .header(crate::HEADER_CONTENT_TYPE, crate::MIME_JSON)
1551                .body(body_bytes.clone());
1552            // No Idempotency-Key on the batch — the batch is itself an
1553            // idempotent replay, and the peer's `insert_if_newer`
1554            // dedupes per row by (id, updated_at).
1555            req = req.header("X-Catchup", "bulk");
1556            // v0.7.0 #791 + #922 — signature + nonce header.
1557            if let Some(sk) = signing_key.as_deref() {
1558                let nonce = uuid::Uuid::new_v4().to_string();
1559                let sig_header = crate::federation::signing::sign_body_with_nonce_header(
1560                    sk,
1561                    &body_bytes,
1562                    &nonce,
1563                );
1564                req = req
1565                    .header(crate::federation::signing::SIGNATURE_HEADER, sig_header)
1566                    .header(crate::federation::signing::NONCE_HEADER, nonce);
1567            }
1568            // v0.7.0 fold-A2A1.4 (#702) — forward the operator-configured
1569            // `x-api-key` on the catchup batch as well. Without this, a
1570            // catchup against a peer that runs with api-key auth fails
1571            // 401 and the row gap stays open.
1572            if let Some(key) = api_key.as_deref() {
1573                req = req.header(crate::HEADER_API_KEY, key);
1574            }
1575            // v0.7.0 #238 — attach `x-peer-id` so catchup batches
1576            // attest against the receiver's allowlist exactly like
1577            // the per-row fanout in `post_once`.
1578            if let Some(peer_id) = payload
1579                .get(field_names::SENDER_AGENT_ID)
1580                .and_then(|v| v.as_str())
1581                .filter(|s| !s.is_empty())
1582            {
1583                req = req.header(crate::federation::peer_attestation::PEER_ID_HEADER, peer_id);
1584            }
1585            let outcome = match req.send().await {
1586                Ok(resp) if resp.status().is_success() => {
1587                    // #1579 B5 — drain the (success) body so the
1588                    // connection returns to the keep-alive pool.
1589                    let _ = resp.bytes().await;
1590                    Ok(())
1591                }
1592                Ok(resp) => {
1593                    // #1579 B5 — same drain on the error arm; see
1594                    // `post_once` for the fresh-handshake rationale.
1595                    let status = resp.status();
1596                    let _ = resp.bytes().await;
1597                    Err(format!("http {status}"))
1598                }
1599                Err(e) => Err(crate::errors::msg::network(e)),
1600            };
1601            (id, outcome)
1602        });
1603    }
1604    let mut errors = Vec::new();
1605    while let Some(res) = joins.join_next().await {
1606        match res {
1607            Ok((peer_id, Err(err))) => {
1608                tracing::warn!("bulk_catchup_push: peer {peer_id} failed: {err}");
1609                errors.push((peer_id, err));
1610            }
1611            Ok((_, Ok(()))) => {}
1612            Err(e) => {
1613                tracing::warn!("bulk_catchup_push: join error: {e:?}");
1614                errors.push(("unknown".to_string(), e.to_string()));
1615            }
1616        }
1617    }
1618    errors
1619}