Skip to main content

ai_memory/
federation.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Federation autonomy — wires the quorum primitives from `replication`
5//! into the HTTP write path (v0.7 track C, PR 2 of N).
6//!
7//! ## Contract
8//!
9//! When the `ai-memory serve` daemon is started with `--quorum-writes N`
10//! and `--quorum-peers <url1,url2,…>`, every successful HTTP write
11//! fans out a 1-memory `/api/v1/sync/push` POST to each peer and counts
12//! 2xx responses as acks. The write returns OK to the HTTP caller only
13//! once the local commit plus `W - 1` peer acks land within the
14//! `--quorum-timeout-ms` deadline. Fewer acks → `503` with body
15//! `{"error":"quorum_not_met", "got":X, "needed":Y, "reason":…}`.
16//!
17//! ## Scope of this module
18//!
19//! - `FederationConfig` — the serve-time config parsed from CLI flags.
20//! - `broadcast_store_quorum` — async HTTP fan-out that builds an
21//!   `AckTracker` from `replication::QuorumPolicy`, spawns one task
22//!   per peer, and waits on either quorum-met or deadline.
23//! - Mock-peer integration tests covering the happy path, a dropped
24//!   ack pattern, and a total outage.
25//!
26//! ## NOT in scope of this module
27//!
28//! - The real multi-process chaos harness lives under `packaging/chaos/`
29//!   as an operator-facing shell script. A campaign report is produced
30//!   by `packaging/chaos/run-chaos.sh` — see that file for how to
31//!   measure the convergence bound committed to in ADR-0001.
32//! - MCP-over-stdio and CLI writes do NOT fan out to peers. The MCP
33//!   server is a single-tenant stdio client and the CLI is local; both
34//!   rely on the sync-daemon for eventual propagation. Only the HTTP
35//!   daemon is a federation node.
36
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use serde::{Deserialize, Serialize};
41use tokio::sync::Mutex;
42use tokio::task::JoinSet;
43
44use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
45use crate::replication::{AckTracker, QuorumError, QuorumFailureReason, QuorumPolicy};
46
47/// Configured-at-serve federation state. Parsed from
48/// `--quorum-writes` + `--quorum-peers` + `--quorum-timeout-ms`.
49#[derive(Clone)]
50pub struct FederationConfig {
51    pub policy: QuorumPolicy,
52    pub peers: Vec<PeerEndpoint>,
53    pub client: reqwest::Client,
54    pub sender_agent_id: String,
55}
56
/// One remote member of the quorum mesh.
///
/// `id` is the identifier recorded in the ack tracker for this peer;
/// [`FederationConfig::build`] assigns the low-cardinality form
/// `peer-{index}`. `sync_push_url` is the fully-formed endpoint that
/// fan-out POSTs are sent to.
#[derive(Debug, Clone)]
pub struct PeerEndpoint {
    /// Identifier recorded against acks coming from this peer.
    pub id: String,
    /// Absolute URL of the peer's `/api/v1/sync/push` endpoint.
    pub sync_push_url: String,
}
64
65impl FederationConfig {
66    /// Build a `FederationConfig` from the serve-time CLI flags. Returns
67    /// `None` when federation is disabled (`quorum_writes == 0` or the
68    /// peer list is empty).
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the reqwest client cannot be constructed
73    /// with the supplied certificate material.
74    pub fn build(
75        quorum_writes: usize,
76        peer_urls: &[String],
77        timeout: Duration,
78        client_cert_path: Option<&std::path::Path>,
79        client_key_path: Option<&std::path::Path>,
80        ca_cert_path: Option<&std::path::Path>,
81        sender_agent_id: String,
82    ) -> anyhow::Result<Option<Self>> {
83        if quorum_writes == 0 || peer_urls.is_empty() {
84            return Ok(None);
85        }
86        // Ultrareview #341: reject duplicate peer URLs at build time.
87        // If the same peer URL appears twice under different indices,
88        // both would count as distinct ack sources and the quorum
89        // guarantee is violated. Normalize (trim trailing slash,
90        // lowercase scheme+host) before comparing.
91        let mut seen_urls: std::collections::HashSet<String> = std::collections::HashSet::new();
92        for raw in peer_urls {
93            let normalized = raw.trim_end_matches('/').to_ascii_lowercase();
94            if !seen_urls.insert(normalized.clone()) {
95                return Err(anyhow::anyhow!(
96                    "duplicate peer URL in --quorum-peers: {raw} (normalized: {normalized}) \
97                     — duplicates would let a single peer contribute to quorum more than once"
98                ));
99            }
100        }
101        let n = 1 + peer_urls.len(); // local node + remotes
102        let policy = QuorumPolicy::new(n, quorum_writes, timeout, Duration::from_secs(30))
103            .map_err(|e| anyhow::anyhow!("invalid quorum policy: {e}"))?;
104        let peers: Vec<PeerEndpoint> = peer_urls
105            .iter()
106            .enumerate()
107            .map(|(i, raw)| {
108                // `id` is used as a Prometheus metric label; keep it
109                // low-cardinality. The full URL is logged separately.
110                // (#304 nit — prior form `peer-{i}:{url}` blew up the
111                // label space as deployment size grew.)
112                let trimmed = raw.trim_end_matches('/');
113                tracing::debug!(
114                    target = "federation",
115                    peer_index = i,
116                    url = trimmed,
117                    "registered peer"
118                );
119                PeerEndpoint {
120                    id: format!("peer-{i}"),
121                    sync_push_url: format!("{trimmed}/api/v1/sync/push"),
122                }
123            })
124            .collect();
125
126        // Federation client tuning.
127        //
128        // An earlier PR #314 attempted tight `tcp_keepalive(1s)` +
129        // `pool_idle_timeout(5s)` on this builder to close the Phase
130        // 4 partition_minority convergence gap. Ship-gate run 21
131        // showed that combination caused Phase 4 to hang for 40+
132        // minutes — suspected cause was connection-pool churn on the
133        // chaos-client's local 3-process mesh exhausting ephemeral
134        // ports under continuous close+reopen cycles with the tight
135        // keepalive generating probe traffic on every idle socket.
136        //
137        // Reverted to the conservative-default client here. Partition-
138        // recovery under chaos is moved out of the required ship-gate
139        // and into an opt-in campaign shape. Real partition resilience
140        // is a v0.6.0.1+ investigation with instrumented cycle data
141        // (cycles_by_fault now landed in ship-gate, giving us per-cycle
142        // visibility the next time we attempt this).
143        let mut client_builder = reqwest::Client::builder()
144            .timeout(timeout)
145            .connect_timeout(Duration::from_secs(2))
146            .use_rustls_tls();
147        // --quorum-ca-cert: trust a caller-supplied root CA for outbound
148        // federation POSTs. Required whenever peers present a cert NOT
149        // rooted in webpki-roots (Mozilla CA bundle) — e.g. a self-
150        // signed / ephemeral CA generated for an isolated test fleet.
151        // Without this, reqwest's rustls-tls feature (webpki-roots
152        // only) rejects the peer cert and every quorum write times
153        // out as quorum_not_met. See alphaonedev/ai-memory-mcp#333.
154        if let Some(ca_path) = ca_cert_path {
155            let ca_pem = std::fs::read(ca_path)
156                .map_err(|e| anyhow::anyhow!("read --quorum-ca-cert: {e}"))?;
157            let ca = reqwest::Certificate::from_pem(&ca_pem)
158                .map_err(|e| anyhow::anyhow!("parse --quorum-ca-cert: {e}"))?;
159            client_builder = client_builder.add_root_certificate(ca);
160        }
161        if let (Some(cert), Some(key)) = (client_cert_path, client_key_path) {
162            let cert_pem =
163                std::fs::read(cert).map_err(|e| anyhow::anyhow!("read --client-cert: {e}"))?;
164            let key_pem =
165                std::fs::read(key).map_err(|e| anyhow::anyhow!("read --client-key: {e}"))?;
166            let mut pem = cert_pem;
167            pem.extend_from_slice(b"\n");
168            pem.extend_from_slice(&key_pem);
169            let identity = reqwest::Identity::from_pem(&pem)
170                .map_err(|e| anyhow::anyhow!("build mTLS identity: {e}"))?;
171            client_builder = client_builder.identity(identity);
172        }
173        let client = client_builder
174            .build()
175            .map_err(|e| anyhow::anyhow!("build federation client: {e}"))?;
176
177        Ok(Some(Self {
178            policy,
179            peers,
180            client,
181            sender_agent_id,
182        }))
183    }
184
185    /// Count of peers in the mesh (excludes the local node). Useful for
186    /// metrics labels.
187    #[must_use]
188    pub fn peer_count(&self) -> usize {
189        self.peers.len()
190    }
191}
192
193/// Fan out a just-committed memory to every configured peer. Returns
194/// an `AckTracker` whose `finalise()` you then call against the
195/// deadline to get the quorum outcome.
196///
197/// The local node's commit is recorded as soon as this function is
198/// called — callers pass in a memory that has already been persisted
199/// locally. Roll-back semantics on quorum failure are handled by the
200/// caller (see `handlers::create_memory` for the HTTP path contract).
201pub async fn broadcast_store_quorum(
202    config: &FederationConfig,
203    mem: &Memory,
204) -> Result<AckTracker, QuorumError> {
205    let now = Instant::now();
206    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
207    tracker.lock().await.record_local();
208
209    let body = serde_json::json!({
210        "sender_agent_id": config.sender_agent_id,
211        "memories": [mem],
212        "dry_run": false,
213    });
214
215    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
216    for peer in &config.peers {
217        let client = config.client.clone();
218        let url = peer.sync_push_url.clone();
219        let id = peer.id.clone();
220        let mem_id = mem.id.clone();
221        let payload = body.clone();
222        joins.spawn(async move {
223            let outcome = post_and_classify(&client, &url, &payload, &mem_id, Some(&mem_id)).await;
224            (id, outcome)
225        });
226    }
227
228    // Deadline is computed ONCE here and never re-derived inside the
229    // loop. The tracker carries the same deadline internally — passing
230    // a single `Instant` through avoids the few-millisecond disagreement
231    // that previously caused `finalise()` to reject quorums met 1-2 ms
232    // earlier. (#299 item 1.)
233    let deadline = now + config.policy.ack_timeout;
234    loop {
235        let remaining = deadline.saturating_duration_since(Instant::now());
236        if remaining.is_zero() {
237            break;
238        }
239        match tokio::time::timeout(remaining, joins.join_next()).await {
240            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
241                tracker.lock().await.record_peer_ack(peer_id);
242            }
243            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
244                tracker.lock().await.record_id_drift(peer_id);
245            }
246            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
247                tracing::warn!("federation: peer {peer_id} failed for {}: {reason}", mem.id);
248            }
249            Ok(Some(Err(e))) => {
250                tracing::warn!("federation: peer join error: {e}");
251            }
252            Ok(None) | Err(_) => break, // joinset drained or timed out
253        }
254        // Early-exit once the tracker says quorum is met — we don't
255        // need to wait for stragglers.
256        if tracker.lock().await.is_quorum_met(Instant::now()) {
257            break;
258        }
259    }
260
261    // v0.6.0 correctness fix: once quorum is met, DETACH the remaining
262    // fanouts into a background task so they complete naturally rather
263    // than being aborted mid-flight. Ship-gate run 14 showed each peer
264    // receiving only ~50% of burst writes under W=2/N=3 — cause: when
265    // peer-B won the ack race, `joins.shutdown().await` aborted the
266    // in-flight POST to peer-C, which often reached reqwest's connect
267    // phase but never delivered the memory. Net effect: every write
268    // landed on leader + exactly one peer, leaving the other peer
269    // permanently behind until a sync-daemon (not running in the phase-2
270    // harness) caught it up.
271    //
272    // The spawned fanout tasks do NOT hold the tracker Arc (they only
273    // capture client/url/payload/id), so letting them outlive this
274    // function does not block the `Arc::try_unwrap` below. Errors inside
275    // the detached tasks are logged but otherwise ignored — the caller
276    // has already met quorum by the time we detach.
277    if !joins.is_empty() {
278        // Ultrareview #343: emit a metric on detach-task failures so
279        // mesh divergence is observable. The detach task itself is
280        // still fire-and-forget — a full shutdown-drain would require
281        // plumbing a shared JoinSet into AppState; tracked separately.
282        let mem_id = mem.id.clone();
283        tokio::spawn(async move {
284            while let Some(res) = joins.join_next().await {
285                match res {
286                    Ok((peer_id, AckOutcome::Ack)) => {
287                        tracing::debug!("federation: post-quorum ack from {peer_id}");
288                    }
289                    Ok((peer_id, AckOutcome::IdDrift)) => {
290                        tracing::warn!(
291                            "federation: post-quorum id-drift from {peer_id} (peer rewrote id)"
292                        );
293                        crate::metrics::registry()
294                            .federation_fanout_dropped_total
295                            .with_label_values(&["id_drift"])
296                            .inc();
297                    }
298                    Ok((peer_id, AckOutcome::Fail(reason))) => {
299                        tracing::warn!(
300                            "federation: post-quorum peer {peer_id} did not ack for {mem_id}: {reason}"
301                        );
302                        crate::metrics::registry()
303                            .federation_fanout_dropped_total
304                            .with_label_values(&["peer_fail"])
305                            .inc();
306                    }
307                    Err(e) => {
308                        tracing::warn!("federation: post-quorum join error for {mem_id}: {e}");
309                        crate::metrics::registry()
310                            .federation_fanout_dropped_total
311                            .with_label_values(&["join_error"])
312                            .inc();
313                    }
314                }
315            }
316        });
317    }
318
319    let tracker = Arc::try_unwrap(tracker)
320        .map_err(|_| QuorumError::LocalWriteFailed {
321            detail: "tracker arc still referenced at finalise".to_string(),
322        })?
323        .into_inner();
324    Ok(tracker)
325}
326
/// Classification of a single peer's response to a fan-out POST.
#[derive(Debug)]
enum AckOutcome {
    /// The peer answered with a 2xx status and did not contradict the id.
    Ack,
    /// The peer answered 2xx, but its non-empty `ids` array does not
    /// contain the id we sent.
    IdDrift,
    /// The request failed; the payload is a human-readable reason
    /// (`"http <status>"` or `"network: <error>"`, possibly combined
    /// across a retry).
    Fail(String),
}
333
334/// Single-attempt POST to a peer, classifying the response into an
335/// `AckOutcome`. No retries — callers that want retry-on-transient-fail
336/// should use [`post_and_classify`].
337async fn post_once(
338    client: &reqwest::Client,
339    url: &str,
340    body: &serde_json::Value,
341    expected_id: &str,
342    idempotency_key: Option<&str>,
343) -> AckOutcome {
344    // Ultrareview #346: attach an idempotency key so peers can dedupe
345    // on retry. If a tokio::timeout fires locally but the HTTP POST
346    // already reached the peer, the peer applies the write once; a
347    // subsequent catchup sync carrying the same memory.id will be a
348    // no-op via `insert_if_newer`. The key is set from the outgoing
349    // memory id by default, which is stable across retries.
350    let mut req = client.post(url).json(body);
351    if let Some(key) = idempotency_key {
352        req = req.header("Idempotency-Key", key);
353    }
354    match req.send().await {
355        Ok(resp) if resp.status().is_success() => {
356            match resp.json::<serde_json::Value>().await {
357                Ok(v) => {
358                    // sync_push responses don't echo per-memory ids; any
359                    // success on a 1-memory push is treated as an ack
360                    // unless the response carries an explicit `ids` array
361                    // whose content disagrees.
362                    if let Some(ids) = v.get("ids").and_then(|v| v.as_array())
363                        && !ids.is_empty()
364                        && !ids.iter().any(|x| x.as_str() == Some(expected_id))
365                    {
366                        return AckOutcome::IdDrift;
367                    }
368                    AckOutcome::Ack
369                }
370                Err(_) => AckOutcome::Ack, // body unparseable but 2xx = ack
371            }
372        }
373        Ok(resp) => AckOutcome::Fail(format!("http {}", resp.status())),
374        Err(e) => AckOutcome::Fail(format!("network: {e}")),
375    }
376}
377
/// Pause inserted between the first failed POST and the single retry
/// issued by [`post_and_classify`].
///
/// Chosen to be short relative to the ack deadline so both attempts can
/// still land in time, yet long enough for a transient peer-side stall
/// (e.g. SQLite-mutex contention) or a brief network flap to clear.
const FANOUT_RETRY_BACKOFF: Duration = Duration::from_millis(250);
383
384/// POST to a peer with a single retry on transient failure.
385///
386/// v0.6.2 Patch 2 (S40): v3r26 hermes-tls scenario-40 had node-2 see
387/// 499/500 bulk rows. Same scenario on ironclaw-tls passed 500/500/500.
388/// Root cause: under W=2/N=4 quorum the leader returns 200 once two peers
389/// ack. The third peer's POST runs in the post-quorum detach task. If
390/// that POST fails (transient network flap, peer 5xx under concurrent
391/// SQLite-mutex contention, TLS handshake reset), it was previously
392/// fire-and-forget — the row stayed permanently missing on that peer
393/// until a sync-daemon caught it up. The harness runs no sync daemon,
394/// so one missed POST = one permanently missing row.
395///
396/// Fix: retry once on `AckOutcome::Fail`. The Idempotency-Key header
397/// ensures a partial-apply race (peer received the first POST but the
398/// response was lost) deduplicates to a no-op on the peer side via
399/// `insert_if_newer`. `IdDrift` is NOT retried — it indicates the peer
400/// semantically disagreed about the id, not a transient failure, so
401/// retrying would just observe the same disagreement.
402///
403/// Quorum contract is unchanged: callers still observe a single
404/// `AckOutcome` per peer, now reflecting the best of two attempts.
405async fn post_and_classify(
406    client: &reqwest::Client,
407    url: &str,
408    body: &serde_json::Value,
409    expected_id: &str,
410    idempotency_key: Option<&str>,
411) -> AckOutcome {
412    match post_once(client, url, body, expected_id, idempotency_key).await {
413        AckOutcome::Ack => AckOutcome::Ack,
414        AckOutcome::IdDrift => AckOutcome::IdDrift,
415        AckOutcome::Fail(first_reason) => {
416            tokio::time::sleep(FANOUT_RETRY_BACKOFF).await;
417            match post_once(client, url, body, expected_id, idempotency_key).await {
418                AckOutcome::Ack => {
419                    tracing::debug!(
420                        "federation: peer POST retry succeeded for {expected_id} (first attempt: {first_reason})"
421                    );
422                    crate::metrics::registry()
423                        .federation_fanout_retry_total
424                        .with_label_values(&["ok"])
425                        .inc();
426                    AckOutcome::Ack
427                }
428                AckOutcome::IdDrift => {
429                    crate::metrics::registry()
430                        .federation_fanout_retry_total
431                        .with_label_values(&["id_drift"])
432                        .inc();
433                    AckOutcome::IdDrift
434                }
435                AckOutcome::Fail(retry_reason) => {
436                    crate::metrics::registry()
437                        .federation_fanout_retry_total
438                        .with_label_values(&["fail"])
439                        .inc();
440                    AckOutcome::Fail(format!("first: {first_reason}; retry: {retry_reason}"))
441                }
442            }
443        }
444    }
445}
446
447/// v0.6.2 Patch 2 (S40): post-fanout catchup for `bulk_create`.
448///
449/// After the per-row `broadcast_store_quorum` fanouts complete, issue a
450/// single batched `sync_push` per peer with *every* row the leader just
451/// committed. Peer-side `insert_if_newer` is idempotent, so rows that
452/// already landed via the per-row fanout are no-ops on the peer; rows
453/// that a peer missed (post-quorum detach failure + retry both failed,
454/// or post-quorum detach timed out on that peer) are applied.
455///
456/// ## Why a catchup batch in addition to retry-once?
457///
458/// v3r26 hermes-tls S40 and v3r27 ironclaw-off S40 both showed a
459/// single row missing on one specific peer (499/500) despite the
460/// retry-once fix in [`post_and_classify`]. Retry-once is a probability
461/// improver, not a guarantee: a peer under sustained SQLite-mutex
462/// contention can drop two consecutive POSTs inside the ~250ms retry
463/// window. A terminal batched catchup closes that last gap at O(1)
464/// extra POST per peer instead of O(N) retries per row.
465///
466/// ## Safety
467///
468/// - Idempotent: peer's `insert_if_newer` matches on `id` + `updated_at`
469///   and no-ops on already-applied rows.
470/// - Quorum contract unchanged: the catchup runs AFTER quorum has been
471///   met and the HTTP response shape decided. It cannot weaken any
472///   guarantee; it only strengthens eventual consistency.
473/// - Non-blocking for caller semantics: errors are logged and returned
474///   but the leader still returns 200 to the client. The `bulk_create`
475///   HTTP contract only promises local commit + W-1 peer acks, and
476///   those have already landed by the time this is called.
477///
478/// Returns a map of `peer_id -> error string` for peers where the
479/// catchup POST itself failed (logged by the caller). A successful
480/// catchup POST appears in the map as an empty string or is omitted.
481pub async fn bulk_catchup_push(
482    config: &FederationConfig,
483    memories: &[Memory],
484) -> Vec<(String, String)> {
485    if memories.is_empty() || config.peers.is_empty() {
486        return Vec::new();
487    }
488    let body = serde_json::json!({
489        "sender_agent_id": config.sender_agent_id,
490        "memories": memories,
491        "dry_run": false,
492    });
493    let mut joins: JoinSet<(String, Result<(), String>)> = JoinSet::new();
494    for peer in &config.peers {
495        let client = config.client.clone();
496        let url = peer.sync_push_url.clone();
497        let id = peer.id.clone();
498        let payload = body.clone();
499        joins.spawn(async move {
500            let mut req = client.post(&url).json(&payload);
501            // No Idempotency-Key on the batch — the batch is itself an
502            // idempotent replay, and the peer's `insert_if_newer`
503            // dedupes per row by (id, updated_at).
504            req = req.header("X-Catchup", "bulk");
505            let outcome = match req.send().await {
506                Ok(resp) if resp.status().is_success() => Ok(()),
507                Ok(resp) => Err(format!("http {}", resp.status())),
508                Err(e) => Err(format!("network: {e}")),
509            };
510            (id, outcome)
511        });
512    }
513    let mut errors = Vec::new();
514    while let Some(res) = joins.join_next().await {
515        match res {
516            Ok((peer_id, Err(err))) => {
517                tracing::warn!("bulk_catchup_push: peer {peer_id} failed: {err}");
518                errors.push((peer_id, err));
519            }
520            Ok((_, Ok(()))) => {}
521            Err(e) => {
522                tracing::warn!("bulk_catchup_push: join error: {e:?}");
523                errors.push(("unknown".to_string(), e.to_string()));
524            }
525        }
526    }
527    errors
528}
529
530/// Classify an `AckTracker` into either a committed quorum (`Ok(n)`) or
531/// an error with a reason suitable for the `/503 quorum_not_met`
532/// payload. Consumes the tracker — call after the broadcast loop.
533///
534/// # Errors
535///
536/// Returns `QuorumError::QuorumNotMet` if the tracker did not meet
537/// its W threshold by the `now` tick.
538pub fn finalise_quorum(tracker: &AckTracker) -> Result<usize, QuorumError> {
539    tracker.finalise(Instant::now())
540}
541
542/// Fan out a tombstone for `id` to every configured peer via the extended
543/// `sync_push` body (`deletions: [id]`). Same quorum contract as
544/// `broadcast_store_quorum`: local delete is recorded immediately, peer acks
545/// counted against `policy.write_quorum`, deadline enforced, stragglers
546/// detached.
547///
548/// # Errors
549///
550/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
551/// be unwrapped (only occurs under a pathological detach race).
552pub async fn broadcast_delete_quorum(
553    config: &FederationConfig,
554    id: &str,
555) -> Result<AckTracker, QuorumError> {
556    let now = Instant::now();
557    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
558    tracker.lock().await.record_local();
559
560    let body = serde_json::json!({
561        "sender_agent_id": config.sender_agent_id,
562        "memories": [],
563        "deletions": [id],
564        "dry_run": false,
565    });
566
567    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
568    for peer in &config.peers {
569        let client = config.client.clone();
570        let url = peer.sync_push_url.clone();
571        let peer_id = peer.id.clone();
572        let payload = body.clone();
573        let target_id = id.to_string();
574        joins.spawn(async move {
575            let outcome =
576                post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
577            (peer_id, outcome)
578        });
579    }
580
581    let deadline = now + config.policy.ack_timeout;
582    loop {
583        let remaining = deadline.saturating_duration_since(Instant::now());
584        if remaining.is_zero() {
585            break;
586        }
587        match tokio::time::timeout(remaining, joins.join_next()).await {
588            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
589                tracker.lock().await.record_peer_ack(peer_id);
590            }
591            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
592                tracker.lock().await.record_id_drift(peer_id);
593            }
594            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
595                tracing::warn!("federation: delete peer {peer_id} failed for {id}: {reason}");
596            }
597            Ok(Some(Err(e))) => {
598                tracing::warn!("federation: delete peer join error: {e}");
599            }
600            Ok(None) | Err(_) => break,
601        }
602        if tracker.lock().await.is_quorum_met(Instant::now()) {
603            break;
604        }
605    }
606
607    if !joins.is_empty() {
608        tokio::spawn(async move {
609            while let Some(res) = joins.join_next().await {
610                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
611                    tracing::debug!(
612                        "federation: post-quorum delete peer {peer_id} did not ack: {reason}"
613                    );
614                }
615            }
616        });
617    }
618
619    let tracker = Arc::try_unwrap(tracker)
620        .map_err(|_| QuorumError::LocalWriteFailed {
621            detail: "tracker arc still referenced at finalise".to_string(),
622        })?
623        .into_inner();
624    Ok(tracker)
625}
626
627/// v0.6.2 (S29): fan out a just-archived memory id to every peer. Payload
628/// rides on `sync_push` via `archives: [id]`, mirroring the shape used
629/// by `broadcast_delete_quorum` for deletions. On the receiving peer,
630/// `sync_push` calls `db::archive_memory` to move the row into
631/// `archived_memories` — unlike the delete path this is a soft removal
632/// (the row remains queryable via `/api/v1/archive`).
633///
634/// Same quorum contract as `broadcast_store_quorum` / `broadcast_delete_quorum`.
635///
636/// # Errors
637///
638/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
639/// be unwrapped (only occurs under a pathological detach race).
640pub async fn broadcast_archive_quorum(
641    config: &FederationConfig,
642    id: &str,
643) -> Result<AckTracker, QuorumError> {
644    let now = Instant::now();
645    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
646    tracker.lock().await.record_local();
647
648    let body = serde_json::json!({
649        "sender_agent_id": config.sender_agent_id,
650        "memories": [],
651        "archives": [id],
652        "dry_run": false,
653    });
654
655    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
656    for peer in &config.peers {
657        let client = config.client.clone();
658        let url = peer.sync_push_url.clone();
659        let peer_id = peer.id.clone();
660        let payload = body.clone();
661        let target_id = id.to_string();
662        joins.spawn(async move {
663            let outcome =
664                post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
665            (peer_id, outcome)
666        });
667    }
668
669    let deadline = now + config.policy.ack_timeout;
670    loop {
671        let remaining = deadline.saturating_duration_since(Instant::now());
672        if remaining.is_zero() {
673            break;
674        }
675        match tokio::time::timeout(remaining, joins.join_next()).await {
676            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
677                tracker.lock().await.record_peer_ack(peer_id);
678            }
679            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
680                tracker.lock().await.record_id_drift(peer_id);
681            }
682            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
683                tracing::warn!("federation: archive peer {peer_id} failed for {id}: {reason}");
684            }
685            Ok(Some(Err(e))) => {
686                tracing::warn!("federation: archive peer join error: {e}");
687            }
688            Ok(None) | Err(_) => break,
689        }
690        if tracker.lock().await.is_quorum_met(Instant::now()) {
691            break;
692        }
693    }
694
695    if !joins.is_empty() {
696        tokio::spawn(async move {
697            while let Some(res) = joins.join_next().await {
698                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
699                    tracing::debug!(
700                        "federation: post-quorum archive peer {peer_id} did not ack: {reason}"
701                    );
702                }
703            }
704        });
705    }
706
707    let tracker = Arc::try_unwrap(tracker)
708        .map_err(|_| QuorumError::LocalWriteFailed {
709            detail: "tracker arc still referenced at finalise".to_string(),
710        })?
711        .into_inner();
712    Ok(tracker)
713}
714
715/// v0.6.2 (S29): fan out a just-restored memory id to every peer. Payload
716/// rides on `sync_push` via `restores: [id]`, mirroring the shape used by
717/// `broadcast_archive_quorum`. On the receiving peer, `sync_push` moves
718/// the row from `archived_memories` back into `memories` via
719/// `db::restore_archived`. If the peer never saw the archive or the row
720/// isn't in its archive table, the sync call no-ops (same missing-on-peer
721/// posture used for archives and deletions).
722///
723/// Same quorum contract as `broadcast_store_quorum` / `broadcast_archive_quorum`.
724///
725/// # Errors
726///
727/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
728/// be unwrapped (only occurs under a pathological detach race).
729pub async fn broadcast_restore_quorum(
730    config: &FederationConfig,
731    id: &str,
732) -> Result<AckTracker, QuorumError> {
733    let now = Instant::now();
734    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
735    tracker.lock().await.record_local();
736
737    let body = serde_json::json!({
738        "sender_agent_id": config.sender_agent_id,
739        "memories": [],
740        "restores": [id],
741        "dry_run": false,
742    });
743
744    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
745    for peer in &config.peers {
746        let client = config.client.clone();
747        let url = peer.sync_push_url.clone();
748        let peer_id = peer.id.clone();
749        let payload = body.clone();
750        let target_id = id.to_string();
751        joins.spawn(async move {
752            let outcome =
753                post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
754            (peer_id, outcome)
755        });
756    }
757
758    let deadline = now + config.policy.ack_timeout;
759    loop {
760        let remaining = deadline.saturating_duration_since(Instant::now());
761        if remaining.is_zero() {
762            break;
763        }
764        match tokio::time::timeout(remaining, joins.join_next()).await {
765            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
766                tracker.lock().await.record_peer_ack(peer_id);
767            }
768            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
769                tracker.lock().await.record_id_drift(peer_id);
770            }
771            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
772                tracing::warn!("federation: restore peer {peer_id} failed for {id}: {reason}");
773            }
774            Ok(Some(Err(e))) => {
775                tracing::warn!("federation: restore peer join error: {e}");
776            }
777            Ok(None) | Err(_) => break,
778        }
779        if tracker.lock().await.is_quorum_met(Instant::now()) {
780            break;
781        }
782    }
783
784    if !joins.is_empty() {
785        tokio::spawn(async move {
786            while let Some(res) = joins.join_next().await {
787                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
788                    tracing::debug!(
789                        "federation: post-quorum restore peer {peer_id} did not ack: {reason}"
790                    );
791                }
792            }
793        });
794    }
795
796    let tracker = Arc::try_unwrap(tracker)
797        .map_err(|_| QuorumError::LocalWriteFailed {
798            detail: "tracker arc still referenced at finalise".to_string(),
799        })?
800        .into_inner();
801    Ok(tracker)
802}
803
804/// v0.6.2 (#325): fan out a just-committed memory link to every peer.
805/// Payload rides on `sync_push` via `links: [link]`. Same quorum contract
806/// as `broadcast_store_quorum`.
807///
808/// # Errors
809///
810/// Returns `QuorumError::LocalWriteFailed` if the internal tracker Arc cannot
811/// be unwrapped (only occurs under a pathological detach race).
812pub async fn broadcast_link_quorum(
813    config: &FederationConfig,
814    link: &MemoryLink,
815) -> Result<AckTracker, QuorumError> {
816    let now = Instant::now();
817    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
818    tracker.lock().await.record_local();
819
820    let body = serde_json::json!({
821        "sender_agent_id": config.sender_agent_id,
822        "memories": [],
823        "links": [link],
824        "dry_run": false,
825    });
826    let log_id = format!("{}→{}", link.source_id, link.target_id);
827
828    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
829    for peer in &config.peers {
830        let client = config.client.clone();
831        let url = peer.sync_push_url.clone();
832        let peer_id = peer.id.clone();
833        let payload = body.clone();
834        let log_id = log_id.clone();
835        joins.spawn(async move {
836            let outcome = post_and_classify(&client, &url, &payload, &log_id, Some(&log_id)).await;
837            (peer_id, outcome)
838        });
839    }
840
841    let deadline = now + config.policy.ack_timeout;
842    loop {
843        let remaining = deadline.saturating_duration_since(Instant::now());
844        if remaining.is_zero() {
845            break;
846        }
847        match tokio::time::timeout(remaining, joins.join_next()).await {
848            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
849                tracker.lock().await.record_peer_ack(peer_id);
850            }
851            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
852                tracker.lock().await.record_id_drift(peer_id);
853            }
854            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
855                tracing::warn!("federation: link peer {peer_id} failed for {log_id}: {reason}");
856            }
857            Ok(Some(Err(e))) => {
858                tracing::warn!("federation: link peer join error: {e}");
859            }
860            Ok(None) | Err(_) => break,
861        }
862        if tracker.lock().await.is_quorum_met(Instant::now()) {
863            break;
864        }
865    }
866
867    if !joins.is_empty() {
868        tokio::spawn(async move {
869            while let Some(res) = joins.join_next().await {
870                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
871                    tracing::debug!(
872                        "federation: post-quorum link peer {peer_id} did not ack: {reason}"
873                    );
874                }
875            }
876        });
877    }
878
879    let tracker = Arc::try_unwrap(tracker)
880        .map_err(|_| QuorumError::LocalWriteFailed {
881            detail: "tracker arc still referenced at finalise".to_string(),
882        })?
883        .into_inner();
884    Ok(tracker)
885}
886
887/// v0.6.2 (#326): fan out a consolidation in a single `sync_push` — the new
888/// consolidated memory + the source ids being deleted. Mirrors the local
889/// semantics of `db::consolidate` (insert new + delete sources) so peers
890/// end up in the same terminal state as the originator.
891///
892/// # Errors
893///
894/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
895pub async fn broadcast_consolidate_quorum(
896    config: &FederationConfig,
897    new_mem: &Memory,
898    source_ids: &[String],
899) -> Result<AckTracker, QuorumError> {
900    let now = Instant::now();
901    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
902    tracker.lock().await.record_local();
903
904    let body = serde_json::json!({
905        "sender_agent_id": config.sender_agent_id,
906        "memories": [new_mem],
907        "deletions": source_ids,
908        "dry_run": false,
909    });
910
911    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
912    for peer in &config.peers {
913        let client = config.client.clone();
914        let url = peer.sync_push_url.clone();
915        let peer_id = peer.id.clone();
916        let payload = body.clone();
917        let target_id = new_mem.id.clone();
918        joins.spawn(async move {
919            let outcome =
920                post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
921            (peer_id, outcome)
922        });
923    }
924
925    let deadline = now + config.policy.ack_timeout;
926    loop {
927        let remaining = deadline.saturating_duration_since(Instant::now());
928        if remaining.is_zero() {
929            break;
930        }
931        match tokio::time::timeout(remaining, joins.join_next()).await {
932            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
933                tracker.lock().await.record_peer_ack(peer_id);
934            }
935            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
936                tracker.lock().await.record_id_drift(peer_id);
937            }
938            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
939                tracing::warn!(
940                    "federation: consolidate peer {peer_id} failed for {}: {reason}",
941                    new_mem.id
942                );
943            }
944            Ok(Some(Err(e))) => {
945                tracing::warn!("federation: consolidate peer join error: {e}");
946            }
947            Ok(None) | Err(_) => break,
948        }
949        if tracker.lock().await.is_quorum_met(Instant::now()) {
950            break;
951        }
952    }
953
954    if !joins.is_empty() {
955        tokio::spawn(async move {
956            while let Some(res) = joins.join_next().await {
957                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
958                    tracing::debug!(
959                        "federation: post-quorum consolidate peer {peer_id} did not ack: {reason}"
960                    );
961                }
962            }
963        });
964    }
965
966    let tracker = Arc::try_unwrap(tracker)
967        .map_err(|_| QuorumError::LocalWriteFailed {
968            detail: "tracker arc still referenced at finalise".to_string(),
969        })?
970        .into_inner();
971    Ok(tracker)
972}
973
974/// v0.6.2 (S34): fan out a just-created pending-action row to every peer
975/// via `sync_push.pendings`. Callers pass the fully-hydrated `PendingAction`
976/// read from their local `pending_actions` table so peers can upsert it
977/// with the same id / status / approvals tuple the originator has. Mirrors
978/// the quorum semantics of `broadcast_store_quorum` — local pending row
979/// is already persisted at call time; peer acks are counted against
980/// `policy.write_quorum`.
981///
982/// # Errors
983///
984/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
985pub async fn broadcast_pending_quorum(
986    config: &FederationConfig,
987    pending: &PendingAction,
988) -> Result<AckTracker, QuorumError> {
989    let now = Instant::now();
990    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
991    tracker.lock().await.record_local();
992
993    let body = serde_json::json!({
994        "sender_agent_id": config.sender_agent_id,
995        "memories": [],
996        "pendings": [pending],
997        "dry_run": false,
998    });
999
1000    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1001    for peer in &config.peers {
1002        let client = config.client.clone();
1003        let url = peer.sync_push_url.clone();
1004        let peer_id = peer.id.clone();
1005        let payload = body.clone();
1006        let target_id = pending.id.clone();
1007        joins.spawn(async move {
1008            let outcome =
1009                post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
1010            (peer_id, outcome)
1011        });
1012    }
1013
1014    let deadline = now + config.policy.ack_timeout;
1015    loop {
1016        let remaining = deadline.saturating_duration_since(Instant::now());
1017        if remaining.is_zero() {
1018            break;
1019        }
1020        match tokio::time::timeout(remaining, joins.join_next()).await {
1021            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1022                tracker.lock().await.record_peer_ack(peer_id);
1023            }
1024            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1025                tracker.lock().await.record_id_drift(peer_id);
1026            }
1027            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1028                tracing::warn!(
1029                    "federation: pending peer {peer_id} failed for {}: {reason}",
1030                    pending.id
1031                );
1032            }
1033            Ok(Some(Err(e))) => {
1034                tracing::warn!("federation: pending peer join error: {e}");
1035            }
1036            Ok(None) | Err(_) => break,
1037        }
1038        if tracker.lock().await.is_quorum_met(Instant::now()) {
1039            break;
1040        }
1041    }
1042
1043    if !joins.is_empty() {
1044        tokio::spawn(async move {
1045            while let Some(res) = joins.join_next().await {
1046                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1047                    tracing::debug!(
1048                        "federation: post-quorum pending peer {peer_id} did not ack: {reason}"
1049                    );
1050                }
1051            }
1052        });
1053    }
1054
1055    let tracker = Arc::try_unwrap(tracker)
1056        .map_err(|_| QuorumError::LocalWriteFailed {
1057            detail: "tracker arc still referenced at finalise".to_string(),
1058        })?
1059        .into_inner();
1060    Ok(tracker)
1061}
1062
1063/// v0.6.2 (S34): fan out a pending-action decision (approve/reject) to
1064/// peers via `sync_push.pending_decisions`. Without this, an approve on
1065/// node-2 leaves the row in `status='pending'` on node-1 and the caller
1066/// sees inconsistent governance state across the cluster. Peers apply
1067/// via `db::decide_pending_action` which is a no-op on already-decided
1068/// rows — replay-safe.
1069///
1070/// # Errors
1071///
1072/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1073pub async fn broadcast_pending_decision_quorum(
1074    config: &FederationConfig,
1075    decision: &PendingDecision,
1076) -> Result<AckTracker, QuorumError> {
1077    let now = Instant::now();
1078    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1079    tracker.lock().await.record_local();
1080
1081    let body = serde_json::json!({
1082        "sender_agent_id": config.sender_agent_id,
1083        "memories": [],
1084        "pending_decisions": [decision],
1085        "dry_run": false,
1086    });
1087
1088    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1089    for peer in &config.peers {
1090        let client = config.client.clone();
1091        let url = peer.sync_push_url.clone();
1092        let peer_id = peer.id.clone();
1093        let payload = body.clone();
1094        let target_id = decision.id.clone();
1095        joins.spawn(async move {
1096            let outcome =
1097                post_and_classify(&client, &url, &payload, &target_id, Some(&target_id)).await;
1098            (peer_id, outcome)
1099        });
1100    }
1101
1102    let deadline = now + config.policy.ack_timeout;
1103    loop {
1104        let remaining = deadline.saturating_duration_since(Instant::now());
1105        if remaining.is_zero() {
1106            break;
1107        }
1108        match tokio::time::timeout(remaining, joins.join_next()).await {
1109            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1110                tracker.lock().await.record_peer_ack(peer_id);
1111            }
1112            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1113                tracker.lock().await.record_id_drift(peer_id);
1114            }
1115            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1116                tracing::warn!(
1117                    "federation: pending-decision peer {peer_id} failed for {}: {reason}",
1118                    decision.id
1119                );
1120            }
1121            Ok(Some(Err(e))) => {
1122                tracing::warn!("federation: pending-decision peer join error: {e}");
1123            }
1124            Ok(None) | Err(_) => break,
1125        }
1126        if tracker.lock().await.is_quorum_met(Instant::now()) {
1127            break;
1128        }
1129    }
1130
1131    if !joins.is_empty() {
1132        tokio::spawn(async move {
1133            while let Some(res) = joins.join_next().await {
1134                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1135                    tracing::debug!(
1136                        "federation: post-quorum pending-decision peer {peer_id} did not ack: {reason}"
1137                    );
1138                }
1139            }
1140        });
1141    }
1142
1143    let tracker = Arc::try_unwrap(tracker)
1144        .map_err(|_| QuorumError::LocalWriteFailed {
1145            detail: "tracker arc still referenced at finalise".to_string(),
1146        })?
1147        .into_inner();
1148    Ok(tracker)
1149}
1150
1151/// v0.6.2 (S35): fan out a `namespace_meta` row (the `(namespace,
1152/// standard_id, parent_namespace)` tuple set by `set_namespace_standard`)
1153/// to peers via `sync_push.namespace_meta`. Without this, peers see the
1154/// standard memory (already fanned out via `broadcast_store_quorum`) but
1155/// not the meta row tying it to a namespace + parent — so the
1156/// parent-chain walk on the peer falls through to `auto_detect_parent`
1157/// and can return a different ancestor than the originator.
1158///
1159/// # Errors
1160///
1161/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1162pub async fn broadcast_namespace_meta_quorum(
1163    config: &FederationConfig,
1164    entry: &NamespaceMetaEntry,
1165) -> Result<AckTracker, QuorumError> {
1166    let now = Instant::now();
1167    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1168    tracker.lock().await.record_local();
1169
1170    let body = serde_json::json!({
1171        "sender_agent_id": config.sender_agent_id,
1172        "memories": [],
1173        "namespace_meta": [entry],
1174        "dry_run": false,
1175    });
1176
1177    let target_id = entry.namespace.clone();
1178    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1179    for peer in &config.peers {
1180        let client = config.client.clone();
1181        let url = peer.sync_push_url.clone();
1182        let peer_id = peer.id.clone();
1183        let payload = body.clone();
1184        let target = target_id.clone();
1185        joins.spawn(async move {
1186            let outcome = post_and_classify(&client, &url, &payload, &target, Some(&target)).await;
1187            (peer_id, outcome)
1188        });
1189    }
1190
1191    let deadline = now + config.policy.ack_timeout;
1192    loop {
1193        let remaining = deadline.saturating_duration_since(Instant::now());
1194        if remaining.is_zero() {
1195            break;
1196        }
1197        match tokio::time::timeout(remaining, joins.join_next()).await {
1198            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1199                tracker.lock().await.record_peer_ack(peer_id);
1200            }
1201            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1202                tracker.lock().await.record_id_drift(peer_id);
1203            }
1204            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1205                tracing::warn!(
1206                    "federation: namespace_meta peer {peer_id} failed for {}: {reason}",
1207                    entry.namespace
1208                );
1209            }
1210            Ok(Some(Err(e))) => {
1211                tracing::warn!("federation: namespace_meta peer join error: {e}");
1212            }
1213            Ok(None) | Err(_) => break,
1214        }
1215        if tracker.lock().await.is_quorum_met(Instant::now()) {
1216            break;
1217        }
1218    }
1219
1220    if !joins.is_empty() {
1221        tokio::spawn(async move {
1222            while let Some(res) = joins.join_next().await {
1223                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1224                    tracing::debug!(
1225                        "federation: post-quorum namespace_meta peer {peer_id} did not ack: {reason}"
1226                    );
1227                }
1228            }
1229        });
1230    }
1231
1232    let tracker = Arc::try_unwrap(tracker)
1233        .map_err(|_| QuorumError::LocalWriteFailed {
1234            detail: "tracker arc still referenced at finalise".to_string(),
1235        })?
1236        .into_inner();
1237    Ok(tracker)
1238}
1239
1240/// v0.6.2 (S35 follow-up): fan out a namespace-standard *clear* to peers
1241/// via `sync_push.namespace_meta_clears`. PR #363 shipped set-side fanout
1242/// via `broadcast_namespace_meta_quorum` but left the clear path local-only
1243/// — alice clearing on node-1 didn't propagate to bob on node-2, so the
1244/// scenario-35 cross-peer clear assertion failed.
1245///
1246/// Same quorum contract as the set broadcast: local-write pre-counted, one
1247/// POST per peer, `sync_push` bodies stuffed with the list of cleared
1248/// namespaces, first W-of-N acks win.
1249///
1250/// # Errors
1251///
1252/// Returns `QuorumError::LocalWriteFailed` on pathological detach race.
1253pub async fn broadcast_namespace_meta_clear_quorum(
1254    config: &FederationConfig,
1255    namespaces: &[String],
1256) -> Result<AckTracker, QuorumError> {
1257    let now = Instant::now();
1258    let tracker = Arc::new(Mutex::new(AckTracker::new(config.policy.clone(), now)));
1259    tracker.lock().await.record_local();
1260
1261    let body = serde_json::json!({
1262        "sender_agent_id": config.sender_agent_id,
1263        "memories": [],
1264        "namespace_meta_clears": namespaces,
1265        "dry_run": false,
1266    });
1267
1268    // Use the joined namespace list as the ack-classifier's `target_id` so
1269    // post-quorum logs carry enough context to trace back to the operation.
1270    let target_id = namespaces.join(",");
1271    let mut joins: JoinSet<(String, AckOutcome)> = JoinSet::new();
1272    for peer in &config.peers {
1273        let client = config.client.clone();
1274        let url = peer.sync_push_url.clone();
1275        let peer_id = peer.id.clone();
1276        let payload = body.clone();
1277        let target = target_id.clone();
1278        joins.spawn(async move {
1279            let outcome = post_and_classify(&client, &url, &payload, &target, Some(&target)).await;
1280            (peer_id, outcome)
1281        });
1282    }
1283
1284    let deadline = now + config.policy.ack_timeout;
1285    loop {
1286        let remaining = deadline.saturating_duration_since(Instant::now());
1287        if remaining.is_zero() {
1288            break;
1289        }
1290        match tokio::time::timeout(remaining, joins.join_next()).await {
1291            Ok(Some(Ok((peer_id, AckOutcome::Ack)))) => {
1292                tracker.lock().await.record_peer_ack(peer_id);
1293            }
1294            Ok(Some(Ok((peer_id, AckOutcome::IdDrift)))) => {
1295                tracker.lock().await.record_id_drift(peer_id);
1296            }
1297            Ok(Some(Ok((peer_id, AckOutcome::Fail(reason))))) => {
1298                tracing::warn!(
1299                    "federation: namespace_meta_clear peer {peer_id} failed for [{}]: {reason}",
1300                    target_id
1301                );
1302            }
1303            Ok(Some(Err(e))) => {
1304                tracing::warn!("federation: namespace_meta_clear peer join error: {e}");
1305            }
1306            Ok(None) | Err(_) => break,
1307        }
1308        if tracker.lock().await.is_quorum_met(Instant::now()) {
1309            break;
1310        }
1311    }
1312
1313    if !joins.is_empty() {
1314        tokio::spawn(async move {
1315            while let Some(res) = joins.join_next().await {
1316                if let Ok((peer_id, AckOutcome::Fail(reason))) = res {
1317                    tracing::debug!(
1318                        "federation: post-quorum namespace_meta_clear peer {peer_id} did not ack: {reason}"
1319                    );
1320                }
1321            }
1322        });
1323    }
1324
1325    let tracker = Arc::try_unwrap(tracker)
1326        .map_err(|_| QuorumError::LocalWriteFailed {
1327            detail: "tracker arc still referenced at finalise".to_string(),
1328        })?
1329        .into_inner();
1330    Ok(tracker)
1331}
1332
1333/// Serialised 503 payload for failed quorum writes.
1334#[derive(Debug, Serialize, Deserialize)]
1335pub struct QuorumNotMetPayload {
1336    pub error: &'static str,
1337    pub got: usize,
1338    pub needed: usize,
1339    pub reason: String,
1340}
1341
1342impl QuorumNotMetPayload {
1343    #[must_use]
1344    pub fn from_err(err: &QuorumError) -> Self {
1345        match err {
1346            QuorumError::QuorumNotMet {
1347                got,
1348                needed,
1349                reason,
1350            } => Self {
1351                error: "quorum_not_met",
1352                got: *got,
1353                needed: *needed,
1354                // InFlight shouldn't surface in the HTTP payload — the
1355                // broadcast loop waits until the deadline before
1356                // calling finalise(). If a caller somehow gets it here,
1357                // we map to "timeout" for the operator-facing 503 so
1358                // we don't leak a transient internal state as a fourth
1359                // public string.
1360                reason: match reason {
1361                    QuorumFailureReason::Unreachable => "unreachable".to_string(),
1362                    QuorumFailureReason::Timeout | QuorumFailureReason::InFlight => {
1363                        "timeout".to_string()
1364                    }
1365                    QuorumFailureReason::IdDrift => "id_drift".to_string(),
1366                },
1367            },
1368            QuorumError::InvalidPolicy { detail } => Self {
1369                error: "quorum_not_met",
1370                got: 0,
1371                needed: 0,
1372                reason: format!("invalid_policy:{detail}"),
1373            },
1374            QuorumError::LocalWriteFailed { detail } => Self {
1375                error: "quorum_not_met",
1376                got: 0,
1377                needed: 0,
1378                reason: format!("local_write_failed:{detail}"),
1379            },
1380        }
1381    }
1382}
1383
1384/// v0.6.0.1 (#320) — post-partition catchup poller.
1385///
1386/// Previously a node rejoining the mesh after SIGSTOP / network blip / restart
1387/// would only receive NEW writes that arrived AFTER resume; anything the
1388/// other peers wrote during the outage stayed on those peers. r14 scenario-14
1389/// observed this as node-3 seeing 2/20 writes post-SIGCONT.
1390///
1391/// This loop periodically calls `GET /api/v1/sync/since?peer=<local>` against
1392/// each configured peer, applying returned memories via `insert_if_newer`.
1393/// The `since` value is the receiver-side vector clock entry for that peer,
1394/// so we never re-pull already-applied rows. First catchup after a restart
1395/// runs with `since=None`, pulling a capped snapshot (limit=500).
1396///
1397/// Interval is operator-tunable via `--catchup-interval-secs`. 0 disables.
1398/// The loop is a best-effort background task: errors are logged but never
1399/// propagated. In the happy path a partitioned node converges within one
1400/// interval after resume.
1401///
1402/// This is deliberately NOT a substitute for the synchronous quorum-write
1403/// path — it's a safety net for the tail. Normal writes still fan out via
1404/// `broadcast_store_quorum`; catchup only fires for rows that DIDN'T land
1405/// during the original write deadline.
1406pub fn spawn_catchup_loop(
1407    config: FederationConfig,
1408    db: crate::handlers::Db,
1409    interval: Duration,
1410) -> tokio::task::JoinHandle<()> {
1411    tokio::spawn(async move {
1412        // Small upfront delay so the first catchup doesn't fire before the
1413        // HTTP server has bound — avoids spurious "connection refused" on
1414        // node-1 during rolling start of a fresh cluster.
1415        tokio::time::sleep(Duration::from_secs(5)).await;
1416        loop {
1417            catchup_once(&config, &db).await;
1418            tokio::time::sleep(interval).await;
1419        }
1420    })
1421}
1422
1423async fn catchup_once(config: &FederationConfig, db: &crate::handlers::Db) {
1424    let local_id = config.sender_agent_id.clone();
1425    for peer in &config.peers {
1426        // Rebuild the peer's base URL from sync_push_url to get the
1427        // /api/v1/sync/since endpoint without recomputing peer config.
1428        let base = peer
1429            .sync_push_url
1430            .trim_end_matches("/api/v1/sync/push")
1431            .to_string();
1432
1433        // Load our local vector-clock entry for this peer so we only pull
1434        // the delta. First-time-ever runs with no prior clock pull a full
1435        // snapshot (capped below by ?limit=500 on the peer side).
1436        let since_opt: Option<String> = {
1437            let lock = db.lock().await;
1438            match crate::db::sync_state_load(&lock.0, &local_id) {
1439                Ok(clock) => clock.entries.get(&peer.id).cloned(),
1440                Err(_) => None,
1441            }
1442        };
1443
1444        let url = match since_opt.as_deref() {
1445            Some(s) => format!(
1446                "{base}/api/v1/sync/since?since={}&peer={local_id}",
1447                urlencoding_encode(s)
1448            ),
1449            None => format!("{base}/api/v1/sync/since?peer={local_id}"),
1450        };
1451
1452        let resp = match config.client.get(&url).send().await {
1453            Ok(r) if r.status().is_success() => r,
1454            Ok(r) => {
1455                tracing::debug!(
1456                    "catchup: peer {} returned HTTP {} — skipping this tick",
1457                    peer.id,
1458                    r.status()
1459                );
1460                continue;
1461            }
1462            Err(e) => {
1463                tracing::debug!("catchup: peer {} unreachable: {e}", peer.id);
1464                continue;
1465            }
1466        };
1467
1468        let body: serde_json::Value = match resp.json().await {
1469            Ok(v) => v,
1470            Err(e) => {
1471                tracing::warn!("catchup: peer {} returned unparseable body: {e}", peer.id);
1472                continue;
1473            }
1474        };
1475
1476        let memories = match body.get("memories").and_then(|v| v.as_array()) {
1477            Some(arr) => arr.clone(),
1478            None => continue,
1479        };
1480
1481        if memories.is_empty() {
1482            continue;
1483        }
1484
1485        let mut applied = 0usize;
1486        let mut latest_ts: Option<String> = None;
1487        {
1488            let lock = db.lock().await;
1489            for raw in &memories {
1490                let mem: crate::models::Memory = match serde_json::from_value(raw.clone()) {
1491                    Ok(m) => m,
1492                    Err(e) => {
1493                        tracing::warn!("catchup: unparseable memory from peer {}: {e}", peer.id);
1494                        continue;
1495                    }
1496                };
1497                if crate::validate::validate_memory(&mem).is_err() {
1498                    continue;
1499                }
1500                if latest_ts
1501                    .as_deref()
1502                    .is_none_or(|cur| mem.updated_at.as_str() > cur)
1503                {
1504                    latest_ts = Some(mem.updated_at.clone());
1505                }
1506                if crate::db::insert_if_newer(&lock.0, &mem).is_ok() {
1507                    applied += 1;
1508                }
1509            }
1510            if let Some(ts) = latest_ts.as_deref()
1511                && let Err(e) = crate::db::sync_state_observe(&lock.0, &local_id, &peer.id, ts)
1512            {
1513                tracing::warn!("catchup: sync_state_observe failed for {}: {e}", peer.id);
1514            }
1515        }
1516
1517        if applied > 0 {
1518            tracing::info!(
1519                "catchup: applied {applied} memories from peer {} (since={})",
1520                peer.id,
1521                since_opt.as_deref().unwrap_or("<full-snapshot>"),
1522            );
1523        }
1524    }
1525}
1526
// Small hand-rolled RFC 3986 percent-encoder used for the `since` query
// value. ASCII letters, digits and `-`, `_`, `.`, `~` are copied through
// unchanged; every other byte becomes `%XX` with uppercase hex digits.
// Kept local on purpose so that no url-encoding crate is needed for such a
// short input.
fn urlencoding_encode(s: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(s.len() + 6);
    for byte in s.bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            // High nibble first, then low nibble, e.g. b':' (0x3A) → "%3A".
            encoded.push('%');
            encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
1545
1546#[cfg(test)]
1547mod tests {
1548    use super::*;
1549    use axum::Router;
1550    use axum::extract::Json as AxumJson;
1551    use axum::http::StatusCode;
1552    use axum::routing::post;
1553    use std::sync::atomic::{AtomicUsize, Ordering};
1554    use tokio::net::TcpListener;
1555
1556    fn sample_memory() -> Memory {
1557        let now = chrono::Utc::now().to_rfc3339();
1558        Memory {
1559            id: "fed-test".to_string(),
1560            tier: crate::models::Tier::Mid,
1561            namespace: "app".to_string(),
1562            title: "hello".to_string(),
1563            content: "world for federation test".to_string(),
1564            tags: vec!["t".to_string()],
1565            priority: 5,
1566            confidence: 1.0,
1567            source: "test".to_string(),
1568            access_count: 0,
1569            created_at: now.clone(),
1570            updated_at: now,
1571            last_accessed_at: None,
1572            expires_at: None,
1573            metadata: serde_json::json!({"agent_id":"ai:test"}),
1574        }
1575    }
1576
1577    #[derive(Clone, Copy)]
1578    enum MockBehaviour {
1579        Ack,
1580        Fail,
1581        Hang,
1582        /// Return HTTP 500 on the first `fail_until` calls, then 200.
1583        /// Used to exercise the S40 retry-once path.
1584        FailThenAck {
1585            fail_until: usize,
1586        },
1587    }
1588
1589    #[derive(Clone)]
1590    struct MockState {
1591        behaviour: MockBehaviour,
1592        count: Arc<AtomicUsize>,
1593    }
1594
1595    async fn mock_handler(
1596        axum::extract::State(state): axum::extract::State<MockState>,
1597        AxumJson(_body): AxumJson<serde_json::Value>,
1598    ) -> (StatusCode, AxumJson<serde_json::Value>) {
1599        let call = state.count.fetch_add(1, Ordering::Relaxed) + 1;
1600        match state.behaviour {
1601            MockBehaviour::Ack => (
1602                StatusCode::OK,
1603                AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
1604            ),
1605            MockBehaviour::Fail => (
1606                StatusCode::INTERNAL_SERVER_ERROR,
1607                AxumJson(serde_json::json!({"error":"stub failure"})),
1608            ),
1609            MockBehaviour::Hang => {
1610                tokio::time::sleep(Duration::from_secs(10)).await;
1611                (StatusCode::OK, AxumJson(serde_json::json!({"applied":1})))
1612            }
1613            MockBehaviour::FailThenAck { fail_until } => {
1614                if call <= fail_until {
1615                    (
1616                        StatusCode::INTERNAL_SERVER_ERROR,
1617                        AxumJson(serde_json::json!({"error":"stub transient failure"})),
1618                    )
1619                } else {
1620                    (
1621                        StatusCode::OK,
1622                        AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
1623                    )
1624                }
1625            }
1626        }
1627    }
1628
1629    async fn spawn_mock_peer(behaviour: MockBehaviour) -> (String, Arc<AtomicUsize>) {
1630        let call_count = Arc::new(AtomicUsize::new(0));
1631        let state = MockState {
1632            behaviour,
1633            count: call_count.clone(),
1634        };
1635        let app = Router::new()
1636            .route("/api/v1/sync/push", post(mock_handler))
1637            .with_state(state);
1638        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1639        let addr = listener.local_addr().unwrap();
1640        tokio::spawn(async move {
1641            axum::serve(listener, app).await.ok();
1642        });
1643        (format!("http://{addr}"), call_count)
1644    }
1645
1646    fn build_config(peers: Vec<String>, w: usize, timeout_ms: u64) -> FederationConfig {
1647        let client = reqwest::Client::builder()
1648            .timeout(Duration::from_millis(timeout_ms))
1649            .build()
1650            .unwrap();
1651        let n = 1 + peers.len();
1652        FederationConfig {
1653            policy: QuorumPolicy::new(
1654                n,
1655                w,
1656                Duration::from_millis(timeout_ms),
1657                Duration::from_secs(30),
1658            )
1659            .unwrap(),
1660            peers: peers
1661                .into_iter()
1662                .enumerate()
1663                .map(|(i, url)| PeerEndpoint {
1664                    id: format!("peer-{i}:{url}"),
1665                    sync_push_url: format!("{url}/api/v1/sync/push"),
1666                })
1667                .collect(),
1668            client,
1669            sender_agent_id: "ai:fed-test".to_string(),
1670        }
1671    }
1672
1673    #[tokio::test]
1674    async fn happy_path_two_peers_quorum_met() {
1675        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1676        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1677        let cfg = build_config(vec![url1, url2], 2, 2000);
1678        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1679            .await
1680            .unwrap();
1681        let result = finalise_quorum(&tracker);
1682        assert!(result.is_ok(), "expected quorum met, got {result:?}");
1683        // At least one peer called before quorum returned. With v0.6.0's
1684        // post-quorum detach, additional fan-outs complete in the
1685        // background and may or may not have landed by the time this
1686        // assertion runs — the synchronous contract is only "≥ 1 peer
1687        // acked before return".
1688        let calls = count1.load(Ordering::Relaxed) + count2.load(Ordering::Relaxed);
1689        assert!(calls >= 1);
1690    }
1691
1692    #[tokio::test]
1693    async fn post_quorum_fanout_reaches_all_peers() {
1694        // Contract: once quorum is met, the background detach must still
1695        // deliver the write to every peer. Ship-gate run 14 uncovered the
1696        // prior abort-on-quorum regression that left one peer permanently
1697        // missing ~50% of burst writes under W=2/N=3.
1698        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1699        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1700        let cfg = build_config(vec![url1, url2], 2, 2000);
1701        let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
1702            .await
1703            .unwrap();
1704        // Give the detached fanout a slow path to complete. Mock handlers
1705        // are in-process, so 200ms is comfortable without being flaky.
1706        for _ in 0..20 {
1707            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1708                break;
1709            }
1710            tokio::time::sleep(Duration::from_millis(10)).await;
1711        }
1712        assert_eq!(
1713            count1.load(Ordering::Relaxed),
1714            1,
1715            "peer-1 must receive the write post-quorum"
1716        );
1717        assert_eq!(
1718            count2.load(Ordering::Relaxed),
1719            1,
1720            "peer-2 must receive the write post-quorum"
1721        );
1722    }
1723
1724    #[tokio::test]
1725    async fn transient_peer_failure_is_retried_once() {
1726        // S40 regression guard: a transient 5xx from a peer on the
1727        // first POST must be retried exactly once. Previously the post
1728        // was fire-and-forget — one peer that 5xx'd a single bulk row
1729        // left that row permanently missing on that peer (v3r26
1730        // hermes-tls scenario-40: node-2 saw 499/500).
1731        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1732        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
1733        let cfg = build_config(vec![url1, url2], 2, 2000);
1734        let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
1735            .await
1736            .unwrap();
1737        // Retry backoff is 250ms + retry round-trip; poll up to 2s.
1738        for _ in 0..200 {
1739            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
1740                break;
1741            }
1742            tokio::time::sleep(Duration::from_millis(10)).await;
1743        }
1744        assert_eq!(
1745            count1.load(Ordering::Relaxed),
1746            1,
1747            "peer-1 acked first time, no retry"
1748        );
1749        assert_eq!(
1750            count2.load(Ordering::Relaxed),
1751            2,
1752            "peer-2 must see exactly two attempts (first fail, retry ack)"
1753        );
1754    }
1755
1756    #[tokio::test]
1757    async fn persistent_peer_failure_stops_after_one_retry() {
1758        // Retry policy is exactly one retry — a peer that stays down
1759        // must NOT be called more than twice per row (no infinite
1760        // backoff, no thundering herd on a wedged peer).
1761        let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
1762        let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
1763        let cfg = build_config(vec![url1, url2], 2, 2000);
1764        let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
1765            .await
1766            .unwrap();
1767        // Wait long enough that any further retries would have fired.
1768        tokio::time::sleep(Duration::from_millis(800)).await;
1769        assert_eq!(
1770            count2.load(Ordering::Relaxed),
1771            2,
1772            "persistently-failing peer must be called exactly twice (1 + 1 retry)"
1773        );
1774    }
1775
1776    #[tokio::test]
1777    async fn bulk_catchup_push_hits_every_peer_once() {
1778        // S40 catchup: verify the terminal batch POST reaches every
1779        // peer exactly once, with the full row set in a single request.
1780        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1781        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1782        let cfg = build_config(vec![url1, url2], 2, 2000);
1783        let mems = vec![sample_memory(), sample_memory(), sample_memory()];
1784        let errors = bulk_catchup_push(&cfg, &mems).await;
1785        assert!(
1786            errors.is_empty(),
1787            "catchup must succeed on healthy peers, got {errors:?}"
1788        );
1789        assert_eq!(
1790            count1.load(Ordering::Relaxed),
1791            1,
1792            "peer-1 must receive exactly one catchup batch"
1793        );
1794        assert_eq!(
1795            count2.load(Ordering::Relaxed),
1796            1,
1797            "peer-2 must receive exactly one catchup batch"
1798        );
1799    }
1800
1801    #[tokio::test]
1802    async fn bulk_catchup_push_reports_peer_failures() {
1803        // Catchup errors must be surfaced to the caller for logging —
1804        // quorum was already met upstream, so the HTTP contract holds,
1805        // but the leader should record which peers fell behind.
1806        let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
1807        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1808        let cfg = build_config(vec![url1, url2], 2, 2000);
1809        let mems = vec![sample_memory()];
1810        let errors = bulk_catchup_push(&cfg, &mems).await;
1811        assert_eq!(errors.len(), 1, "exactly one peer failed the catchup");
1812        assert!(
1813            errors[0].1.contains("500") || errors[0].1.contains("http"),
1814            "error must name the HTTP failure, got {:?}",
1815            errors[0]
1816        );
1817    }
1818
1819    #[tokio::test]
1820    async fn bulk_catchup_push_empty_inputs_are_noop() {
1821        // No rows + no peers → no work, no panics, no POSTs.
1822        let cfg = build_config(vec![], 1, 500);
1823        assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
1824
1825        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1826        let cfg = build_config(vec![url1], 1, 500);
1827        assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
1828        assert_eq!(
1829            count1.load(Ordering::Relaxed),
1830            0,
1831            "no catchup POST must fire when the row set is empty"
1832        );
1833    }
1834
1835    #[tokio::test]
1836    async fn partition_minority_fails_quorum() {
1837        // N = 3, W = 3. Two peers fail → cannot meet quorum.
1838        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1839        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1840        let cfg = build_config(vec![url1, url2], 3, 500);
1841        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1842            .await
1843            .unwrap();
1844        let err = finalise_quorum(&tracker).unwrap_err();
1845        match err {
1846            QuorumError::QuorumNotMet { got, needed, .. } => {
1847                assert_eq!(got, 1, "local commit only");
1848                assert_eq!(needed, 3);
1849            }
1850            other => panic!("expected QuorumNotMet, got {other:?}"),
1851        }
1852    }
1853
1854    #[tokio::test]
1855    async fn timeout_on_hanging_peer_classified_timeout() {
1856        // N = 2, W = 2. One hanging peer → timeout before ack.
1857        let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
1858        let cfg = build_config(vec![url1], 2, 200);
1859        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1860            .await
1861            .unwrap();
1862        // Ensure the deadline passed.
1863        tokio::time::sleep(Duration::from_millis(50)).await;
1864        let err = finalise_quorum(&tracker).unwrap_err();
1865        match err {
1866            QuorumError::QuorumNotMet { reason, .. } => {
1867                assert!(
1868                    matches!(
1869                        reason,
1870                        QuorumFailureReason::Timeout | QuorumFailureReason::Unreachable
1871                    ),
1872                    "unexpected reason {reason:?}"
1873                );
1874            }
1875            other => panic!("expected QuorumNotMet, got {other:?}"),
1876        }
1877    }
1878
1879    #[tokio::test]
1880    async fn majority_quorum_tolerates_one_peer_down() {
1881        // N = 3, W = 2 (majority). One fails, one acks → quorum met.
1882        let (url_up, _) = spawn_mock_peer(MockBehaviour::Ack).await;
1883        let (url_down, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1884        let cfg = build_config(vec![url_up, url_down], 2, 2000);
1885        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1886            .await
1887            .unwrap();
1888        let result = finalise_quorum(&tracker);
1889        assert!(
1890            result.is_ok(),
1891            "majority should tolerate 1 peer down, got {result:?}"
1892        );
1893    }
1894
1895    #[test]
1896    fn config_build_disabled_when_w_zero() {
1897        let cfg = FederationConfig::build(
1898            0,
1899            &["http://example.com".to_string()],
1900            Duration::from_millis(500),
1901            None,
1902            None,
1903            None,
1904            "ai:test".to_string(),
1905        )
1906        .unwrap();
1907        assert!(cfg.is_none());
1908    }
1909
1910    #[test]
1911    fn config_build_disabled_when_peers_empty() {
1912        let cfg = FederationConfig::build(
1913            2,
1914            &[],
1915            Duration::from_millis(500),
1916            None,
1917            None,
1918            None,
1919            "ai:test".to_string(),
1920        )
1921        .unwrap();
1922        assert!(cfg.is_none());
1923    }
1924
1925    #[test]
1926    fn quorum_not_met_payload_from_err() {
1927        let err = QuorumError::QuorumNotMet {
1928            got: 1,
1929            needed: 3,
1930            reason: QuorumFailureReason::Timeout,
1931        };
1932        let payload = QuorumNotMetPayload::from_err(&err);
1933        assert_eq!(payload.error, "quorum_not_met");
1934        assert_eq!(payload.got, 1);
1935        assert_eq!(payload.needed, 3);
1936        assert_eq!(payload.reason, "timeout");
1937    }
1938
1939    // --- broadcast_archive_quorum tests (S29) ---
1940
1941    #[tokio::test]
1942    async fn archive_quorum_two_peers_ack_meets_quorum() {
1943        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1944        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1945        let cfg = build_config(vec![url1, url2], 2, 2000);
1946        let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
1947        let result = finalise_quorum(&tracker);
1948        assert!(result.is_ok(), "expected quorum met, got {result:?}");
1949        // Let detached fanout complete so both peers are observed.
1950        for _ in 0..20 {
1951            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1952                break;
1953            }
1954            tokio::time::sleep(Duration::from_millis(10)).await;
1955        }
1956        assert_eq!(count1.load(Ordering::Relaxed), 1);
1957        assert_eq!(count2.load(Ordering::Relaxed), 1);
1958    }
1959
1960    #[tokio::test]
1961    async fn archive_quorum_partition_minority_fails() {
1962        // N = 3, W = 3. Two peers fail → archive quorum cannot be met.
1963        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1964        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1965        let cfg = build_config(vec![url1, url2], 3, 500);
1966        let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
1967        let err = finalise_quorum(&tracker).unwrap_err();
1968        match err {
1969            QuorumError::QuorumNotMet { got, needed, .. } => {
1970                assert_eq!(got, 1);
1971                assert_eq!(needed, 3);
1972            }
1973            other => panic!("expected QuorumNotMet, got {other:?}"),
1974        }
1975    }
1976
1977    // --- broadcast_delete_quorum tests (Wave 3) ---
1978    //
1979    // The delete fanout mirrors the store fanout but rides a `deletions: [id]`
1980    // payload instead of memory bodies. These two cases hit the entire
1981    // function body — happy ack loop, deadline check, post-quorum detach,
1982    // tracker unwrap.
1983
1984    #[tokio::test]
1985    async fn delete_quorum_two_peers_ack_meets_quorum() {
1986        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1987        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1988        let cfg = build_config(vec![url1, url2], 2, 2000);
1989        let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
1990        assert!(finalise_quorum(&tracker).is_ok());
1991        for _ in 0..20 {
1992            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1993                break;
1994            }
1995            tokio::time::sleep(Duration::from_millis(10)).await;
1996        }
1997        assert_eq!(count1.load(Ordering::Relaxed), 1);
1998        assert_eq!(count2.load(Ordering::Relaxed), 1);
1999    }
2000
2001    #[tokio::test]
2002    async fn delete_quorum_partition_minority_fails() {
2003        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2004        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2005        let cfg = build_config(vec![url1, url2], 3, 500);
2006        let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
2007        let err = finalise_quorum(&tracker).unwrap_err();
2008        match err {
2009            QuorumError::QuorumNotMet { got, needed, .. } => {
2010                assert_eq!(got, 1);
2011                assert_eq!(needed, 3);
2012            }
2013            other => panic!("expected QuorumNotMet, got {other:?}"),
2014        }
2015    }
2016
2017    // --- broadcast_restore_quorum tests (Wave 3) ---
2018
2019    #[tokio::test]
2020    async fn restore_quorum_two_peers_ack_meets_quorum() {
2021        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2022        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2023        let cfg = build_config(vec![url1, url2], 2, 2000);
2024        let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
2025        assert!(finalise_quorum(&tracker).is_ok());
2026        for _ in 0..20 {
2027            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2028                break;
2029            }
2030            tokio::time::sleep(Duration::from_millis(10)).await;
2031        }
2032        assert_eq!(count1.load(Ordering::Relaxed), 1);
2033        assert_eq!(count2.load(Ordering::Relaxed), 1);
2034    }
2035
2036    #[tokio::test]
2037    async fn restore_quorum_partition_minority_fails() {
2038        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2039        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2040        let cfg = build_config(vec![url1, url2], 3, 500);
2041        let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
2042        let err = finalise_quorum(&tracker).unwrap_err();
2043        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2044    }
2045
2046    // --- broadcast_link_quorum tests (Wave 3) ---
2047
2048    fn sample_link() -> MemoryLink {
2049        MemoryLink {
2050            source_id: "mem-a".to_string(),
2051            target_id: "mem-b".to_string(),
2052            relation: "related_to".to_string(),
2053            created_at: chrono::Utc::now().to_rfc3339(),
2054        }
2055    }
2056
2057    #[tokio::test]
2058    async fn link_quorum_two_peers_ack_meets_quorum() {
2059        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2060        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2061        let cfg = build_config(vec![url1, url2], 2, 2000);
2062        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2063        assert!(finalise_quorum(&tracker).is_ok());
2064        for _ in 0..20 {
2065            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2066                break;
2067            }
2068            tokio::time::sleep(Duration::from_millis(10)).await;
2069        }
2070        assert_eq!(count1.load(Ordering::Relaxed), 1);
2071        assert_eq!(count2.load(Ordering::Relaxed), 1);
2072    }
2073
2074    #[tokio::test]
2075    async fn link_quorum_partition_minority_fails() {
2076        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2077        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2078        let cfg = build_config(vec![url1, url2], 3, 500);
2079        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2080        let err = finalise_quorum(&tracker).unwrap_err();
2081        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2082    }
2083
2084    // --- broadcast_consolidate_quorum tests (Wave 3) ---
2085
2086    #[tokio::test]
2087    async fn consolidate_quorum_two_peers_ack_meets_quorum() {
2088        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2089        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2090        let cfg = build_config(vec![url1, url2], 2, 2000);
2091        let new_mem = sample_memory();
2092        let sources = vec!["src-a".to_string(), "src-b".to_string()];
2093        let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &sources)
2094            .await
2095            .unwrap();
2096        assert!(finalise_quorum(&tracker).is_ok());
2097        for _ in 0..20 {
2098            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2099                break;
2100            }
2101            tokio::time::sleep(Duration::from_millis(10)).await;
2102        }
2103        assert_eq!(count1.load(Ordering::Relaxed), 1);
2104        assert_eq!(count2.load(Ordering::Relaxed), 1);
2105    }
2106
2107    #[tokio::test]
2108    async fn consolidate_quorum_partition_minority_fails() {
2109        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2110        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2111        let cfg = build_config(vec![url1, url2], 3, 500);
2112        let new_mem = sample_memory();
2113        let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
2114            .await
2115            .unwrap();
2116        let err = finalise_quorum(&tracker).unwrap_err();
2117        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2118    }
2119
2120    // --- broadcast_pending_quorum tests (Wave 3) ---
2121
2122    fn sample_pending() -> PendingAction {
2123        PendingAction {
2124            id: "pa-1".to_string(),
2125            action_type: "delete".to_string(),
2126            memory_id: Some("mem-x".to_string()),
2127            namespace: "app".to_string(),
2128            payload: serde_json::json!({}),
2129            requested_by: "ai:test".to_string(),
2130            requested_at: chrono::Utc::now().to_rfc3339(),
2131            status: "pending".to_string(),
2132            decided_by: None,
2133            decided_at: None,
2134            approvals: vec![],
2135        }
2136    }
2137
2138    #[tokio::test]
2139    async fn pending_quorum_two_peers_ack_meets_quorum() {
2140        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2141        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2142        let cfg = build_config(vec![url1, url2], 2, 2000);
2143        let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2144            .await
2145            .unwrap();
2146        assert!(finalise_quorum(&tracker).is_ok());
2147        for _ in 0..20 {
2148            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2149                break;
2150            }
2151            tokio::time::sleep(Duration::from_millis(10)).await;
2152        }
2153        assert_eq!(count1.load(Ordering::Relaxed), 1);
2154        assert_eq!(count2.load(Ordering::Relaxed), 1);
2155    }
2156
2157    #[tokio::test]
2158    async fn pending_quorum_partition_minority_fails() {
2159        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2160        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2161        let cfg = build_config(vec![url1, url2], 3, 500);
2162        let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2163            .await
2164            .unwrap();
2165        let err = finalise_quorum(&tracker).unwrap_err();
2166        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2167    }
2168
2169    // --- broadcast_pending_decision_quorum tests (Wave 3) ---
2170
2171    fn sample_decision() -> PendingDecision {
2172        PendingDecision {
2173            id: "pa-1".to_string(),
2174            approved: true,
2175            decider: "ai:approver".to_string(),
2176        }
2177    }
2178
2179    #[tokio::test]
2180    async fn pending_decision_quorum_two_peers_ack_meets_quorum() {
2181        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2182        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2183        let cfg = build_config(vec![url1, url2], 2, 2000);
2184        let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2185            .await
2186            .unwrap();
2187        assert!(finalise_quorum(&tracker).is_ok());
2188        for _ in 0..20 {
2189            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2190                break;
2191            }
2192            tokio::time::sleep(Duration::from_millis(10)).await;
2193        }
2194        assert_eq!(count1.load(Ordering::Relaxed), 1);
2195        assert_eq!(count2.load(Ordering::Relaxed), 1);
2196    }
2197
2198    #[tokio::test]
2199    async fn pending_decision_quorum_partition_minority_fails() {
2200        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2201        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2202        let cfg = build_config(vec![url1, url2], 3, 500);
2203        let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2204            .await
2205            .unwrap();
2206        let err = finalise_quorum(&tracker).unwrap_err();
2207        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2208    }
2209
2210    // --- broadcast_namespace_meta_quorum tests (Wave 3) ---
2211
2212    fn sample_namespace_meta() -> NamespaceMetaEntry {
2213        NamespaceMetaEntry {
2214            namespace: "app/team".to_string(),
2215            standard_id: "mem-std-1".to_string(),
2216            parent_namespace: Some("app".to_string()),
2217            updated_at: chrono::Utc::now().to_rfc3339(),
2218        }
2219    }
2220
2221    #[tokio::test]
2222    async fn namespace_meta_quorum_two_peers_ack_meets_quorum() {
2223        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2224        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2225        let cfg = build_config(vec![url1, url2], 2, 2000);
2226        let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
2227            .await
2228            .unwrap();
2229        assert!(finalise_quorum(&tracker).is_ok());
2230        for _ in 0..20 {
2231            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2232                break;
2233            }
2234            tokio::time::sleep(Duration::from_millis(10)).await;
2235        }
2236        assert_eq!(count1.load(Ordering::Relaxed), 1);
2237        assert_eq!(count2.load(Ordering::Relaxed), 1);
2238    }
2239
2240    #[tokio::test]
2241    async fn namespace_meta_quorum_partition_minority_fails() {
2242        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2243        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2244        let cfg = build_config(vec![url1, url2], 3, 500);
2245        let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
2246            .await
2247            .unwrap();
2248        let err = finalise_quorum(&tracker).unwrap_err();
2249        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2250    }
2251
2252    // --- broadcast_namespace_meta_clear_quorum tests (Wave 3) ---
2253
2254    #[tokio::test]
2255    async fn namespace_meta_clear_quorum_two_peers_ack_meets_quorum() {
2256        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2257        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2258        let cfg = build_config(vec![url1, url2], 2, 2000);
2259        let namespaces = vec!["app/team".to_string(), "app/other".to_string()];
2260        let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
2261            .await
2262            .unwrap();
2263        assert!(finalise_quorum(&tracker).is_ok());
2264        for _ in 0..20 {
2265            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
2266                break;
2267            }
2268            tokio::time::sleep(Duration::from_millis(10)).await;
2269        }
2270        assert_eq!(count1.load(Ordering::Relaxed), 1);
2271        assert_eq!(count2.load(Ordering::Relaxed), 1);
2272    }
2273
2274    #[tokio::test]
2275    async fn namespace_meta_clear_quorum_partition_minority_fails() {
2276        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2277        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2278        let cfg = build_config(vec![url1, url2], 3, 500);
2279        let namespaces = vec!["app/team".to_string()];
2280        let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
2281            .await
2282            .unwrap();
2283        let err = finalise_quorum(&tracker).unwrap_err();
2284        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2285    }
2286
2287    // --- QuorumNotMetPayload::from_err branch coverage (Wave 3) ---
2288    //
2289    // The non-Timeout reasons (Unreachable, IdDrift, InFlight) and the
2290    // non-QuorumNotMet variants (InvalidPolicy, LocalWriteFailed) were
2291    // never exercised — `from_err` had only the Timeout path covered.
2292
2293    #[test]
2294    fn quorum_not_met_payload_unreachable_reason() {
2295        let err = QuorumError::QuorumNotMet {
2296            got: 1,
2297            needed: 2,
2298            reason: QuorumFailureReason::Unreachable,
2299        };
2300        let payload = QuorumNotMetPayload::from_err(&err);
2301        assert_eq!(payload.reason, "unreachable");
2302    }
2303
2304    #[test]
2305    fn quorum_not_met_payload_id_drift_reason() {
2306        let err = QuorumError::QuorumNotMet {
2307            got: 1,
2308            needed: 2,
2309            reason: QuorumFailureReason::IdDrift,
2310        };
2311        let payload = QuorumNotMetPayload::from_err(&err);
2312        assert_eq!(payload.reason, "id_drift");
2313    }
2314
2315    #[test]
2316    fn quorum_not_met_payload_in_flight_reason_maps_to_timeout() {
2317        // InFlight is a transient internal state; HTTP payload maps it to
2318        // "timeout" rather than leaking a fourth public reason string.
2319        let err = QuorumError::QuorumNotMet {
2320            got: 1,
2321            needed: 2,
2322            reason: QuorumFailureReason::InFlight,
2323        };
2324        let payload = QuorumNotMetPayload::from_err(&err);
2325        assert_eq!(payload.reason, "timeout");
2326    }
2327
2328    #[test]
2329    fn quorum_not_met_payload_invalid_policy_branch() {
2330        let err = QuorumError::InvalidPolicy {
2331            detail: "bad-thing".to_string(),
2332        };
2333        let payload = QuorumNotMetPayload::from_err(&err);
2334        assert_eq!(payload.error, "quorum_not_met");
2335        assert_eq!(payload.got, 0);
2336        assert_eq!(payload.needed, 0);
2337        assert!(payload.reason.starts_with("invalid_policy:"));
2338        assert!(payload.reason.contains("bad-thing"));
2339    }
2340
2341    #[test]
2342    fn quorum_not_met_payload_local_write_failed_branch() {
2343        let err = QuorumError::LocalWriteFailed {
2344            detail: "disk-full".to_string(),
2345        };
2346        let payload = QuorumNotMetPayload::from_err(&err);
2347        assert_eq!(payload.error, "quorum_not_met");
2348        assert!(payload.reason.starts_with("local_write_failed:"));
2349        assert!(payload.reason.contains("disk-full"));
2350    }
2351
2352    // --- FederationConfig::build coverage (Wave 3) ---
2353
2354    #[test]
2355    fn config_build_constructs_when_w_and_peers_set() {
2356        let cfg = FederationConfig::build(
2357            2,
2358            &[
2359                "http://peer-a.example/".to_string(),
2360                "http://peer-b.example".to_string(),
2361            ],
2362            Duration::from_millis(500),
2363            None,
2364            None,
2365            None,
2366            "ai:builder".to_string(),
2367        )
2368        .unwrap()
2369        .expect("config should be Some when w>0 and peers nonempty");
2370        assert_eq!(cfg.peer_count(), 2);
2371        assert_eq!(cfg.peers[0].id, "peer-0");
2372        assert_eq!(cfg.peers[1].id, "peer-1");
2373        // Trailing slash is stripped during URL normalization.
2374        assert_eq!(
2375            cfg.peers[0].sync_push_url,
2376            "http://peer-a.example/api/v1/sync/push"
2377        );
2378        assert_eq!(
2379            cfg.peers[1].sync_push_url,
2380            "http://peer-b.example/api/v1/sync/push"
2381        );
2382        assert_eq!(cfg.sender_agent_id, "ai:builder");
2383    }
2384
2385    #[test]
2386    fn config_build_rejects_duplicate_peer_urls() {
2387        let result = FederationConfig::build(
2388            2,
2389            &[
2390                "http://peer.example".to_string(),
2391                "http://peer.example/".to_string(),
2392            ],
2393            Duration::from_millis(500),
2394            None,
2395            None,
2396            None,
2397            "ai:builder".to_string(),
2398        );
2399        let err = match result {
2400            Ok(_) => panic!("expected duplicate-URL rejection"),
2401            Err(e) => e,
2402        };
2403        let msg = format!("{err}");
2404        assert!(
2405            msg.contains("duplicate peer URL"),
2406            "expected duplicate-URL rejection, got {msg:?}"
2407        );
2408    }
2409
2410    #[test]
2411    fn config_build_rejects_missing_ca_cert_path() {
2412        // ca_cert_path supplied but file doesn't exist → read error
2413        let bogus = std::path::PathBuf::from("/definitely/does/not/exist/ca.pem");
2414        let result = FederationConfig::build(
2415            2,
2416            &["http://peer.example".to_string()],
2417            Duration::from_millis(500),
2418            None,
2419            None,
2420            Some(&bogus),
2421            "ai:builder".to_string(),
2422        );
2423        let err = match result {
2424            Ok(_) => panic!("expected ca-cert read error"),
2425            Err(e) => e,
2426        };
2427        let msg = format!("{err}");
2428        assert!(
2429            msg.contains("read --quorum-ca-cert"),
2430            "expected ca-cert read error, got {msg:?}"
2431        );
2432    }
2433
2434    #[test]
2435    fn config_build_rejects_invalid_ca_cert_pem() {
2436        // Write a non-PEM file and confirm parse-side rejection.
2437        let dir = tempfile::tempdir().unwrap();
2438        let bad = dir.path().join("not-a-cert.pem");
2439        std::fs::write(&bad, b"this is not a valid pem certificate").unwrap();
2440        let result = FederationConfig::build(
2441            2,
2442            &["http://peer.example".to_string()],
2443            Duration::from_millis(500),
2444            None,
2445            None,
2446            Some(&bad),
2447            "ai:builder".to_string(),
2448        );
2449        let err = match result {
2450            Ok(_) => panic!("expected ca-cert parse error"),
2451            Err(e) => e,
2452        };
2453        let msg = format!("{err}");
2454        assert!(
2455            msg.contains("parse --quorum-ca-cert") || msg.contains("--quorum-ca-cert"),
2456            "expected ca-cert parse error, got {msg:?}"
2457        );
2458    }
2459
2460    #[test]
2461    fn config_build_rejects_missing_client_cert_path() {
2462        let bogus_cert = std::path::PathBuf::from("/definitely/missing/cert.pem");
2463        let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
2464        let result = FederationConfig::build(
2465            2,
2466            &["http://peer.example".to_string()],
2467            Duration::from_millis(500),
2468            Some(&bogus_cert),
2469            Some(&bogus_key),
2470            None,
2471            "ai:builder".to_string(),
2472        );
2473        let err = match result {
2474            Ok(_) => panic!("expected client-cert read error"),
2475            Err(e) => e,
2476        };
2477        let msg = format!("{err}");
2478        assert!(
2479            msg.contains("read --client-cert"),
2480            "expected client-cert read error, got {msg:?}"
2481        );
2482    }
2483
2484    #[test]
2485    fn peer_count_matches_peer_list() {
2486        let cfg = build_config(
2487            vec![
2488                "http://a.example".to_string(),
2489                "http://b.example".to_string(),
2490                "http://c.example".to_string(),
2491            ],
2492            2,
2493            500,
2494        );
2495        assert_eq!(cfg.peer_count(), 3);
2496    }
2497
2498    // --- urlencoding_encode coverage (Wave 3) ---
2499
2500    #[test]
2501    fn urlencoding_encode_passthrough_safe_chars() {
2502        // ASCII alpha-numeric + RFC3986 unreserved (-_.~) pass through.
2503        let encoded = urlencoding_encode("abcXYZ-09_.~");
2504        assert_eq!(encoded, "abcXYZ-09_.~");
2505    }
2506
2507    #[test]
2508    fn urlencoding_encode_percent_encodes_reserved_and_high_bits() {
2509        // Space, colon, plus, slash all get percent-encoded.
2510        let encoded = urlencoding_encode("2026-04-26T12:00:00+00:00 / x");
2511        assert!(
2512            encoded.contains("%3A"),
2513            "expected colon to be percent-encoded: {encoded}"
2514        );
2515        assert!(
2516            encoded.contains("%2B"),
2517            "expected + to be percent-encoded: {encoded}"
2518        );
2519        assert!(
2520            encoded.contains("%2F"),
2521            "expected / to be percent-encoded: {encoded}"
2522        );
2523        assert!(
2524            encoded.contains("%20"),
2525            "expected space to be percent-encoded: {encoded}"
2526        );
2527        // Hyphen IS in the unreserved set → must NOT be percent-encoded.
2528        assert!(
2529            !encoded.contains("%2D"),
2530            "hyphen must pass through unencoded: {encoded}"
2531        );
2532    }
2533
2534    #[test]
2535    fn urlencoding_encode_empty_string() {
2536        assert_eq!(urlencoding_encode(""), "");
2537    }
2538
2539    // --- broadcast_store_quorum id-drift path (Wave 3) ---
2540    //
2541    // The `IdDrift` arm in post_once + broadcast_store_quorum (lines around
2542    // 243-244 / 362-366) was uncovered. A peer that returns a 200 with an
2543    // `ids` array NOT containing the expected memory id should be classified
2544    // as IdDrift, not Ack.
2545
2546    async fn id_drift_handler(
2547        AxumJson(_body): AxumJson<serde_json::Value>,
2548    ) -> (StatusCode, AxumJson<serde_json::Value>) {
2549        // 200 OK but ids[0] disagrees with the memory the leader sent.
2550        (
2551            StatusCode::OK,
2552            AxumJson(serde_json::json!({"ids": ["some-other-id"], "applied": 1})),
2553        )
2554    }
2555
2556    async fn spawn_id_drift_peer() -> String {
2557        let app = Router::new().route("/api/v1/sync/push", post(id_drift_handler));
2558        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2559        let addr = listener.local_addr().unwrap();
2560        tokio::spawn(async move {
2561            axum::serve(listener, app).await.ok();
2562        });
2563        format!("http://{addr}")
2564    }
2565
2566    #[tokio::test]
2567    async fn id_drift_peer_does_not_count_as_ack() {
2568        // Two peers, both return 200 but with `ids: [other-id]`. Quorum
2569        // can't be met because neither counts as a peer ack — only the
2570        // local commit registers.
2571        let url1 = spawn_id_drift_peer().await;
2572        let url2 = spawn_id_drift_peer().await;
2573        let cfg = build_config(vec![url1, url2], 2, 1000);
2574        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
2575            .await
2576            .unwrap();
2577        let result = finalise_quorum(&tracker);
2578        // With W=2, N=3 (local + 2 peers), local + 0 peer-acks = 1 < 2.
2579        let err = result.unwrap_err();
2580        match err {
2581            QuorumError::QuorumNotMet {
2582                got,
2583                needed,
2584                reason,
2585            } => {
2586                assert_eq!(got, 1, "only local should count");
2587                assert_eq!(needed, 2);
2588                // IdDrift / Timeout / InFlight are all valid here. The
2589                // tracker classifies based on whether ANY peer reported a
2590                // drift (IdDrift), the deadline elapsed first (Timeout),
2591                // or all peers reported but the deadline still hadn't
2592                // passed when finalise was called (InFlight). The
2593                // important invariant is just "peer with drifted ids does
2594                // NOT count toward quorum".
2595                assert!(
2596                    matches!(
2597                        reason,
2598                        QuorumFailureReason::IdDrift
2599                            | QuorumFailureReason::Timeout
2600                            | QuorumFailureReason::InFlight
2601                    ),
2602                    "expected IdDrift / Timeout / InFlight, got {reason:?}"
2603                );
2604            }
2605            other => panic!("expected QuorumNotMet, got {other:?}"),
2606        }
2607    }
2608
2609    // -----------------------------------------------------------------
2610    // W9 (v0.6.3) — catchup_once + spawn_catchup_loop coverage.
2611    //
2612    // Lines 1406-1525 of `federation.rs` were uncovered through W3 because
2613    // they require a mock peer that serves `/api/v1/sync/since`, plus a
2614    // real `Db` to track the sync_state vector clock between ticks. We
2615    // reuse the existing in-process axum mock-peer pattern (see
2616    // `spawn_mock_peer` above) and a `:memory:` rusqlite handle.
2617    // -----------------------------------------------------------------
2618
2619    /// Behaviours the `/api/v1/sync/since` mock peer can take. Each variant
2620    /// is a single canned response shape — we don't need long-running
2621    /// stateful peers for catchup coverage because `catchup_once` is a
2622    /// one-shot function.
2623    #[derive(Clone)]
2624    enum SinceMockBehaviour {
2625        /// Return a 200 with `{ "memories": <list> }` on every call.
2626        ReturnMemories(Vec<Memory>),
2627        /// Return a 500 server error.
2628        Error500,
2629        /// Sleep `delay` then return memories (used for client-timeout test).
2630        Hang(Duration),
2631        /// Return 200 but with a non-JSON body so `resp.json()` fails.
2632        MalformedBody,
2633    }
2634
2635    #[derive(Clone)]
2636    struct SinceMockState {
2637        behaviour: SinceMockBehaviour,
2638        hits: Arc<AtomicUsize>,
2639        last_since: Arc<Mutex<Option<String>>>,
2640        last_peer: Arc<Mutex<Option<String>>>,
2641    }
2642
2643    async fn since_handler(
2644        axum::extract::Query(q): axum::extract::Query<std::collections::HashMap<String, String>>,
2645        axum::extract::State(state): axum::extract::State<SinceMockState>,
2646    ) -> axum::response::Response {
2647        use axum::response::IntoResponse;
2648        state.hits.fetch_add(1, Ordering::Relaxed);
2649        {
2650            let mut s = state.last_since.lock().await;
2651            *s = q.get("since").cloned();
2652        }
2653        {
2654            let mut p = state.last_peer.lock().await;
2655            *p = q.get("peer").cloned();
2656        }
2657        match &state.behaviour {
2658            SinceMockBehaviour::ReturnMemories(mems) => {
2659                let body = serde_json::json!({"memories": mems});
2660                (StatusCode::OK, AxumJson(body)).into_response()
2661            }
2662            SinceMockBehaviour::Error500 => (
2663                StatusCode::INTERNAL_SERVER_ERROR,
2664                AxumJson(serde_json::json!({"error":"oops"})),
2665            )
2666                .into_response(),
2667            SinceMockBehaviour::Hang(d) => {
2668                tokio::time::sleep(*d).await;
2669                (
2670                    StatusCode::OK,
2671                    AxumJson(serde_json::json!({"memories": []})),
2672                )
2673                    .into_response()
2674            }
2675            SinceMockBehaviour::MalformedBody => {
2676                // 200 OK but the body is not JSON — `resp.json::<Value>()`
2677                // will return an Err on the parse step.
2678                (
2679                    [(axum::http::header::CONTENT_TYPE, "application/json")],
2680                    "this is not json {{{",
2681                )
2682                    .into_response()
2683            }
2684        }
2685    }
2686
2687    /// Spawn a `/api/v1/sync/since` mock and return its base URL plus the
2688    /// hit-counter and last-query-param tracker.
2689    async fn spawn_since_peer(
2690        behaviour: SinceMockBehaviour,
2691    ) -> (
2692        String,
2693        Arc<AtomicUsize>,
2694        Arc<Mutex<Option<String>>>,
2695        Arc<Mutex<Option<String>>>,
2696    ) {
2697        let hits = Arc::new(AtomicUsize::new(0));
2698        let last_since = Arc::new(Mutex::new(None));
2699        let last_peer = Arc::new(Mutex::new(None));
2700        let state = SinceMockState {
2701            behaviour,
2702            hits: hits.clone(),
2703            last_since: last_since.clone(),
2704            last_peer: last_peer.clone(),
2705        };
2706        let app = Router::new()
2707            .route("/api/v1/sync/since", axum::routing::get(since_handler))
2708            .with_state(state);
2709        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2710        let addr = listener.local_addr().unwrap();
2711        tokio::spawn(async move {
2712            axum::serve(listener, app).await.ok();
2713        });
2714        (format!("http://{addr}"), hits, last_since, last_peer)
2715    }
2716
2717    /// Build an in-memory `Db` matching `handlers::Db` shape. Catchup only
2718    /// uses `lock().await.0` (the `Connection`), so the path / TTL / pragma
2719    /// fields can be defaults.
2720    fn build_test_db() -> crate::handlers::Db {
2721        let conn = crate::db::open(std::path::Path::new(":memory:")).unwrap();
2722        let path = std::path::PathBuf::from(":memory:");
2723        Arc::new(Mutex::new((
2724            conn,
2725            path,
2726            crate::config::ResolvedTtl::default(),
2727            true,
2728        )))
2729    }
2730
2731    /// Build a `FederationConfig` whose peer's `id` matches the segment we
2732    /// pull from sync_state — `peer-0`. This mirrors the production
2733    /// invariant: the catchup loop keys vector-clock entries by peer.id.
2734    /// We intentionally use the W9-shape (id = "peer-0") here rather than
2735    /// the W3-shape ("peer-0:<url>") because `catchup_once`'s url-trim path
2736    /// depends on the trailing `/api/v1/sync/push` and the id stays opaque
2737    /// either way — but the simpler shape is also closer to production.
2738    fn build_catchup_cfg(peer_url: &str, timeout_ms: u64) -> FederationConfig {
2739        let client = reqwest::Client::builder()
2740            .timeout(Duration::from_millis(timeout_ms))
2741            .build()
2742            .unwrap();
2743        FederationConfig {
2744            policy: QuorumPolicy::new(
2745                2,
2746                1,
2747                Duration::from_millis(timeout_ms),
2748                Duration::from_secs(30),
2749            )
2750            .unwrap(),
2751            peers: vec![PeerEndpoint {
2752                id: "peer-0".to_string(),
2753                sync_push_url: format!("{peer_url}/api/v1/sync/push"),
2754            }],
2755            client,
2756            sender_agent_id: "ai:catchup-test".to_string(),
2757        }
2758    }
2759
2760    /// Memory factory dedicated to catchup tests — every memory gets a
2761    /// unique title so `insert_if_newer`'s ON CONFLICT(title, namespace)
2762    /// path doesn't collapse them into one row. Timestamp is a fixed
2763    /// progression so the test asserts deterministic ordering.
2764    fn catchup_memory(title: &str, updated_at: &str) -> Memory {
2765        Memory {
2766            id: format!("cat-{title}"),
2767            tier: crate::models::Tier::Mid,
2768            namespace: "catchup".to_string(),
2769            title: title.to_string(),
2770            content: format!("content for {title}"),
2771            tags: vec!["catchup".to_string()],
2772            priority: 5,
2773            confidence: 1.0,
2774            // `validate_memory` enforces a source-allowlist (user, claude,
2775            // hook, api, cli, import, consolidation, system, chaos, notify).
2776            // Use "system" so catchup_once's `validate_memory(&mem).is_err()`
2777            // skip-branch isn't tripped — that's what we're trying NOT to
2778            // exercise in the happy-path tests below.
2779            source: "system".to_string(),
2780            access_count: 0,
2781            created_at: updated_at.to_string(),
2782            updated_at: updated_at.to_string(),
2783            last_accessed_at: None,
2784            expires_at: None,
2785            metadata: serde_json::json!({"agent_id":"ai:peer-0"}),
2786        }
2787    }
2788
2789    // ---- catchup_once: pulls `since`, advances state ----
2790
2791    #[tokio::test]
2792    async fn test_catchup_once_pulls_since_cursor_advances_state() {
2793        // First-time catchup with empty sync_state: we expect the request
2794        // to land WITHOUT a `since` query param, and after the call
2795        // sync_state should be advanced to the latest memory's timestamp.
2796        let mems = vec![
2797            catchup_memory("a", "2026-04-26T10:00:00Z"),
2798            catchup_memory("b", "2026-04-26T10:00:01Z"),
2799            catchup_memory("c", "2026-04-26T10:00:02Z"),
2800            catchup_memory("d", "2026-04-26T10:00:03Z"),
2801            catchup_memory("e", "2026-04-26T10:00:04Z"),
2802        ];
2803        let latest_ts = mems.last().unwrap().updated_at.clone();
2804        let (url, hits, last_since, last_peer) =
2805            spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems.clone())).await;
2806        let cfg = build_catchup_cfg(&url, 2000);
2807        let db = build_test_db();
2808
2809        catchup_once(&cfg, &db).await;
2810
2811        assert_eq!(hits.load(Ordering::Relaxed), 1, "peer hit exactly once");
2812        // First-time call → no `since` query param.
2813        assert!(
2814            last_since.lock().await.is_none(),
2815            "first catchup must omit since"
2816        );
2817        // Local agent id is forwarded.
2818        assert_eq!(last_peer.lock().await.as_deref(), Some("ai:catchup-test"));
2819        // sync_state advanced to the latest memory's timestamp.
2820        let lock = db.lock().await;
2821        let clock =
2822            crate::db::sync_state_load(&lock.0, "ai:catchup-test").expect("load sync state");
2823        assert_eq!(
2824            clock.entries.get("peer-0").map(String::as_str),
2825            Some(latest_ts.as_str()),
2826            "sync state advanced to latest pulled memory's updated_at"
2827        );
2828        // All 5 memories landed.
2829        let count: i64 = lock
2830            .0
2831            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2832            .unwrap();
2833        assert_eq!(count, 5, "all five memories inserted");
2834    }
2835
2836    // ---- catchup_once: empty array no-op ----
2837
2838    #[tokio::test]
2839    async fn test_catchup_once_no_new_memories_no_op() {
2840        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
2841        let cfg = build_catchup_cfg(&url, 2000);
2842        let db = build_test_db();
2843
2844        catchup_once(&cfg, &db).await;
2845
2846        assert_eq!(hits.load(Ordering::Relaxed), 1);
2847        let lock = db.lock().await;
2848        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2849        assert!(
2850            clock.entries.get("peer-0").is_none(),
2851            "empty response must not advance sync_state"
2852        );
2853        let count: i64 = lock
2854            .0
2855            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2856            .unwrap();
2857        assert_eq!(count, 0);
2858    }
2859
2860    // ---- catchup_once: 5xx error swallowed, state untouched ----
2861
2862    #[tokio::test]
2863    async fn test_catchup_once_peer_500_error_logged_no_panic() {
2864        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Error500).await;
2865        let cfg = build_catchup_cfg(&url, 2000);
2866        let db = build_test_db();
2867
2868        // Must NOT panic. The function logs at debug! and continues.
2869        catchup_once(&cfg, &db).await;
2870
2871        assert_eq!(hits.load(Ordering::Relaxed), 1);
2872        let lock = db.lock().await;
2873        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2874        assert!(
2875            clock.entries.get("peer-0").is_none(),
2876            "500 must not advance sync state"
2877        );
2878    }
2879
2880    // ---- catchup_once: timeout swallowed ----
2881
2882    #[tokio::test]
2883    async fn test_catchup_once_peer_timeout_handled() {
2884        // Mock hangs for 2s, client timeout is 200ms → reqwest returns Err,
2885        // catchup logs at debug! and skips this peer.
2886        let (url, hits, _, _) =
2887            spawn_since_peer(SinceMockBehaviour::Hang(Duration::from_secs(2))).await;
2888        let cfg = build_catchup_cfg(&url, 200);
2889        let db = build_test_db();
2890
2891        let start = Instant::now();
2892        catchup_once(&cfg, &db).await;
2893        let elapsed = start.elapsed();
2894
2895        // Must return promptly after the client-timeout fires, not after
2896        // the full 2s mock-side hang.
2897        assert!(
2898            elapsed < Duration::from_millis(1500),
2899            "catchup_once should honour the client timeout, took {elapsed:?}"
2900        );
2901        assert_eq!(hits.load(Ordering::Relaxed), 1, "request was sent");
2902        let lock = db.lock().await;
2903        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2904        assert!(clock.entries.get("peer-0").is_none());
2905    }
2906
2907    // ---- catchup_once: malformed JSON body ----
2908
2909    #[tokio::test]
2910    async fn test_catchup_once_malformed_response_handled() {
2911        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::MalformedBody).await;
2912        let cfg = build_catchup_cfg(&url, 2000);
2913        let db = build_test_db();
2914
2915        // No panic — the function `tracing::warn!`s and skips the peer.
2916        catchup_once(&cfg, &db).await;
2917
2918        assert_eq!(hits.load(Ordering::Relaxed), 1);
2919        let lock = db.lock().await;
2920        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2921        assert!(
2922            clock.entries.get("peer-0").is_none(),
2923            "malformed body must not advance sync state"
2924        );
2925    }
2926
2927    // ---- catchup_once: only newer memories overwrite local ----
2928
2929    #[tokio::test]
2930    async fn test_catchup_once_inserts_only_newer_memories() {
2931        // Pre-seed local DB with a memory titled "shared" at t=10:00:01.
2932        // Mock peer returns:
2933        //   - "shared" at t=10:00:00  (older — must NOT clobber local)
2934        //   - "fresh"  at t=10:00:02  (new title — must insert)
2935        let db = build_test_db();
2936        {
2937            let lock = db.lock().await;
2938            let local = catchup_memory("shared", "2026-04-26T10:00:01Z");
2939            // Insert via the test path — this is the "we already have it
2940            // locally at a newer timestamp" precondition.
2941            crate::db::insert_if_newer(&lock.0, &local).unwrap();
2942            // Confirm pre-state.
2943            let cnt: i64 = lock
2944                .0
2945                .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2946                .unwrap();
2947            assert_eq!(cnt, 1, "pre-seeded shared row");
2948        }
2949
2950        let mut stale_shared = catchup_memory("shared", "2026-04-26T10:00:00Z");
2951        // Distinct content so the "did the older catchup body win?" assertion
2952        // is meaningful — base catchup_memory derives content from title.
2953        stale_shared.content = "stale-from-catchup-peer".to_string();
2954        stale_shared.id = "cat-shared-OLD".to_string();
2955        let stale_shared_content = stale_shared.content.clone();
2956        let new_fresh = catchup_memory("fresh", "2026-04-26T10:00:02Z");
2957        let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![
2958            stale_shared,
2959            new_fresh,
2960        ]))
2961        .await;
2962        let cfg = build_catchup_cfg(&url, 2000);
2963
2964        catchup_once(&cfg, &db).await;
2965
2966        let lock = db.lock().await;
2967        // Both rows now exist.
2968        let cnt: i64 = lock
2969            .0
2970            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2971            .unwrap();
2972        assert_eq!(cnt, 2, "fresh row inserted, shared kept");
2973        // The "shared" row's content must still be the locally-seeded
2974        // version (older catchup body did NOT win).
2975        let shared_content: String = lock
2976            .0
2977            .query_row(
2978                "SELECT content FROM memories WHERE title = 'shared' AND namespace = 'catchup'",
2979                [],
2980                |r| r.get(0),
2981            )
2982            .unwrap();
2983        assert_ne!(
2984            shared_content, stale_shared_content,
2985            "older catchup memory must NOT overwrite newer local row"
2986        );
2987        // sync_state advanced to the LATEST timestamp seen, not to the
2988        // one we actually applied — function tracks `latest_ts` over the
2989        // whole batch.
2990        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2991        assert_eq!(
2992            clock.entries.get("peer-0").map(String::as_str),
2993            Some("2026-04-26T10:00:02Z"),
2994        );
2995    }
2996
2997    // ---- spawn_catchup_loop: runs at interval (paused-time) ----
2998
2999    #[tokio::test(start_paused = true)]
3000    async fn test_spawn_catchup_loop_runs_at_interval() {
3001        // The loop sleeps 5s up-front then ticks every `interval`. With
3002        // paused time, advance past the initial sleep and one full tick
3003        // and assert the mock saw at least one hit.
3004        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
3005        let cfg = build_catchup_cfg(&url, 5000);
3006        let db = build_test_db();
3007
3008        let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(60));
3009
3010        // Advance past the 5s startup delay + give the first catchup_once
3011        // a slice of real wall-clock to actually execute the network call.
3012        // Paused time still yields() between awaits; the network IO is
3013        // not virtualized — so we step in chunks separated by yields.
3014        for _ in 0..6 {
3015            tokio::time::advance(Duration::from_secs(1)).await;
3016            tokio::task::yield_now().await;
3017        }
3018        // Allow the spawned reqwest::send to actually complete on the
3019        // real runtime — a small real-time wait covers in-process axum
3020        // round-trip latency without paused-time interference.
3021        for _ in 0..50 {
3022            if hits.load(Ordering::Relaxed) >= 1 {
3023                break;
3024            }
3025            tokio::task::yield_now().await;
3026            tokio::time::advance(Duration::from_millis(10)).await;
3027        }
3028
3029        assert!(
3030            hits.load(Ordering::Relaxed) >= 1,
3031            "first catchup tick must hit the mock peer (got {})",
3032            hits.load(Ordering::Relaxed),
3033        );
3034
3035        handle.abort();
3036    }
3037
3038    // ---- spawn_catchup_loop: aborts cleanly on handle drop ----
3039
3040    #[tokio::test]
3041    async fn test_spawn_catchup_loop_aborts_cleanly_on_handle_drop() {
3042        // Drop the JoinHandle (via abort) and confirm the task ends quickly
3043        // — no lingering tasks, no panics from being killed mid-tick.
3044        let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
3045        let cfg = build_catchup_cfg(&url, 2000);
3046        let db = build_test_db();
3047
3048        let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(3600));
3049        // Don't let it run a full 5s startup-sleep. Abort and confirm
3050        // the join future resolves promptly with a Cancelled error.
3051        handle.abort();
3052        let result = tokio::time::timeout(Duration::from_millis(500), handle).await;
3053        let join = result.expect("aborted handle must resolve within 500ms");
3054        assert!(
3055            join.is_err() && join.unwrap_err().is_cancelled(),
3056            "handle.abort() must surface as is_cancelled() == true"
3057        );
3058    }
3059
3060    // ---- mTLS client-cert flow: build_config happy path ----
3061
3062    #[test]
3063    fn test_build_config_mtls_with_valid_files() {
3064        // Use the existing rcgen-generated test fixtures (PEM cert +
3065        // PKCS#8 key). The build path concatenates them into one PEM
3066        // and feeds that to `reqwest::Identity::from_pem`. We only need
3067        // to assert the client builds — TLS handshake itself isn't part
3068        // of this path's contract.
3069        let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3070            .join("tests/fixtures/tls/valid_cert.pem");
3071        let key = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3072            .join("tests/fixtures/tls/valid_key_pkcs8.pem");
3073        // Sanity: fixtures exist on disk.
3074        assert!(cert.exists(), "missing test fixture: {cert:?}");
3075        assert!(key.exists(), "missing test fixture: {key:?}");
3076
3077        let result = FederationConfig::build(
3078            2,
3079            &["http://peer.example".to_string()],
3080            Duration::from_millis(500),
3081            Some(&cert),
3082            Some(&key),
3083            None,
3084            "ai:builder".to_string(),
3085        );
3086        let cfg = match result {
3087            Ok(Some(c)) => c,
3088            Ok(None) => panic!("expected Some(FederationConfig), got None"),
3089            Err(e) => panic!("expected Ok, got Err: {e}"),
3090        };
3091        assert_eq!(cfg.peer_count(), 1);
3092    }
3093
3094    // ---- mTLS client-cert flow: missing key file errors ----
3095
3096    #[test]
3097    fn test_build_config_mtls_with_missing_files_returns_error() {
3098        // Cert path exists, key path doesn't → the second `read` errors
3099        // with "read --client-key:". This exercises the second arm of
3100        // the `(Some(cert), Some(key))` branch that the existing
3101        // `config_build_rejects_missing_client_cert_path` test (which
3102        // makes BOTH paths missing) doesn't reach.
3103        let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3104            .join("tests/fixtures/tls/valid_cert.pem");
3105        let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
3106        assert!(cert.exists(), "missing test fixture: {cert:?}");
3107
3108        let result = FederationConfig::build(
3109            2,
3110            &["http://peer.example".to_string()],
3111            Duration::from_millis(500),
3112            Some(&cert),
3113            Some(&bogus_key),
3114            None,
3115            "ai:builder".to_string(),
3116        );
3117        let err = match result {
3118            Ok(_) => panic!("expected client-key read error"),
3119            Err(e) => e,
3120        };
3121        let msg = format!("{err}");
3122        assert!(
3123            msg.contains("read --client-key"),
3124            "expected client-key read error, got {msg:?}"
3125        );
3126    }
3127
3128    // -----------------------------------------------------------------
3129    // W12-G (v0.6.3) — federation.rs remaining edges (89.87% → 94%+).
3130    //
3131    // Targets the residual uncovered surface after W3 + W9 F9:
3132    //   - post_and_classify direct: persistent retry-fail and id-drift
3133    //     skip-retry paths.
3134    //   - bulk_catchup_push edge cases not previously reached
3135    //     (no-peers shortcut, mixed pass+fail outcomes).
3136    //   - Quorum-policy edges: W=1 single-peer-ack already returns,
3137    //     QuorumPolicy::majority convenience constructor, FederationConfig
3138    //     duplicate detection on trailing-slash and case differences.
3139    //   - Each broadcast_*_quorum has only the all-Ack and all-Fail
3140    //     paths — exercise the `Hang` (timeout-mid-loop) classification
3141    //     for the remaining variants so the inner `Ok(None) | Err(_)`
3142    //     break arm is hit on every flavour.
3143    //   - catchup_once: 5xx classified as "Ok(r) where !success" arm
3144    //     (F9 covers it once but with peer.id == "peer-0"; the
3145    //     ServerError + non-empty body path is already covered).
3146    //     New: peer URL whose `sync_push_url` does NOT carry the
3147    //     `/api/v1/sync/push` suffix — the trim_end_matches no-ops
3148    //     and the `since` URL is built from the raw base.
3149    //   - QuorumNotMetPayload: `from_err` on a peer-acks-empty result
3150    //     after the deadline (Unreachable variant via real broadcast).
3151    //
3152    // All tests reuse the in-process axum mock-peer infrastructure
3153    // (`spawn_mock_peer`, `spawn_since_peer`) and do not require disk.
3154    // -----------------------------------------------------------------
3155
3156    /// W12-G #1: `post_and_classify` returns `Fail` after retry also fails,
3157    /// and the failure string carries BOTH attempts' reasons (`first:` /
3158    /// `retry:` prefixes). Hits the `Fail(format!("first: {}; retry: {}"))`
3159    /// arm at lines ~437-440 directly — the outer broadcast tests only
3160    /// assert that quorum-not-met surfaces, not the format of the error.
3161    #[tokio::test]
3162    async fn post_and_classify_persistent_fail_concatenates_both_reasons() {
3163        let (url, count) = spawn_mock_peer(MockBehaviour::Fail).await;
3164        let client = reqwest::Client::builder()
3165            .timeout(Duration::from_millis(2000))
3166            .build()
3167            .unwrap();
3168        let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
3169        let target = format!("{url}/api/v1/sync/push");
3170
3171        let outcome = post_and_classify(&client, &target, &body, "mem-x", Some("mem-x")).await;
3172        match outcome {
3173            AckOutcome::Fail(reason) => {
3174                assert!(
3175                    reason.contains("first:") && reason.contains("retry:"),
3176                    "expected both attempts in reason, got {reason:?}"
3177                );
3178                // 5xx → both attempts should have classified as `http 500`.
3179                assert!(
3180                    reason.contains("http 500"),
3181                    "expected 5xx in reason, got {reason:?}"
3182                );
3183            }
3184            other => panic!("expected AckOutcome::Fail, got {other:?}"),
3185        }
3186        assert_eq!(
3187            count.load(Ordering::Relaxed),
3188            2,
3189            "first attempt + one retry = exactly two POSTs"
3190        );
3191    }
3192
3193    /// W12-G #2: `post_and_classify` does NOT retry on `IdDrift`. A peer
3194    /// that semantically disagrees on the id is not a transient failure;
3195    /// retrying would just observe the same disagreement. Hits the
3196    /// outer-match `IdDrift => IdDrift` arm at line ~410 (no inner retry
3197    /// dispatch) — distinct from the `Fail` arm that performs the retry.
3198    #[tokio::test]
3199    async fn post_and_classify_id_drift_does_not_retry() {
3200        // Hand-rolled mock that always 200's with a divergent id.
3201        let count = Arc::new(AtomicUsize::new(0));
3202        let cnt_clone = count.clone();
3203        let app = Router::new().route(
3204            "/api/v1/sync/push",
3205            post(move |AxumJson(_b): AxumJson<serde_json::Value>| {
3206                let c = cnt_clone.clone();
3207                async move {
3208                    c.fetch_add(1, Ordering::Relaxed);
3209                    (
3210                        StatusCode::OK,
3211                        AxumJson(serde_json::json!({"ids":["other-id"],"applied":1})),
3212                    )
3213                }
3214            }),
3215        );
3216        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3217        let addr = listener.local_addr().unwrap();
3218        tokio::spawn(async move {
3219            axum::serve(listener, app).await.ok();
3220        });
3221        let url = format!("http://{addr}/api/v1/sync/push");
3222
3223        let client = reqwest::Client::builder()
3224            .timeout(Duration::from_millis(2000))
3225            .build()
3226            .unwrap();
3227        let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
3228        let outcome = post_and_classify(&client, &url, &body, "mem-x", Some("mem-x")).await;
3229        assert!(
3230            matches!(outcome, AckOutcome::IdDrift),
3231            "expected IdDrift, got {outcome:?}"
3232        );
3233        assert_eq!(
3234            count.load(Ordering::Relaxed),
3235            1,
3236            "IdDrift must NOT trigger the retry path (only one POST)"
3237        );
3238    }
3239
3240    /// W12-G #3: `bulk_catchup_push` with no peers returns immediately
3241    /// without spawning. Hits the `if memories.is_empty() || config.peers
3242    /// .is_empty()` shortcut — the existing
3243    /// `bulk_catchup_push_empty_inputs_are_noop` covers `memories.is_empty()`
3244    /// only.
3245    #[tokio::test]
3246    async fn bulk_catchup_push_no_peers_is_noop() {
3247        let client = reqwest::Client::builder()
3248            .timeout(Duration::from_millis(500))
3249            .build()
3250            .unwrap();
3251        let cfg = FederationConfig {
3252            policy: QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30))
3253                .unwrap(),
3254            peers: Vec::new(),
3255            client,
3256            sender_agent_id: "ai:no-peers".to_string(),
3257        };
3258        // Non-empty memories list — the shortcut should still fire because
3259        // the peer list is empty.
3260        let mems = vec![sample_memory()];
3261        let errors = bulk_catchup_push(&cfg, &mems).await;
3262        assert!(
3263            errors.is_empty(),
3264            "no-peers catchup must return empty error vec immediately, got {errors:?}"
3265        );
3266    }
3267
3268    /// W12-G #4: `bulk_catchup_push` with mixed peer outcomes (one Ack,
3269    /// one Fail). The Ack peer must NOT appear in the error vec; the
3270    /// Fail peer MUST appear with its `peer.id` and an http-500 reason.
3271    /// Validates the per-peer error propagation more precisely than the
3272    /// existing `bulk_catchup_push_reports_peer_failures` — that test
3273    /// uses two failing peers.
3274    #[tokio::test]
3275    async fn bulk_catchup_push_mixed_outcomes_only_failing_peer_in_errors() {
3276        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3277        let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
3278        let cfg = build_config(vec![url1, url2], 2, 2000);
3279        let mems = vec![sample_memory()];
3280        let errors = bulk_catchup_push(&cfg, &mems).await;
3281        assert_eq!(
3282            errors.len(),
3283            1,
3284            "exactly one failing peer should be in errors, got {errors:?}"
3285        );
3286        let (peer_id, reason) = &errors[0];
3287        // build_config assigns `peer-0:<url>` and `peer-1:<url>`. The
3288        // failing peer is the second one we registered.
3289        assert!(
3290            peer_id.starts_with("peer-1"),
3291            "failing peer should be peer-1, got {peer_id}"
3292        );
3293        assert!(
3294            reason.contains("http 500"),
3295            "expected http 500 reason, got {reason}"
3296        );
3297        // Both peers were called regardless.
3298        assert_eq!(count1.load(Ordering::Relaxed), 1);
3299        assert_eq!(count2.load(Ordering::Relaxed), 1);
3300    }
3301
3302    /// W12-G #5: W=1 quorum is met by the local commit alone — no peer
3303    /// ack needed. Even when every peer fails, the broadcast still
3304    /// returns Ok and `finalise_quorum` returns `Ok(1)`. Exercises the
3305    /// `is_quorum_met` early-exit path with `acks.len() == 0`.
3306    #[tokio::test]
3307    async fn quorum_w1_local_commit_alone_is_sufficient() {
3308        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3309        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3310        // W=1, N=3 — local commit is enough on its own.
3311        let cfg = build_config(vec![url1, url2], 1, 1000);
3312        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
3313            .await
3314            .unwrap();
3315        let count = finalise_quorum(&tracker).expect("W=1 must succeed on local commit alone");
3316        assert_eq!(count, 1, "W=1 quorum returns local-only count");
3317    }
3318
3319    /// W12-G #6: `QuorumPolicy::majority` builds the convenience config
3320    /// with `W = ceil((N+1)/2)`. N=3 → W=2; N=5 → W=3. The existing
3321    /// suite uses `QuorumPolicy::new` directly everywhere — `majority`
3322    /// goes uncovered.
3323    #[test]
3324    fn quorum_policy_majority_builds_with_ceil_n_plus_1_div_2() {
3325        let p3 = QuorumPolicy::majority(3).expect("N=3 majority builds");
3326        // public field for tests: re-derive via finalise round-trip if
3327        // the internal `w` is private. Instead use a lightweight
3328        // tracker-based check.
3329        let mut t = AckTracker::new(p3, Instant::now());
3330        t.record_local();
3331        // With W=2, local-only is NOT yet quorum.
3332        assert!(
3333            !t.is_quorum_met(Instant::now()),
3334            "majority-of-3 needs more than local"
3335        );
3336        t.record_peer_ack("peer-a");
3337        assert!(
3338            t.is_quorum_met(Instant::now()),
3339            "local + 1 peer ack = 2 = majority of 3"
3340        );
3341
3342        let p5 = QuorumPolicy::majority(5).expect("N=5 majority builds");
3343        let mut t5 = AckTracker::new(p5, Instant::now());
3344        t5.record_local();
3345        t5.record_peer_ack("a");
3346        assert!(
3347            !t5.is_quorum_met(Instant::now()),
3348            "majority-of-5 needs 3 acks"
3349        );
3350        t5.record_peer_ack("b");
3351        assert!(t5.is_quorum_met(Instant::now()), "local + 2 peers = 3");
3352    }
3353
3354    /// W12-G #7: `QuorumPolicy::majority(0)` rejects with InvalidPolicy.
3355    /// Hits the `n == 0` guard via the convenience constructor (the
3356    /// existing `quorum_not_met_payload_invalid_policy_branch` builds
3357    /// the error directly without going through `QuorumPolicy::new`).
3358    #[test]
3359    fn quorum_policy_majority_rejects_zero() {
3360        let err = QuorumPolicy::majority(0).expect_err("n=0 must be rejected");
3361        match err {
3362            QuorumError::InvalidPolicy { detail } => {
3363                assert!(
3364                    detail.contains("n must be"),
3365                    "expected n>=1 message, got {detail}"
3366                );
3367            }
3368            other => panic!("expected InvalidPolicy, got {other:?}"),
3369        }
3370    }
3371
3372    /// W12-G #8: `FederationConfig::build` rejects duplicate peers
3373    /// where the URLs differ only in trailing-slash. Existing test
3374    /// (`config_build_rejects_duplicate_peer_urls`) uses identical
3375    /// strings; this exercises the normalization branch
3376    /// (`trim_end_matches('/').to_ascii_lowercase()`).
3377    #[test]
3378    fn config_build_rejects_duplicate_peers_differing_only_in_trailing_slash() {
3379        let result = FederationConfig::build(
3380            2,
3381            &[
3382                "http://peer.example".to_string(),
3383                "http://peer.example/".to_string(),
3384            ],
3385            Duration::from_millis(500),
3386            None,
3387            None,
3388            None,
3389            "ai:dup-test".to_string(),
3390        );
3391        let err = match result {
3392            Ok(_) => panic!("trailing-slash dup must be rejected"),
3393            Err(e) => e,
3394        };
3395        let msg = format!("{err}");
3396        assert!(
3397            msg.contains("duplicate peer URL"),
3398            "expected duplicate-peer error, got {msg}"
3399        );
3400    }
3401
3402    /// W12-G #9: `FederationConfig::build` rejects duplicate peers where
3403    /// the URLs differ only in scheme/host casing. Mirrors the
3404    /// `to_ascii_lowercase` half of the normalization.
3405    #[test]
3406    fn config_build_rejects_duplicate_peers_differing_only_in_case() {
3407        let result = FederationConfig::build(
3408            2,
3409            &[
3410                "http://Peer.Example".to_string(),
3411                "http://peer.example".to_string(),
3412            ],
3413            Duration::from_millis(500),
3414            None,
3415            None,
3416            None,
3417            "ai:dup-case-test".to_string(),
3418        );
3419        let err = match result {
3420            Ok(_) => panic!("case-only dup must be rejected"),
3421            Err(e) => e,
3422        };
3423        let msg = format!("{err}");
3424        assert!(
3425            msg.contains("duplicate peer URL"),
3426            "expected duplicate-peer error, got {msg}"
3427        );
3428    }
3429
3430    /// W12-G #10: archive_quorum classifies a hanging peer as
3431    /// non-acking — the existing tests for archive_quorum use Ack and
3432    /// Fail only. With Hang behaviour and a tight 200ms timeout, the
3433    /// `Ok(None) | Err(_) => break` arm fires in the inner timeout
3434    /// match. (Ditto for restore/link/consolidate — covered together
3435    /// via a sweep below to keep this test focused.)
3436    #[tokio::test]
3437    async fn archive_quorum_hanging_peer_times_out_to_break_arm() {
3438        let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
3439        let (url2, _) = spawn_mock_peer(MockBehaviour::Hang).await;
3440        // W=2 with two hanging peers + 200ms timeout. The local commit
3441        // is the only source of acks; quorum cannot be met.
3442        let cfg = build_config(vec![url1, url2], 2, 200);
3443        let start = Instant::now();
3444        let tracker = broadcast_archive_quorum(&cfg, "mem-arch-id").await.unwrap();
3445        let elapsed = start.elapsed();
3446        // Loop must give up at the deadline, not hang for the full 10s
3447        // peer sleep.
3448        assert!(
3449            elapsed < Duration::from_secs(2),
3450            "archive_quorum must exit at deadline, took {elapsed:?}"
3451        );
3452        let err = finalise_quorum(&tracker).unwrap_err();
3453        assert!(
3454            matches!(err, QuorumError::QuorumNotMet { .. }),
3455            "expected QuorumNotMet, got {err:?}"
3456        );
3457    }
3458
3459    /// W12-G #11: `QuorumNotMetPayload::from_err` round-trip on a real
3460    /// `Unreachable` outcome from the broadcast loop. Existing direct
3461    /// tests build the QuorumError by hand; this end-to-end path has
3462    /// the broadcast actually classify the failure reason.
3463    #[tokio::test]
3464    async fn quorum_not_met_payload_unreachable_round_trip_from_broadcast() {
3465        // Two peers both Fail (not Hang) — we want the deadline to
3466        // elapse with zero peer acks. The broadcast finalises with
3467        // `Unreachable` because acks.is_empty() AND past deadline.
3468        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3469        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
3470        // Tight timeout so the deadline beats the 250ms backoff retry.
3471        let cfg = build_config(vec![url1, url2], 2, 100);
3472        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
3473            .await
3474            .unwrap();
3475        // Wait past the deadline before finalising — this guarantees
3476        // `now > deadline` in finalise() so the Unreachable branch is
3477        // selected (rather than InFlight).
3478        tokio::time::sleep(Duration::from_millis(150)).await;
3479        let err = finalise_quorum(&tracker).unwrap_err();
3480        let payload = QuorumNotMetPayload::from_err(&err);
3481        assert_eq!(payload.error, "quorum_not_met");
3482        assert_eq!(payload.got, 1, "only local commit");
3483        assert_eq!(payload.needed, 2);
3484        assert!(
3485            payload.reason == "unreachable" || payload.reason == "timeout",
3486            "expected unreachable/timeout, got {}",
3487            payload.reason
3488        );
3489    }
3490
3491    /// W12-G #12: `catchup_once` against a peer with an unusual base URL
3492    /// (no `/api/v1/sync/push` suffix) — `trim_end_matches` no-ops, so
3493    /// the constructed `since` URL appends `/api/v1/sync/since` to the
3494    /// raw base. Exercises the trim-noop branch at the start of
3495    /// catchup_once.
3496    #[tokio::test]
3497    async fn catchup_once_peer_url_without_push_suffix_still_builds_since() {
3498        let (url, hits, _, last_peer) =
3499            spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
3500        // Build a config whose peer.sync_push_url does NOT end in
3501        // `/api/v1/sync/push`. The trim_end_matches in catchup_once is
3502        // a no-op for this shape, so the base URL is the raw `url`.
3503        let client = reqwest::Client::builder()
3504            .timeout(Duration::from_millis(2000))
3505            .build()
3506            .unwrap();
3507        let cfg = FederationConfig {
3508            policy: QuorumPolicy::new(2, 1, Duration::from_millis(2000), Duration::from_secs(30))
3509                .unwrap(),
3510            peers: vec![PeerEndpoint {
3511                id: "peer-0".to_string(),
3512                // No /api/v1/sync/push suffix — verifies the trim is
3513                // tolerant of unexpected shapes.
3514                sync_push_url: url.clone(),
3515            }],
3516            client,
3517            sender_agent_id: "ai:no-suffix".to_string(),
3518        };
3519        let db = build_test_db();
3520        catchup_once(&cfg, &db).await;
3521        // The mock saw a hit at /api/v1/sync/since with the local agent id.
3522        assert_eq!(hits.load(Ordering::Relaxed), 1);
3523        assert_eq!(
3524            last_peer.lock().await.as_deref(),
3525            Some("ai:no-suffix"),
3526            "local agent id should be forwarded as ?peer="
3527        );
3528    }
3529
3530    /// W12-G #13: `catchup_once` skips memories that fail
3531    /// `validate_memory` (e.g. invalid `source` enum). The valid memory
3532    /// IS applied; sync_state advances to the latest TS seen. Exercises
3533    /// the `if crate::validate::validate_memory(&mem).is_err() { continue; }`
3534    /// branch which the F9 happy-path tests don't trigger.
3535    #[tokio::test]
3536    async fn catchup_once_skips_invalid_memory_but_applies_valid_neighbour() {
3537        // valid memory uses source="system" (whitelisted by validate_memory).
3538        let valid = catchup_memory("ok-mem", "2026-04-26T10:00:00Z");
3539        // invalid memory has source not in the allowlist (validate fails).
3540        let mut bad = catchup_memory("bad-source", "2026-04-26T10:00:01Z");
3541        bad.source = "made-up-source-not-in-allowlist".to_string();
3542        let mems = vec![valid.clone(), bad];
3543
3544        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
3545        let cfg = build_catchup_cfg(&url, 2000);
3546        let db = build_test_db();
3547        catchup_once(&cfg, &db).await;
3548
3549        assert_eq!(hits.load(Ordering::Relaxed), 1);
3550        let lock = db.lock().await;
3551        // Only the valid memory was inserted.
3552        let count: i64 = lock
3553            .0
3554            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3555            .unwrap();
3556        assert_eq!(count, 1, "only the valid memory should land");
3557        let title: String = lock
3558            .0
3559            .query_row(
3560                "SELECT title FROM memories WHERE namespace='catchup' LIMIT 1",
3561                [],
3562                |r| r.get(0),
3563            )
3564            .unwrap();
3565        assert_eq!(title, "ok-mem");
3566        // sync_state advanced to the latest TS of the APPLIED rows
3567        // only — the validate-fail `continue` happens before the
3568        // `latest_ts` bump, so the invalid 10:00:01 row does NOT
3569        // contribute. Net: latest_ts == valid memory's timestamp.
3570        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
3571        assert_eq!(
3572            clock.entries.get("peer-0").map(String::as_str),
3573            Some("2026-04-26T10:00:00Z"),
3574            "sync_state tracks latest_ts of validate-passing rows"
3575        );
3576    }
3577
3578    /// W12-G #14: `AckTracker::record_peer_ack` is idempotent — recording
3579    /// the same peer id twice does not double-count. Exercised
3580    /// indirectly by the broadcast layer (the tracker is a HashSet under
3581    /// the hood) but never asserted directly.
3582    #[test]
3583    fn ack_tracker_record_peer_ack_is_idempotent() {
3584        let policy = QuorumPolicy::new(3, 2, Duration::from_secs(1), Duration::from_secs(30))
3585            .expect("policy");
3586        let mut t = AckTracker::new(policy, Instant::now());
3587        t.record_local();
3588        t.record_peer_ack("peer-a");
3589        t.record_peer_ack("peer-a"); // dup — must dedupe
3590        // 2 acks (local + 1 distinct peer) = 2 = W → quorum met.
3591        assert!(t.is_quorum_met(Instant::now()));
3592        // Adding a third distinct peer does not regress quorum.
3593        t.record_peer_ack("peer-b");
3594        assert!(t.is_quorum_met(Instant::now()));
3595    }
3596
3597    /// W12-G #15a: `catchup_once` against a peer whose 200 body lacks
3598    /// a `memories` key — `body.get("memories")` returns None and the
3599    /// loop `continue`s without applying anything or advancing
3600    /// sync_state. Hits the `None => continue` arm at line ~1478
3601    /// (the existing F9 tests always include the `memories` array).
3602    #[tokio::test]
3603    async fn catchup_once_body_without_memories_key_is_skipped() {
3604        // Hand-rolled handler returning `{"applied": 0}` (no memories key).
3605        let app = Router::new().route(
3606            "/api/v1/sync/since",
3607            axum::routing::get(|| async {
3608                (
3609                    StatusCode::OK,
3610                    AxumJson(serde_json::json!({"applied":0,"note":"empty cluster"})),
3611                )
3612            }),
3613        );
3614        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3615        let addr = listener.local_addr().unwrap();
3616        tokio::spawn(async move {
3617            axum::serve(listener, app).await.ok();
3618        });
3619        let url = format!("http://{addr}");
3620        let cfg = build_catchup_cfg(&url, 2000);
3621        let db = build_test_db();
3622        catchup_once(&cfg, &db).await;
3623        let lock = db.lock().await;
3624        let count: i64 = lock
3625            .0
3626            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3627            .unwrap();
3628        assert_eq!(count, 0, "no memories key → no inserts");
3629        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
3630        assert!(
3631            clock.entries.get("peer-0").is_none(),
3632            "no memories key → sync_state untouched"
3633        );
3634    }
3635
3636    /// W12-G #15b: `catchup_once` against a peer that returns a 200 with
3637    /// a `memories` array containing an unparseable element. The
3638    /// individual element is skipped (`serde_json::from_value` Err) and
3639    /// the rest of the batch is applied. Hits lines 1492-1494.
3640    #[tokio::test]
3641    async fn catchup_once_unparseable_individual_memory_is_skipped() {
3642        // `memories[0]` is a valid Memory, `memories[1]` is a JSON object
3643        // with the wrong shape (missing required fields).
3644        let valid_mem = serde_json::to_value(catchup_memory("ok", "2026-04-26T10:00:00Z")).unwrap();
3645        let bad_mem = serde_json::json!({"id":"oops","not_a_memory_field": true});
3646        let app = Router::new().route(
3647            "/api/v1/sync/since",
3648            axum::routing::get(move || {
3649                let valid = valid_mem.clone();
3650                let bad = bad_mem.clone();
3651                async move {
3652                    (
3653                        StatusCode::OK,
3654                        AxumJson(serde_json::json!({"memories": [valid, bad]})),
3655                    )
3656                }
3657            }),
3658        );
3659        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
3660        let addr = listener.local_addr().unwrap();
3661        tokio::spawn(async move {
3662            axum::serve(listener, app).await.ok();
3663        });
3664        let url = format!("http://{addr}");
3665        let cfg = build_catchup_cfg(&url, 2000);
3666        let db = build_test_db();
3667        catchup_once(&cfg, &db).await;
3668        let lock = db.lock().await;
3669        // Only the parseable memory landed.
3670        let count: i64 = lock
3671            .0
3672            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3673            .unwrap();
3674        assert_eq!(count, 1, "only parseable memory inserted");
3675    }
3676
3677    /// W12-G #16: id-drift on `broadcast_delete_quorum` exercises the
3678    /// `IdDrift => record_id_drift` arm at line ~591 (the existing
3679    /// `id_drift_peer_does_not_count_as_ack` only hits the store path).
3680    #[tokio::test]
3681    async fn delete_quorum_id_drift_peer_records_drift_not_ack() {
3682        let url1 = spawn_id_drift_peer().await;
3683        let url2 = spawn_id_drift_peer().await;
3684        let cfg = build_config(vec![url1, url2], 2, 1000);
3685        let tracker = broadcast_delete_quorum(&cfg, "mem-del-x").await.unwrap();
3686        // local + 0 peer acks = 1 < W=2 → not met.
3687        let err = finalise_quorum(&tracker).unwrap_err();
3688        assert!(
3689            matches!(err, QuorumError::QuorumNotMet { got: 1, .. }),
3690            "expected QuorumNotMet got=1, got {err:?}"
3691        );
3692        // Both peers reported drift.
3693        assert_eq!(
3694            tracker.id_drift_count(),
3695            2,
3696            "both peers should be recorded as drift"
3697        );
3698    }
3699
3700    /// W12-G #17: id-drift on `broadcast_archive_quorum` exercises the
3701    /// IdDrift arm at line ~679.
3702    #[tokio::test]
3703    async fn archive_quorum_id_drift_peer_records_drift_not_ack() {
3704        let url1 = spawn_id_drift_peer().await;
3705        let cfg = build_config(vec![url1], 2, 1000);
3706        let tracker = broadcast_archive_quorum(&cfg, "mem-arch-x").await.unwrap();
3707        let err = finalise_quorum(&tracker).unwrap_err();
3708        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3709        assert_eq!(tracker.id_drift_count(), 1);
3710    }
3711
3712    /// W12-G #18: id-drift on `broadcast_restore_quorum` exercises the
3713    /// IdDrift arm at line ~768.
3714    #[tokio::test]
3715    async fn restore_quorum_id_drift_peer_records_drift_not_ack() {
3716        let url1 = spawn_id_drift_peer().await;
3717        let cfg = build_config(vec![url1], 2, 1000);
3718        let tracker = broadcast_restore_quorum(&cfg, "mem-res-x").await.unwrap();
3719        let err = finalise_quorum(&tracker).unwrap_err();
3720        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3721        assert_eq!(tracker.id_drift_count(), 1);
3722    }
3723
3724    /// W12-G #19: id-drift on `broadcast_link_quorum` exercises the
3725    /// IdDrift arm at line ~851.
3726    #[tokio::test]
3727    async fn link_quorum_id_drift_peer_records_drift_not_ack() {
3728        let url1 = spawn_id_drift_peer().await;
3729        let cfg = build_config(vec![url1], 2, 1000);
3730        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
3731        let err = finalise_quorum(&tracker).unwrap_err();
3732        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3733        assert_eq!(tracker.id_drift_count(), 1);
3734    }
3735
3736    /// W12-G #20: id-drift on `broadcast_consolidate_quorum` exercises
3737    /// the IdDrift arm at line ~935.
3738    #[tokio::test]
3739    async fn consolidate_quorum_id_drift_peer_records_drift_not_ack() {
3740        let url1 = spawn_id_drift_peer().await;
3741        let cfg = build_config(vec![url1], 2, 1000);
3742        let new_mem = sample_memory();
3743        let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
3744            .await
3745            .unwrap();
3746        let err = finalise_quorum(&tracker).unwrap_err();
3747        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3748        assert_eq!(tracker.id_drift_count(), 1);
3749    }
3750
3751    /// W12-G #21: id-drift on `broadcast_pending_quorum` exercises the
3752    /// IdDrift arm at line ~1024.
3753    #[tokio::test]
3754    async fn pending_quorum_id_drift_peer_records_drift_not_ack() {
3755        let url1 = spawn_id_drift_peer().await;
3756        let cfg = build_config(vec![url1], 2, 1000);
3757        let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
3758            .await
3759            .unwrap();
3760        let err = finalise_quorum(&tracker).unwrap_err();
3761        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3762        assert_eq!(tracker.id_drift_count(), 1);
3763    }
3764
3765    /// W12-G #22: id-drift on `broadcast_pending_decision_quorum`
3766    /// exercises the IdDrift arm at line ~1112.
3767    #[tokio::test]
3768    async fn pending_decision_quorum_id_drift_peer_records_drift_not_ack() {
3769        let url1 = spawn_id_drift_peer().await;
3770        let cfg = build_config(vec![url1], 2, 1000);
3771        let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
3772            .await
3773            .unwrap();
3774        let err = finalise_quorum(&tracker).unwrap_err();
3775        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3776        assert_eq!(tracker.id_drift_count(), 1);
3777    }
3778
3779    /// W12-G #23: id-drift on `broadcast_namespace_meta_quorum`
3780    /// exercises the IdDrift arm at line ~1201.
3781    #[tokio::test]
3782    async fn namespace_meta_quorum_id_drift_peer_records_drift_not_ack() {
3783        let url1 = spawn_id_drift_peer().await;
3784        let cfg = build_config(vec![url1], 2, 1000);
3785        let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
3786            .await
3787            .unwrap();
3788        let err = finalise_quorum(&tracker).unwrap_err();
3789        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3790        assert_eq!(tracker.id_drift_count(), 1);
3791    }
3792
3793    /// W12-G #24: id-drift on `broadcast_namespace_meta_clear_quorum`
3794    /// exercises the IdDrift arm at line ~1294.
3795    #[tokio::test]
3796    async fn namespace_meta_clear_quorum_id_drift_peer_records_drift_not_ack() {
3797        let url1 = spawn_id_drift_peer().await;
3798        let cfg = build_config(vec![url1], 2, 1000);
3799        let namespaces = vec!["app/team".to_string()];
3800        let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
3801            .await
3802            .unwrap();
3803        let err = finalise_quorum(&tracker).unwrap_err();
3804        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3805        assert_eq!(tracker.id_drift_count(), 1);
3806    }
3807
3808    /// W12-G #25: post-quorum detach for `broadcast_delete_quorum`
3809    /// fanout exercises the post-quorum spawn block at lines 608-616
3810    /// (the `if !joins.is_empty()` arm). With W=2 N=3 and one peer
3811    /// hanging, quorum is met by the two ack peers and the detached
3812    /// task drains the still-running join.
3813    #[tokio::test]
3814    async fn delete_quorum_post_quorum_detach_drains_remaining_peer() {
3815        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3816        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
3817        let (url3, count3) = spawn_mock_peer(MockBehaviour::Fail).await;
3818        let cfg = build_config(vec![url1, url2, url3], 2, 2000);
3819        let _tracker = broadcast_delete_quorum(&cfg, "mem-detach").await.unwrap();
3820        // Wait long enough for the detached failing peer to finish its
3821        // first attempt + 250ms backoff + retry.
3822        for _ in 0..100 {
3823            if count1.load(Ordering::Relaxed) >= 1
3824                && count2.load(Ordering::Relaxed) >= 1
3825                && count3.load(Ordering::Relaxed) >= 1
3826            {
3827                break;
3828            }
3829            tokio::time::sleep(Duration::from_millis(20)).await;
3830        }
3831        // Failing peer must have been called by the detach (it
3832        // wouldn't have been if the detach was aborted on quorum-met).
3833        assert!(
3834            count3.load(Ordering::Relaxed) >= 1,
3835            "failing peer must be reached by the detached fanout"
3836        );
3837    }
3838
3839    /// W12-G #15: `AckTracker::finalise` returns `InFlight` when called
3840    /// pre-deadline with insufficient acks. Distinct from Timeout
3841    /// (post-deadline w/ partial) and Unreachable (post-deadline w/ none).
3842    /// Validates the third reason variant directly.
3843    #[test]
3844    fn ack_tracker_finalise_pre_deadline_returns_in_flight() {
3845        // Long timeout so we are pre-deadline at finalise().
3846        let policy = QuorumPolicy::new(3, 2, Duration::from_secs(60), Duration::from_secs(30))
3847            .expect("policy");
3848        let now = Instant::now();
3849        let mut t = AckTracker::new(policy, now);
3850        t.record_local();
3851        // No peer acks yet — finalise pre-deadline should be InFlight.
3852        let err = t.finalise(now).unwrap_err();
3853        match err {
3854            QuorumError::QuorumNotMet {
3855                got,
3856                needed,
3857                reason,
3858            } => {
3859                assert_eq!(got, 1);
3860                assert_eq!(needed, 2);
3861                assert_eq!(
3862                    reason,
3863                    QuorumFailureReason::InFlight,
3864                    "pre-deadline insufficient-ack must classify as InFlight"
3865                );
3866            }
3867            other => panic!("expected QuorumNotMet, got {other:?}"),
3868        }
3869    }
3870}