Skip to main content

ai_memory/federation/
mod.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! Federation autonomy — wires the quorum primitives from `replication`
5//! into the HTTP write path (v0.7 track C, PR 2 of N).
6//!
7//! ## Contract
8//!
9//! When the `ai-memory serve` daemon is started with `--quorum-writes N`
10//! and `--quorum-peers <url1,url2,…>`, every successful HTTP write
11//! fans out a 1-memory `/api/v1/sync/push` POST to each peer and counts
12//! 2xx responses as acks. The write returns OK to the HTTP caller only
13//! once the local commit plus `W - 1` peer acks land within the
14//! `--quorum-timeout-ms` deadline. Fewer acks → `503` with body
15//! `{"error":"quorum_not_met", "got":X, "needed":Y, "reason":…}`.
16//!
17//! ## Scope of this module
18//!
19//! - `FederationConfig` — the serve-time config parsed from CLI flags.
20//! - `broadcast_store_quorum` — async HTTP fan-out that builds an
21//!   `AckTracker` from `replication::QuorumPolicy`, spawns one task
22//!   per peer, and waits on either quorum-met or deadline.
23//! - Mock-peer integration tests covering the happy path, a dropped
24//!   ack pattern, and a total outage.
25//!
26//! ## NOT in scope of this module
27//!
28//! - The real multi-process chaos harness lives under `packaging/chaos/`
29//!   as an operator-facing shell script. A campaign report is produced
30//!   by `packaging/chaos/run-chaos.sh` — see that file for how to
31//!   measure the convergence bound committed to in ADR-0001.
32//! - MCP-over-stdio and CLI writes do NOT fan out to peers. The MCP
33//!   server is a single-tenant stdio client and the CLI is local; both
34//!   rely on the sync-daemon for eventual propagation. Only the HTTP
35//!   daemon is a federation node.
36
37// v0.7.0 epic — federation identity at scale. Phase 1: `identity::resolver`
38// de-hardcodes the `host:<hostname>` bootstrap identity behind an explicit
39// precedence (env > operator config > hostname). See ADR-001.
40pub mod identity;
41pub mod peer;
42pub mod peer_attestation;
// v0.7.0 Track D #933 — federation push DLQ + replay worker. The
// concrete module requires `async-trait` for the object-safe sink
// trait + sqlx for the postgres sink; both are SAL-feature deps so
// the entire DLQ surface is feature-gated to `--features sal`. In the
// sqlite-only (default-features) build the `FederationConfig.dlq_sink`
// field is compiled out entirely (there is no stub type), so every
// site that names the field must carry the same
// `#[cfg(feature = "sal")]` gate.
50#[cfg(feature = "sal")]
51pub mod push_dlq;
52pub mod quorum;
53pub mod receive;
54pub mod reflection_bookkeeping;
55// v0.7.0 #791 — per-message Ed25519 signing of federation POSTs.
56// Outbound POSTs (`broadcast_*_quorum`) attach an `X-Memory-Sig`
57// header; inbound `/sync/push` rejects missing / invalid sigs with
58// `401 Unauthorized` when `AI_MEMORY_FED_REQUIRE_SIG=1` (default).
59pub mod signing;
60pub mod sync;
61pub mod vector_clock;
62
63pub use quorum::*;
64pub use receive::spawn_catchup_loop;
65#[cfg(feature = "sal")]
66pub use receive::spawn_catchup_loop_with_store;
67// #935 (v0.7.0 Track D, 2026-05-20) — `catchup_once_for_tests` is a
68// public test driver for the integration test in
69// `tests/federation_catchup_api_key.rs`. Marked `#[doc(hidden)]` on
70// the source-side so it doesn't appear in rustdoc, but kept `pub`
71// here so the integration test (separate crate) can import it.
72pub use receive::catchup_once_for_tests;
73pub use sync::*;
74// v0.7.0 Track D #933 — re-export push DLQ surface for daemon bootstrap +
75// integration tests.
76#[cfg(feature = "sal")]
77pub use push_dlq::{
78    FederationDlqSink, FederationPushDlqRow, REPLAY_BATCH_SIZE, replay_once,
79    spawn_replay_federation_push_dlq,
80};
81
82use crate::replication::QuorumPolicy;
83
/// Tracing target for the quorum-broadcast / fan-out sync path
/// (`sync.rs` + the postgres create-path branch in
/// `handlers::create`). #1558 tracing-target SSOT.
pub(crate) const SYNC_TRACE_TARGET: &str = "ai_memory::federation::sync";

/// Tracing target for per-message Ed25519 federation signing —
/// outbound header attachment (`sync.rs`, `identity::outbound`),
/// credential renewal (`identity::renewal`), and the receive-side
/// verification branch (`handlers::federation_signing_check`).
/// #1558 tracing-target SSOT.
// NOTE(review): unlike `SYNC_TRACE_TARGET`, this value carries no
// `ai_memory::` prefix. If log filters are scoped by the crate-name
// prefix, events on this target would presumably not match them —
// confirm the difference is intentional before relying on it.
pub(crate) const SIGNING_TRACE_TARGET: &str = "federation::signing";
95
/// Configured-at-serve federation state. Parsed from
/// `--quorum-writes` + `--quorum-peers` + `--quorum-timeout-ms`.
///
/// The type is `Clone`; the signing key and the DLQ sink sit behind
/// `Arc`, so cloning the config shares them rather than copying them.
#[derive(Clone)]
pub struct FederationConfig {
    /// Quorum policy from `crate::replication`; per the module docs the
    /// broadcast path builds its `AckTracker` from it.
    pub policy: QuorumPolicy,
    /// The peers every write is fanned out to (one [`PeerEndpoint`] per
    /// configured peer; the local node is not listed here).
    pub peers: Vec<PeerEndpoint>,
    /// `reqwest` client carried with the config — presumably the one the
    /// outbound fan-out POSTs are issued through; verify against
    /// `sync.rs`.
    pub client: reqwest::Client,
    /// Agent id naming this node as the sender.
    /// NOTE(review): how it is placed on the wire is not visible from
    /// this file — confirm against the fan-out code.
    pub sender_agent_id: String,
    /// v0.7.0 fold-A2A1.4 (#702) — the operator-configured `[api] api_key`
    /// from the local daemon's `AppConfig`, threaded here so outbound
    /// federation POSTs can attach the `x-api-key` header. Without this,
    /// a peer that itself runs with `api_key` set rejects every fanout
    /// with 401 and quorum can never converge across hosts. `None` means
    /// the local daemon doesn't run with api-key auth — outbound headers
    /// stay unmodified (backwards-compatible with mTLS-only deployments
    /// and the v0.6.x default-off auth posture).
    pub api_key: Option<String>,
    /// v0.7.0 #791 — Ed25519 signing key the outbound `post_once` uses
    /// to compute the `X-Memory-Sig: ed25519=<base64>` header. `None`
    /// = no header attached (legacy peers + receivers that opted out
    /// via `AI_MEMORY_FED_REQUIRE_SIG=0` keep working).
    pub signing_key: Option<std::sync::Arc<ed25519_dalek::SigningKey>>,
    /// v0.7.0 Track D #933 — federation push DLQ sink. When `Some`,
    /// per-peer fanout failures inside `broadcast_store_quorum`
    /// (Fail outcome OR no-Ack-before-deadline) land a row in
    /// `federation_push_dlq` via this sink, and the
    /// `replay_federation_push_dlq` worker re-attempts the push
    /// later. `None` preserves pre-#933 behaviour (silent fanout
    /// failures) for builds/configs that haven't wired the SAL store
    /// — typically test harnesses that exercise `broadcast_*_quorum`
    /// in isolation.
    ///
    /// Feature-gated to `--features sal` because the trait surface
    /// (`async-trait`) is a SAL-only dep. Without the feature the field
    /// does not exist at all, so struct literals must gate it with the
    /// same `#[cfg(feature = "sal")]` attribute.
    #[cfg(feature = "sal")]
    pub dlq_sink: Option<std::sync::Arc<dyn push_dlq::FederationDlqSink>>,
}
133
/// A single peer in the quorum mesh. The `id` is what we record in
/// the ack tracker (typically the URL or the peer's mTLS fingerprint).
#[derive(Clone, Debug)]
pub struct PeerEndpoint {
    /// Identifier under which this peer's acks are recorded.
    pub id: String,
    /// Full URL of the peer's sync-push endpoint (the in-file test
    /// harness builds it as `<base>/api/v1/sync/push`).
    pub sync_push_url: String,
}
141
/// #1566 / #1579 B1 — embed-once-replicate-vector. A source-side
/// embedding shipped alongside its memory row in the federation
/// `/sync/push` payload (wire key [`crate::models::field_names::EMBEDDINGS`]).
///
/// ## Wire contract
///
/// - The array rides INSIDE the JSON body that `sync::post_once`
///   serialises once and signs (`X-Memory-Sig` over the exact body
///   bytes, nonce-bound per #922), so the vector's TRANSIT integrity is
///   covered by the same Ed25519 signature + replay protection as the
///   memory rows themselves: a vector altered IN FLIGHT invalidates the
///   signature.
///
///   **Trust boundary (#1584).** The signature attests the SENDER and
///   that the bytes were not altered in transit — it does NOT attest
///   that the f32 values are a well-formed embedding, nor that the
///   vector honestly embeds the shipped `(title, content)`. The
///   content↔vector honesty is an inherent limit of shipping
///   sender-computed vectors (the receiver trusts the enrolled peer not
///   to mislabel). The VALUE DOMAIN, however, is receiver-enforced:
///   [`sanitize_shipped_vector`] rejects non-finite components and
///   L2-normalizes the rest before storage, so a peer cannot poison
///   cosine ranking with a NaN/±Inf or high-magnitude vector. (Non-finite
///   components additionally cannot cross the JSON wire at all — serde
///   serialises them to `null` and the strict `Vec<f32>` decoder rejects
///   it with `400`.)
/// - Decode is TOLERANT of absence: the receiver's `SyncPushBody`
///   field defaults to an empty vec, so pushes from older peers (no
///   `embeddings` key) and pushes to older peers (unknown fields are
///   ignored — request structs are deliberately permissive per #1052)
///   both interoperate. The fleet swaps as one, but the protocol must
///   not hard-require the field.
///
/// ## Receive contract
///
/// The receiver stores `vector` directly ONLY when `dim` matches its
/// own configured embedder dimensionality (the same dim-safety
/// property recall's H7 `CosineComparison::DimensionMismatch` exists
/// for). On mismatch — or when no vector was shipped — the row falls
/// back to the deferred background-embed path; either way the
/// receiver acks after commit WITHOUT a synchronous embed (~1s/row
/// via ollama pre-#1566, which rode inside the sender's quorum-ack
/// window and drove the `deadline_exceeded` → DLQ cascade).
///
/// ## Construction vs. decode
///
/// [`ShippedEmbedding::new`] derives `dim` from the vector, so values
/// built locally are self-consistent. The derived `Deserialize` impl
/// reads `dim` and `vector` as independent fields, so a value decoded
/// off the wire carries NO such guarantee — that is why the receiver
/// must check `dim == vector.len()` itself.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct ShippedEmbedding {
    /// Id of the memory row (in the same push) this vector belongs to.
    pub memory_id: String,
    /// Human-readable id of the model that produced the vector
    /// (sender's `Embedder::model_description()`). Observability only —
    /// the dim gate is the load-bearing safety check.
    pub model: String,
    /// Dimensionality the sender claims for `vector`. Receivers verify
    /// `dim == vector.len()` AND `dim == local embedder dim` before
    /// storing the vector directly.
    pub dim: usize,
    /// The embedding vector itself.
    pub vector: Vec<f32>,
}
200
201impl ShippedEmbedding {
202    /// Build a shipped embedding for `memory_id` from a freshly
203    /// computed vector; `dim` is derived from the vector length so the
204    /// two can never disagree on the sender side.
205    #[must_use]
206    pub fn new(memory_id: String, model: String, vector: Vec<f32>) -> Self {
207        Self {
208            memory_id,
209            model,
210            dim: vector.len(),
211            vector,
212        }
213    }
214}
215
/// #1584 (SEC) — tolerance band around unit L2 norm within which a
/// peer-shipped vector is accepted as-is (already normalized by the
/// sender's embedder). Outside the band the receiver re-normalizes; a
/// zero-norm vector (or one with any non-finite component) is rejected
/// entirely.
pub const SHIPPED_VECTOR_NORM_TOLERANCE: f32 = 1e-3;

/// #1584 (SEC, MED) — validate + L2-normalize a peer-shipped embedding
/// before it is stored as a memory's embedding on the #1579 B1
/// embed-ship receive path.
///
/// **Threat.** B1 stores the SENDER's vector directly (no receiver
/// re-embed) once the dimension gate (`dim == vector.len() == local
/// embedder dim`) passes. The Ed25519 envelope signature proves the
/// bytes came from the holder of the peer key and were not altered in
/// transit — it does NOT prove the f32 values are a well-formed
/// embedding, and it does NOT prove the vector honestly embeds the
/// shipped `(title, content)` (that content↔vector honesty is an
/// inherent limit of shipping sender-computed vectors; the receiver
/// trusts the enrolled peer not to mislabel). What the receiver CAN
/// and MUST enforce is the value domain:
///
/// - **Non-finite components** (NaN / ±Inf) silently corrupt cosine
///   ordering: NaN is unordered under `partial_cmp`, so a single
///   poisoned row perturbs the ranking of an entire candidate set.
/// - **Non-unit-norm vectors** break the cosine assumption: the HNSW
///   distance (`1.0 - dot`, [`crate::hnsw`]) and the linear-scan paths
///   assume L2-normalized operands, so a high-magnitude vector inflates
///   its dot product against every query and ranks itself artificially
///   high across all recalls — the cheapest ranking-manipulation
///   primitive on this surface, needing no NaN trickery.
///
/// # Returns
///
/// `Some(v)` — a finite, L2-normalized vector safe to store — or `None`
/// when the vector is unusable (empty, any non-finite component, or a
/// zero norm), in which case the caller falls back to a local re-embed
/// of the row's text (the SAME fallback the dim-mismatch arm already
/// uses). Locally-computed embeddings are normalized at embed time, so
/// a well-behaved peer's vector lands inside
/// [`SHIPPED_VECTOR_NORM_TOLERANCE`] and is stored byte-for-byte.
///
/// # Precision
///
/// The norm is accumulated in `f64`. The square of any finite `f32` is
/// exactly representable in `f64` and cannot overflow or underflow
/// there, so the "returned vector is unit-norm" guarantee holds across
/// the whole finite `f32` range. An `f32` accumulator does not give
/// that guarantee: squares of very small components round to
/// subnormals (or to zero), which yields a visibly non-unit result, and
/// squares of very large components overflow to `+Inf`.
#[must_use]
// The only narrowing cast is `component / norm`, whose magnitude is at
// most 1.0, so the `f64 -> f32` conversion cannot overflow.
#[allow(clippy::cast_possible_truncation)]
pub fn sanitize_shipped_vector(vector: &[f32]) -> Option<Vec<f32>> {
    if vector.is_empty() || vector.iter().any(|x| !x.is_finite()) {
        return None;
    }

    // Sum of squares in f64 — see the `# Precision` section above.
    let norm_sq: f64 = vector
        .iter()
        .map(|&x| {
            let wide = f64::from(x);
            wide * wide
        })
        .sum();
    // All-zero input has no direction to keep. The finiteness guard is
    // defensive: it cannot trip for finite f32 components at any
    // realistic vector length.
    if !norm_sq.is_finite() || norm_sq <= 0.0 {
        return None;
    }

    let norm = norm_sq.sqrt();
    // Already (near-)unit: hand the sender's bytes back untouched so a
    // well-behaved peer pays no perturbation.
    if (norm - 1.0).abs() <= f64::from(SHIPPED_VECTOR_NORM_TOLERANCE) {
        return Some(vector.to_vec());
    }

    // Divide in f64 and narrow once per component, so the rescaling
    // adds only a single rounding step.
    Some(
        vector
            .iter()
            .map(|&x| (f64::from(x) / norm) as f32)
            .collect(),
    )
}
270
#[cfg(test)]
mod sanitize_shipped_vector_tests {
    use super::sanitize_shipped_vector;

    /// #1584 — any NaN / ±Inf component must make the sanitizer refuse
    /// the vector, so the caller re-embeds locally rather than storing
    /// a ranking-poisoning row.
    #[test]
    fn rejects_non_finite_components() {
        let poisoned: [&[f32]; 3] = [
            &[0.6, f32::NAN, 0.8],
            &[f32::INFINITY, 0.0],
            &[1.0, f32::NEG_INFINITY],
        ];
        for candidate in poisoned {
            assert!(sanitize_shipped_vector(candidate).is_none());
        }
    }

    /// #1584 — neither an empty vector nor an all-zero one carries a
    /// direction worth storing; both are refused.
    #[test]
    fn rejects_zero_and_empty() {
        let empty: &[f32] = &[];
        assert!(sanitize_shipped_vector(empty).is_none());

        let origin = [0.0_f32; 3];
        assert!(sanitize_shipped_vector(&origin).is_none());
    }

    /// #1584 — a vector that already has unit norm comes back
    /// unchanged, so well-behaved peers see no perturbation.
    #[test]
    fn unit_norm_vector_passes_through() {
        // 0.6² + 0.8² = 1.0
        let v = vec![0.6_f32, 0.8];
        let out = sanitize_shipped_vector(&v).expect("unit-norm accepted");
        assert_eq!(out, v);
    }

    /// #1584 — an oversized (non-normalized) vector is scaled down to
    /// unit length before storage, which defuses the "rank myself high
    /// everywhere" dot-product-inflation primitive.
    #[test]
    fn high_magnitude_vector_is_normalized() {
        // A 3-4-5 triangle scaled by ten: length is 50.
        let v = vec![30.0_f32, 40.0];
        let out = sanitize_shipped_vector(&v).expect("finite vector normalized");

        let sum_of_squares: f32 = out.iter().map(|x| x * x).sum();
        let norm = sum_of_squares.sqrt();
        assert!(
            (norm - 1.0).abs() < 1e-6,
            "normalized to unit norm; got {norm}"
        );

        // Same direction as the input: 30/50 and 40/50.
        let x_ok = (out[0] - 0.6).abs() < 1e-6;
        let y_ok = (out[1] - 0.8).abs() < 1e-6;
        assert!(x_ok && y_ok);
    }
}
317
318#[cfg(test)]
319mod tests {
320    use super::receive::{catchup_once, urlencoding_encode};
321    use super::sync::AckOutcome;
322    use super::*;
323    use crate::models::{Memory, MemoryLink, NamespaceMetaEntry, PendingAction, PendingDecision};
324    use crate::replication::{AckTracker, QuorumError, QuorumFailureReason, QuorumPolicy};
325    use axum::Router;
326    use axum::extract::Json as AxumJson;
327    use axum::http::StatusCode;
328    use axum::routing::post;
329    use std::sync::Arc;
330    use std::sync::atomic::{AtomicUsize, Ordering};
331    use std::time::{Duration, Instant};
332    use tokio::net::TcpListener;
333    use tokio::sync::Mutex;
334
335    fn sample_memory() -> Memory {
336        let now = chrono::Utc::now().to_rfc3339();
337        Memory {
338            id: "fed-test".to_string(),
339            tier: crate::models::Tier::Mid,
340            namespace: "app".to_string(),
341            title: "hello".to_string(),
342            content: "world for federation test".to_string(),
343            tags: vec!["t".to_string()],
344            priority: 5,
345            confidence: 1.0,
346            source: "test".to_string(),
347            access_count: 0,
348            created_at: now.clone(),
349            updated_at: now,
350            last_accessed_at: None,
351            expires_at: None,
352            metadata: serde_json::json!({"agent_id":"ai:test"}),
353            reflection_depth: 0,
354            memory_kind: crate::models::MemoryKind::Observation,
355            entity_id: None,
356            persona_version: None,
357            citations: Vec::new(),
358            source_uri: None,
359            source_span: None,
360            confidence_source: crate::models::ConfidenceSource::CallerProvided,
361            confidence_signals: None,
362            confidence_decayed_at: None,
363            version: 1,
364        }
365    }
366
    /// Scripted reply policy for a mock peer's `/api/v1/sync/push`
    /// handler (see `mock_handler`).
    #[derive(Clone, Copy)]
    enum MockBehaviour {
        /// Reply `200 OK` to every call.
        Ack,
        /// Reply `500 Internal Server Error` to every call.
        Fail,
        /// Sleep for 10 seconds, then reply `200 OK`. The tests in this
        /// module configure timeouts of at most a few seconds, so from
        /// the sender's side the peer never answers in time.
        Hang,
        /// Return HTTP 500 on the first `fail_until` calls, then 200.
        /// Used to exercise the S40 retry-once path.
        FailThenAck {
            /// Number of leading calls (1-based count) that get a 500.
            fail_until: usize,
        },
    }
378
    /// Per-peer state handed to the mock handler as axum router state.
    #[derive(Clone)]
    struct MockState {
        /// How this peer answers each request.
        behaviour: MockBehaviour,
        /// Number of requests the handler has received so far; the same
        /// counter is returned to the test by `spawn_mock_peer`.
        count: Arc<AtomicUsize>,
    }
384
385    async fn mock_handler(
386        axum::extract::State(state): axum::extract::State<MockState>,
387        AxumJson(_body): AxumJson<serde_json::Value>,
388    ) -> (StatusCode, AxumJson<serde_json::Value>) {
389        let call = state.count.fetch_add(1, Ordering::Relaxed) + 1;
390        match state.behaviour {
391            MockBehaviour::Ack => (
392                StatusCode::OK,
393                AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
394            ),
395            MockBehaviour::Fail => (
396                StatusCode::INTERNAL_SERVER_ERROR,
397                AxumJson(serde_json::json!({"error":"stub failure"})),
398            ),
399            MockBehaviour::Hang => {
400                tokio::time::sleep(Duration::from_secs(10)).await;
401                (StatusCode::OK, AxumJson(serde_json::json!({"applied":1})))
402            }
403            MockBehaviour::FailThenAck { fail_until } => {
404                if call <= fail_until {
405                    (
406                        StatusCode::INTERNAL_SERVER_ERROR,
407                        AxumJson(serde_json::json!({"error":"stub transient failure"})),
408                    )
409                } else {
410                    (
411                        StatusCode::OK,
412                        AxumJson(serde_json::json!({"applied":1,"noop":0,"skipped":0})),
413                    )
414                }
415            }
416        }
417    }
418
419    async fn spawn_mock_peer(behaviour: MockBehaviour) -> (String, Arc<AtomicUsize>) {
420        let call_count = Arc::new(AtomicUsize::new(0));
421        let state = MockState {
422            behaviour,
423            count: call_count.clone(),
424        };
425        let app = Router::new()
426            .route("/api/v1/sync/push", post(mock_handler))
427            .with_state(state);
428        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
429        let addr = listener.local_addr().unwrap();
430        tokio::spawn(async move {
431            axum::serve(listener, app).await.ok();
432        });
433        (format!("http://{addr}"), call_count)
434    }
435
436    fn build_config(peers: Vec<String>, w: usize, timeout_ms: u64) -> FederationConfig {
437        let client = reqwest::Client::builder()
438            .timeout(Duration::from_millis(timeout_ms))
439            .build()
440            .unwrap();
441        let n = 1 + peers.len();
442        FederationConfig {
443            policy: QuorumPolicy::new(
444                n,
445                w,
446                Duration::from_millis(timeout_ms),
447                Duration::from_secs(30),
448            )
449            .unwrap(),
450            peers: peers
451                .into_iter()
452                .enumerate()
453                .map(|(i, url)| PeerEndpoint {
454                    id: format!("peer-{i}:{url}"),
455                    sync_push_url: format!("{url}/api/v1/sync/push"),
456                })
457                .collect(),
458            client,
459            sender_agent_id: "ai:fed-test".to_string(),
460            api_key: None,
461            signing_key: None,
462            #[cfg(feature = "sal")]
463            dlq_sink: None,
464        }
465    }
466
467    #[tokio::test]
468    async fn happy_path_two_peers_quorum_met() {
469        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
470        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
471        let cfg = build_config(vec![url1, url2], 2, 2000);
472        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
473            .await
474            .unwrap();
475        let result = finalise_quorum(&tracker);
476        assert!(result.is_ok(), "expected quorum met, got {result:?}");
477        // At least one peer called before quorum returned. With v0.6.0's
478        // post-quorum detach, additional fan-outs complete in the
479        // background and may or may not have landed by the time this
480        // assertion runs — the synchronous contract is only "≥ 1 peer
481        // acked before return".
482        let calls = count1.load(Ordering::Relaxed) + count2.load(Ordering::Relaxed);
483        assert!(calls >= 1);
484    }
485
486    /// #931 (v0.7.0 Track D, 2026-05-20) — `broadcast_store_quorum`
487    /// MUST emit an info-level entry-line log on every call so the
488    /// silent-bypass case (function never invoked) is immediately
489    /// distinguishable from "function called but every peer failed".
490    /// Pre-#931 there was no entry log; the only federation tracing
491    /// was per-peer warn lines on a per-peer failure, so the Track D
492    /// Docker probe couldn't tell whether `app.federation` was
493    /// `None` (handler bypassed the call) or every peer 401'd.
494    ///
495    /// This test pins the wire wording `federation::broadcast: store`
496    /// and the structured fields. Any refactor that drops the log or
497    /// changes the phrase MUST update the Track D docker probe in
498    /// lockstep.
499    #[tokio::test]
500    async fn broadcast_emits_entry_line_log_for_track_d_grep() {
501        use tracing_subscriber::Registry;
502        use tracing_subscriber::layer::SubscriberExt;
503
504        #[derive(Clone, Default)]
505        struct CaptureLayer(Arc<std::sync::Mutex<Vec<String>>>);
506        impl<S: tracing::Subscriber> tracing_subscriber::Layer<S> for CaptureLayer {
507            fn on_event(
508                &self,
509                event: &tracing::Event<'_>,
510                _ctx: tracing_subscriber::layer::Context<'_, S>,
511            ) {
512                struct Visit<'a>(&'a mut Vec<String>);
513                impl tracing::field::Visit for Visit<'_> {
514                    fn record_debug(
515                        &mut self,
516                        field: &tracing::field::Field,
517                        value: &dyn std::fmt::Debug,
518                    ) {
519                        if field.name() == "message" {
520                            self.0.push(format!("{value:?}"));
521                        }
522                    }
523                    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
524                        if field.name() == "message" {
525                            self.0.push(value.to_string());
526                        }
527                    }
528                }
529                let mut local: Vec<String> = Vec::new();
530                event.record(&mut Visit(&mut local));
531                if let Ok(mut buf) = self.0.lock() {
532                    buf.extend(local);
533                }
534            }
535        }
536
537        let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
538        let cfg = build_config(vec![url1], 1, 1000);
539
540        let layer = CaptureLayer::default();
541        let messages = layer.0.clone();
542        let dispatch = tracing::Dispatch::new(Registry::default().with(layer));
543
544        // Bind the subscriber for the duration of the broadcast call.
545        // `set_default` is per-thread; the broadcast lives inside the
546        // current task, but spawned peer-fanout tasks may run on
547        // other tokio workers — we only need the entry-line log
548        // which fires on the calling thread before any spawn.
549        {
550            let _guard = tracing::dispatcher::set_default(&dispatch);
551            let _ = broadcast_store_quorum(&cfg, &sample_memory())
552                .await
553                .expect("broadcast must succeed");
554        }
555
556        let captured = messages.lock().unwrap().clone();
557        let joined = captured.join("\n");
558        assert!(
559            joined.contains("federation::broadcast: store"),
560            "expected entry-line log `federation::broadcast: store ... -> 1 peer(s)`; got:\n{joined}"
561        );
562    }
563
564    #[tokio::test]
565    async fn post_quorum_fanout_reaches_all_peers() {
566        // Contract: once quorum is met, the background detach must still
567        // deliver the write to every peer. Ship-gate run 14 uncovered the
568        // prior abort-on-quorum regression that left one peer permanently
569        // missing ~50% of burst writes under W=2/N=3.
570        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
571        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
572        let cfg = build_config(vec![url1, url2], 2, 2000);
573        let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
574            .await
575            .unwrap();
576        // Give the detached fanout a slow path to complete. Mock handlers
577        // are in-process, so 200ms is comfortable without being flaky.
578        for _ in 0..20 {
579            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
580                break;
581            }
582            tokio::time::sleep(Duration::from_millis(10)).await;
583        }
584        assert_eq!(
585            count1.load(Ordering::Relaxed),
586            1,
587            "peer-1 must receive the write post-quorum"
588        );
589        assert_eq!(
590            count2.load(Ordering::Relaxed),
591            1,
592            "peer-2 must receive the write post-quorum"
593        );
594    }
595
596    #[tokio::test]
597    async fn transient_peer_failure_is_retried_once() {
598        // S40 regression guard: a transient 5xx from a peer on the
599        // first POST must be retried exactly once. Previously the post
600        // was fire-and-forget — one peer that 5xx'd a single bulk row
601        // left that row permanently missing on that peer (v3r26
602        // hermes-tls scenario-40: node-2 saw 499/500).
603        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
604        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
605        let cfg = build_config(vec![url1, url2], 2, 2000);
606        let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
607            .await
608            .unwrap();
609        // Retry backoff is 250ms + retry round-trip; poll up to 2s.
610        for _ in 0..200 {
611            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
612                break;
613            }
614            tokio::time::sleep(Duration::from_millis(10)).await;
615        }
616        assert_eq!(
617            count1.load(Ordering::Relaxed),
618            1,
619            "peer-1 acked first time, no retry"
620        );
621        assert_eq!(
622            count2.load(Ordering::Relaxed),
623            2,
624            "peer-2 must see exactly two attempts (first fail, retry ack)"
625        );
626    }
627
628    #[tokio::test]
629    async fn persistent_peer_failure_stops_after_one_retry() {
630        // Retry policy is exactly one retry — a peer that stays down
631        // must NOT be called more than twice per row (no infinite
632        // backoff, no thundering herd on a wedged peer).
633        let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
634        let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
635        let cfg = build_config(vec![url1, url2], 2, 2000);
636        let _tracker = broadcast_store_quorum(&cfg, &sample_memory())
637            .await
638            .unwrap();
639        // Wait long enough that any further retries would have fired.
640        tokio::time::sleep(Duration::from_millis(800)).await;
641        assert_eq!(
642            count2.load(Ordering::Relaxed),
643            2,
644            "persistently-failing peer must be called exactly twice (1 + 1 retry)"
645        );
646    }
647
648    #[tokio::test]
649    async fn bulk_catchup_push_hits_every_peer_once() {
650        // S40 catchup: verify the terminal batch POST reaches every
651        // peer exactly once, with the full row set in a single request.
652        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
653        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
654        let cfg = build_config(vec![url1, url2], 2, 2000);
655        let mems = vec![sample_memory(), sample_memory(), sample_memory()];
656        let errors = bulk_catchup_push(&cfg, &mems).await;
657        assert!(
658            errors.is_empty(),
659            "catchup must succeed on healthy peers, got {errors:?}"
660        );
661        assert_eq!(
662            count1.load(Ordering::Relaxed),
663            1,
664            "peer-1 must receive exactly one catchup batch"
665        );
666        assert_eq!(
667            count2.load(Ordering::Relaxed),
668            1,
669            "peer-2 must receive exactly one catchup batch"
670        );
671    }
672
673    #[tokio::test]
674    async fn bulk_catchup_push_reports_peer_failures() {
675        // Catchup errors must be surfaced to the caller for logging —
676        // quorum was already met upstream, so the HTTP contract holds,
677        // but the leader should record which peers fell behind.
678        let (url1, _) = spawn_mock_peer(MockBehaviour::Ack).await;
679        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
680        let cfg = build_config(vec![url1, url2], 2, 2000);
681        let mems = vec![sample_memory()];
682        let errors = bulk_catchup_push(&cfg, &mems).await;
683        assert_eq!(errors.len(), 1, "exactly one peer failed the catchup");
684        assert!(
685            errors[0].1.contains("500") || errors[0].1.contains("http"),
686            "error must name the HTTP failure, got {:?}",
687            errors[0]
688        );
689    }
690
691    #[tokio::test]
692    async fn bulk_catchup_push_empty_inputs_are_noop() {
693        // No rows + no peers → no work, no panics, no POSTs.
694        let cfg = build_config(vec![], 1, 500);
695        assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
696
697        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
698        let cfg = build_config(vec![url1], 1, 500);
699        assert!(bulk_catchup_push(&cfg, &[]).await.is_empty());
700        assert_eq!(
701            count1.load(Ordering::Relaxed),
702            0,
703            "no catchup POST must fire when the row set is empty"
704        );
705    }
706
707    #[tokio::test]
708    async fn partition_minority_fails_quorum() {
709        // N = 3, W = 3. Two peers fail → cannot meet quorum.
710        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
711        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
712        let cfg = build_config(vec![url1, url2], 3, 500);
713        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
714            .await
715            .unwrap();
716        let err = finalise_quorum(&tracker).unwrap_err();
717        match err {
718            QuorumError::QuorumNotMet { got, needed, .. } => {
719                assert_eq!(got, 1, "local commit only");
720                assert_eq!(needed, 3);
721            }
722            other => panic!("expected QuorumNotMet, got {other:?}"),
723        }
724    }
725
726    #[tokio::test]
727    async fn timeout_on_hanging_peer_classified_timeout() {
728        // N = 2, W = 2. One hanging peer → timeout before ack.
729        let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
730        let cfg = build_config(vec![url1], 2, 200);
731        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
732            .await
733            .unwrap();
734        // Ensure the deadline passed.
735        tokio::time::sleep(Duration::from_millis(50)).await;
736        let err = finalise_quorum(&tracker).unwrap_err();
737        match err {
738            QuorumError::QuorumNotMet { reason, .. } => {
739                assert!(
740                    matches!(
741                        reason,
742                        QuorumFailureReason::Timeout | QuorumFailureReason::Unreachable
743                    ),
744                    "unexpected reason {reason:?}"
745                );
746            }
747            other => panic!("expected QuorumNotMet, got {other:?}"),
748        }
749    }
750
751    #[tokio::test]
752    async fn majority_quorum_tolerates_one_peer_down() {
753        // N = 3, W = 2 (majority). One fails, one acks → quorum met.
754        let (url_up, _) = spawn_mock_peer(MockBehaviour::Ack).await;
755        let (url_down, _) = spawn_mock_peer(MockBehaviour::Fail).await;
756        let cfg = build_config(vec![url_up, url_down], 2, 2000);
757        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
758            .await
759            .unwrap();
760        let result = finalise_quorum(&tracker);
761        assert!(
762            result.is_ok(),
763            "majority should tolerate 1 peer down, got {result:?}"
764        );
765    }
766
767    #[test]
768    fn config_build_disabled_when_w_zero() {
769        let cfg = FederationConfig::build(
770            0,
771            &["http://example.com".to_string()],
772            Duration::from_millis(500),
773            None,
774            None,
775            None,
776            "ai:test".to_string(),
777            None,
778        )
779        .unwrap();
780        assert!(cfg.is_none());
781    }
782
783    #[test]
784    fn config_build_disabled_when_peers_empty() {
785        let cfg = FederationConfig::build(
786            2,
787            &[],
788            Duration::from_millis(500),
789            None,
790            None,
791            None,
792            "ai:test".to_string(),
793            None,
794        )
795        .unwrap();
796        assert!(cfg.is_none());
797    }
798
799    #[test]
800    fn quorum_not_met_payload_from_err() {
801        let err = QuorumError::QuorumNotMet {
802            got: 1,
803            needed: 3,
804            reason: QuorumFailureReason::Timeout,
805        };
806        let payload = QuorumNotMetPayload::from_err(&err);
807        assert_eq!(payload.error, "quorum_not_met");
808        assert_eq!(payload.got, 1);
809        assert_eq!(payload.needed, 3);
810        assert_eq!(payload.reason, "timeout");
811    }
812
813    // --- broadcast_archive_quorum tests (S29) ---
814
815    #[tokio::test]
816    async fn archive_quorum_two_peers_ack_meets_quorum() {
817        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
818        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
819        let cfg = build_config(vec![url1, url2], 2, 2000);
820        let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
821        let result = finalise_quorum(&tracker);
822        assert!(result.is_ok(), "expected quorum met, got {result:?}");
823        // Let detached fanout complete so both peers are observed.
824        for _ in 0..20 {
825            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
826                break;
827            }
828            tokio::time::sleep(Duration::from_millis(10)).await;
829        }
830        assert_eq!(count1.load(Ordering::Relaxed), 1);
831        assert_eq!(count2.load(Ordering::Relaxed), 1);
832    }
833
834    #[tokio::test]
835    async fn archive_quorum_partition_minority_fails() {
836        // N = 3, W = 3. Two peers fail → archive quorum cannot be met.
837        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
838        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
839        let cfg = build_config(vec![url1, url2], 3, 500);
840        let tracker = broadcast_archive_quorum(&cfg, "mem-s29").await.unwrap();
841        let err = finalise_quorum(&tracker).unwrap_err();
842        match err {
843            QuorumError::QuorumNotMet { got, needed, .. } => {
844                assert_eq!(got, 1);
845                assert_eq!(needed, 3);
846            }
847            other => panic!("expected QuorumNotMet, got {other:?}"),
848        }
849    }
850
851    // --- broadcast_delete_quorum tests (Wave 3) ---
852    //
853    // The delete fanout mirrors the store fanout but rides a `deletions: [id]`
854    // payload instead of memory bodies. These two cases hit the entire
855    // function body — happy ack loop, deadline check, post-quorum detach,
856    // tracker unwrap.
857
858    #[tokio::test]
859    async fn delete_quorum_two_peers_ack_meets_quorum() {
860        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
861        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
862        let cfg = build_config(vec![url1, url2], 2, 2000);
863        let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
864        assert!(finalise_quorum(&tracker).is_ok());
865        for _ in 0..20 {
866            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
867                break;
868            }
869            tokio::time::sleep(Duration::from_millis(10)).await;
870        }
871        assert_eq!(count1.load(Ordering::Relaxed), 1);
872        assert_eq!(count2.load(Ordering::Relaxed), 1);
873    }
874
875    #[tokio::test]
876    async fn delete_quorum_partition_minority_fails() {
877        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
878        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
879        let cfg = build_config(vec![url1, url2], 3, 500);
880        let tracker = broadcast_delete_quorum(&cfg, "mem-del").await.unwrap();
881        let err = finalise_quorum(&tracker).unwrap_err();
882        match err {
883            QuorumError::QuorumNotMet { got, needed, .. } => {
884                assert_eq!(got, 1);
885                assert_eq!(needed, 3);
886            }
887            other => panic!("expected QuorumNotMet, got {other:?}"),
888        }
889    }
890
891    // --- broadcast_restore_quorum tests (Wave 3) ---
892
893    #[tokio::test]
894    async fn restore_quorum_two_peers_ack_meets_quorum() {
895        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
896        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
897        let cfg = build_config(vec![url1, url2], 2, 2000);
898        let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
899        assert!(finalise_quorum(&tracker).is_ok());
900        for _ in 0..20 {
901            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
902                break;
903            }
904            tokio::time::sleep(Duration::from_millis(10)).await;
905        }
906        assert_eq!(count1.load(Ordering::Relaxed), 1);
907        assert_eq!(count2.load(Ordering::Relaxed), 1);
908    }
909
910    #[tokio::test]
911    async fn restore_quorum_partition_minority_fails() {
912        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
913        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
914        let cfg = build_config(vec![url1, url2], 3, 500);
915        let tracker = broadcast_restore_quorum(&cfg, "mem-restore").await.unwrap();
916        let err = finalise_quorum(&tracker).unwrap_err();
917        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
918    }
919
920    // --- broadcast_link_quorum tests (Wave 3) ---
921
922    fn sample_link() -> MemoryLink {
923        MemoryLink {
924            source_id: "mem-a".to_string(),
925            target_id: "mem-b".to_string(),
926            relation: crate::models::MemoryLinkRelation::RelatedTo,
927            created_at: chrono::Utc::now().to_rfc3339(),
928            signature: None,
929            observed_by: None,
930            valid_from: None,
931            valid_until: None,
932            attest_level: None,
933        }
934    }
935
936    #[tokio::test]
937    async fn link_quorum_two_peers_ack_meets_quorum() {
938        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
939        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
940        let cfg = build_config(vec![url1, url2], 2, 2000);
941        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
942        assert!(finalise_quorum(&tracker).is_ok());
943        for _ in 0..20 {
944            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
945                break;
946            }
947            tokio::time::sleep(Duration::from_millis(10)).await;
948        }
949        assert_eq!(count1.load(Ordering::Relaxed), 1);
950        assert_eq!(count2.load(Ordering::Relaxed), 1);
951    }
952
953    #[tokio::test]
954    async fn link_quorum_partition_minority_fails() {
955        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
956        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
957        let cfg = build_config(vec![url1, url2], 3, 500);
958        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
959        let err = finalise_quorum(&tracker).unwrap_err();
960        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
961    }
962
963    // --- broadcast_consolidate_quorum tests (Wave 3) ---
964
965    #[tokio::test]
966    async fn consolidate_quorum_two_peers_ack_meets_quorum() {
967        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
968        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
969        let cfg = build_config(vec![url1, url2], 2, 2000);
970        let new_mem = sample_memory();
971        let sources = vec!["src-a".to_string(), "src-b".to_string()];
972        let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &sources)
973            .await
974            .unwrap();
975        assert!(finalise_quorum(&tracker).is_ok());
976        for _ in 0..20 {
977            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
978                break;
979            }
980            tokio::time::sleep(Duration::from_millis(10)).await;
981        }
982        assert_eq!(count1.load(Ordering::Relaxed), 1);
983        assert_eq!(count2.load(Ordering::Relaxed), 1);
984    }
985
986    #[tokio::test]
987    async fn consolidate_quorum_partition_minority_fails() {
988        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
989        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
990        let cfg = build_config(vec![url1, url2], 3, 500);
991        let new_mem = sample_memory();
992        let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
993            .await
994            .unwrap();
995        let err = finalise_quorum(&tracker).unwrap_err();
996        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
997    }
998
999    // --- broadcast_pending_quorum tests (Wave 3) ---
1000
1001    fn sample_pending() -> PendingAction {
1002        PendingAction {
1003            id: "pa-1".to_string(),
1004            action_type: "delete".to_string(),
1005            memory_id: Some("mem-x".to_string()),
1006            namespace: "app".to_string(),
1007            payload: serde_json::json!({}),
1008            requested_by: "ai:test".to_string(),
1009            requested_at: chrono::Utc::now().to_rfc3339(),
1010            status: "pending".to_string(),
1011            decided_by: None,
1012            decided_at: None,
1013            approvals: vec![],
1014        }
1015    }
1016
1017    #[tokio::test]
1018    async fn pending_quorum_two_peers_ack_meets_quorum() {
1019        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1020        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1021        let cfg = build_config(vec![url1, url2], 2, 2000);
1022        let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
1023            .await
1024            .unwrap();
1025        assert!(finalise_quorum(&tracker).is_ok());
1026        for _ in 0..20 {
1027            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1028                break;
1029            }
1030            tokio::time::sleep(Duration::from_millis(10)).await;
1031        }
1032        assert_eq!(count1.load(Ordering::Relaxed), 1);
1033        assert_eq!(count2.load(Ordering::Relaxed), 1);
1034    }
1035
1036    #[tokio::test]
1037    async fn pending_quorum_partition_minority_fails() {
1038        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1039        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1040        let cfg = build_config(vec![url1, url2], 3, 500);
1041        let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
1042            .await
1043            .unwrap();
1044        let err = finalise_quorum(&tracker).unwrap_err();
1045        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1046    }
1047
1048    // --- broadcast_pending_decision_quorum tests (Wave 3) ---
1049
1050    fn sample_decision() -> PendingDecision {
1051        PendingDecision {
1052            id: "pa-1".to_string(),
1053            approved: true,
1054            decider: "ai:approver".to_string(),
1055        }
1056    }
1057
1058    #[tokio::test]
1059    async fn pending_decision_quorum_two_peers_ack_meets_quorum() {
1060        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1061        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1062        let cfg = build_config(vec![url1, url2], 2, 2000);
1063        let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
1064            .await
1065            .unwrap();
1066        assert!(finalise_quorum(&tracker).is_ok());
1067        for _ in 0..20 {
1068            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1069                break;
1070            }
1071            tokio::time::sleep(Duration::from_millis(10)).await;
1072        }
1073        assert_eq!(count1.load(Ordering::Relaxed), 1);
1074        assert_eq!(count2.load(Ordering::Relaxed), 1);
1075    }
1076
1077    #[tokio::test]
1078    async fn pending_decision_quorum_partition_minority_fails() {
1079        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1080        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1081        let cfg = build_config(vec![url1, url2], 3, 500);
1082        let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
1083            .await
1084            .unwrap();
1085        let err = finalise_quorum(&tracker).unwrap_err();
1086        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1087    }
1088
1089    // --- broadcast_namespace_meta_quorum tests (Wave 3) ---
1090
1091    fn sample_namespace_meta() -> NamespaceMetaEntry {
1092        NamespaceMetaEntry {
1093            namespace: "app/team".to_string(),
1094            standard_id: "mem-std-1".to_string(),
1095            parent_namespace: Some("app".to_string()),
1096            updated_at: chrono::Utc::now().to_rfc3339(),
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn namespace_meta_quorum_two_peers_ack_meets_quorum() {
1102        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1103        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1104        let cfg = build_config(vec![url1, url2], 2, 2000);
1105        let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
1106            .await
1107            .unwrap();
1108        assert!(finalise_quorum(&tracker).is_ok());
1109        for _ in 0..20 {
1110            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1111                break;
1112            }
1113            tokio::time::sleep(Duration::from_millis(10)).await;
1114        }
1115        assert_eq!(count1.load(Ordering::Relaxed), 1);
1116        assert_eq!(count2.load(Ordering::Relaxed), 1);
1117    }
1118
1119    #[tokio::test]
1120    async fn namespace_meta_quorum_partition_minority_fails() {
1121        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1122        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1123        let cfg = build_config(vec![url1, url2], 3, 500);
1124        let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
1125            .await
1126            .unwrap();
1127        let err = finalise_quorum(&tracker).unwrap_err();
1128        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1129    }
1130
1131    // --- broadcast_namespace_meta_clear_quorum tests (Wave 3) ---
1132
1133    #[tokio::test]
1134    async fn namespace_meta_clear_quorum_two_peers_ack_meets_quorum() {
1135        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
1136        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
1137        let cfg = build_config(vec![url1, url2], 2, 2000);
1138        let namespaces = vec!["app/team".to_string(), "app/other".to_string()];
1139        let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
1140            .await
1141            .unwrap();
1142        assert!(finalise_quorum(&tracker).is_ok());
1143        for _ in 0..20 {
1144            if count1.load(Ordering::Relaxed) == 1 && count2.load(Ordering::Relaxed) == 1 {
1145                break;
1146            }
1147            tokio::time::sleep(Duration::from_millis(10)).await;
1148        }
1149        assert_eq!(count1.load(Ordering::Relaxed), 1);
1150        assert_eq!(count2.load(Ordering::Relaxed), 1);
1151    }
1152
1153    #[tokio::test]
1154    async fn namespace_meta_clear_quorum_partition_minority_fails() {
1155        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1156        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
1157        let cfg = build_config(vec![url1, url2], 3, 500);
1158        let namespaces = vec!["app/team".to_string()];
1159        let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
1160            .await
1161            .unwrap();
1162        let err = finalise_quorum(&tracker).unwrap_err();
1163        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
1164    }
1165
1166    // --- QuorumNotMetPayload::from_err branch coverage (Wave 3) ---
1167    //
1168    // The non-Timeout reasons (Unreachable, IdDrift, InFlight) and the
1169    // non-QuorumNotMet variants (InvalidPolicy, LocalWriteFailed) were
1170    // never exercised — `from_err` had only the Timeout path covered.
1171
1172    #[test]
1173    fn quorum_not_met_payload_unreachable_reason() {
1174        let err = QuorumError::QuorumNotMet {
1175            got: 1,
1176            needed: 2,
1177            reason: QuorumFailureReason::Unreachable,
1178        };
1179        let payload = QuorumNotMetPayload::from_err(&err);
1180        assert_eq!(payload.reason, "unreachable");
1181    }
1182
1183    #[test]
1184    fn quorum_not_met_payload_id_drift_reason() {
1185        let err = QuorumError::QuorumNotMet {
1186            got: 1,
1187            needed: 2,
1188            reason: QuorumFailureReason::IdDrift,
1189        };
1190        let payload = QuorumNotMetPayload::from_err(&err);
1191        assert_eq!(payload.reason, "id_drift");
1192    }
1193
1194    #[test]
1195    fn quorum_not_met_payload_in_flight_reason_maps_to_timeout() {
1196        // InFlight is a transient internal state; HTTP payload maps it to
1197        // "timeout" rather than leaking a fourth public reason string.
1198        let err = QuorumError::QuorumNotMet {
1199            got: 1,
1200            needed: 2,
1201            reason: QuorumFailureReason::InFlight,
1202        };
1203        let payload = QuorumNotMetPayload::from_err(&err);
1204        assert_eq!(payload.reason, "timeout");
1205    }
1206
1207    #[test]
1208    fn quorum_not_met_payload_invalid_policy_branch() {
1209        let err = QuorumError::InvalidPolicy {
1210            detail: "bad-thing".to_string(),
1211        };
1212        let payload = QuorumNotMetPayload::from_err(&err);
1213        assert_eq!(payload.error, "quorum_not_met");
1214        assert_eq!(payload.got, 0);
1215        assert_eq!(payload.needed, 0);
1216        assert!(payload.reason.starts_with("invalid_policy:"));
1217        assert!(payload.reason.contains("bad-thing"));
1218    }
1219
1220    #[test]
1221    fn quorum_not_met_payload_local_write_failed_branch() {
1222        let err = QuorumError::LocalWriteFailed {
1223            detail: "disk-full".to_string(),
1224        };
1225        let payload = QuorumNotMetPayload::from_err(&err);
1226        assert_eq!(payload.error, "quorum_not_met");
1227        assert!(payload.reason.starts_with("local_write_failed:"));
1228        assert!(payload.reason.contains("disk-full"));
1229    }
1230
1231    // --- FederationConfig::build coverage (Wave 3) ---
1232
1233    #[test]
1234    fn config_build_constructs_when_w_and_peers_set() {
1235        let cfg = FederationConfig::build(
1236            2,
1237            &[
1238                "http://peer-a.example/".to_string(),
1239                "http://peer-b.example".to_string(),
1240            ],
1241            Duration::from_millis(500),
1242            None,
1243            None,
1244            None,
1245            "ai:builder".to_string(),
1246            None,
1247        )
1248        .unwrap()
1249        .expect("config should be Some when w>0 and peers nonempty");
1250        assert_eq!(cfg.peer_count(), 2);
1251        assert_eq!(cfg.peers[0].id, "peer-0");
1252        assert_eq!(cfg.peers[1].id, "peer-1");
1253        // Trailing slash is stripped during URL normalization.
1254        assert_eq!(
1255            cfg.peers[0].sync_push_url,
1256            "http://peer-a.example/api/v1/sync/push"
1257        );
1258        assert_eq!(
1259            cfg.peers[1].sync_push_url,
1260            "http://peer-b.example/api/v1/sync/push"
1261        );
1262        assert_eq!(cfg.sender_agent_id, "ai:builder");
1263    }
1264
1265    #[test]
1266    fn config_build_rejects_duplicate_peer_urls() {
1267        let result = FederationConfig::build(
1268            2,
1269            &[
1270                "http://peer.example".to_string(),
1271                "http://peer.example/".to_string(),
1272            ],
1273            Duration::from_millis(500),
1274            None,
1275            None,
1276            None,
1277            "ai:builder".to_string(),
1278            None,
1279        );
1280        let err = match result {
1281            Ok(_) => panic!("expected duplicate-URL rejection"),
1282            Err(e) => e,
1283        };
1284        let msg = format!("{err}");
1285        assert!(
1286            msg.contains("duplicate peer URL"),
1287            "expected duplicate-URL rejection, got {msg:?}"
1288        );
1289    }
1290
1291    #[test]
1292    fn config_build_rejects_missing_ca_cert_path() {
1293        // ca_cert_path supplied but file doesn't exist → read error
1294        let bogus = std::path::PathBuf::from("/definitely/does/not/exist/ca.pem");
1295        let result = FederationConfig::build(
1296            2,
1297            &["http://peer.example".to_string()],
1298            Duration::from_millis(500),
1299            None,
1300            None,
1301            Some(&bogus),
1302            "ai:builder".to_string(),
1303            None,
1304        );
1305        let err = match result {
1306            Ok(_) => panic!("expected ca-cert read error"),
1307            Err(e) => e,
1308        };
1309        let msg = format!("{err}");
1310        assert!(
1311            msg.contains("read --quorum-ca-cert"),
1312            "expected ca-cert read error, got {msg:?}"
1313        );
1314    }
1315
1316    #[test]
1317    fn config_build_rejects_invalid_ca_cert_pem() {
1318        // Write a non-PEM file and confirm parse-side rejection.
1319        let dir = tempfile::tempdir().unwrap();
1320        let bad = dir.path().join("not-a-cert.pem");
1321        std::fs::write(&bad, b"this is not a valid pem certificate").unwrap();
1322        let result = FederationConfig::build(
1323            2,
1324            &["http://peer.example".to_string()],
1325            Duration::from_millis(500),
1326            None,
1327            None,
1328            Some(&bad),
1329            "ai:builder".to_string(),
1330            None,
1331        );
1332        let err = match result {
1333            Ok(_) => panic!("expected ca-cert parse error"),
1334            Err(e) => e,
1335        };
1336        let msg = format!("{err}");
1337        assert!(
1338            msg.contains("parse --quorum-ca-cert") || msg.contains("--quorum-ca-cert"),
1339            "expected ca-cert parse error, got {msg:?}"
1340        );
1341    }
1342
1343    #[test]
1344    fn config_build_rejects_missing_client_cert_path() {
1345        let bogus_cert = std::path::PathBuf::from("/definitely/missing/cert.pem");
1346        let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
1347        let result = FederationConfig::build(
1348            2,
1349            &["http://peer.example".to_string()],
1350            Duration::from_millis(500),
1351            Some(&bogus_cert),
1352            Some(&bogus_key),
1353            None,
1354            "ai:builder".to_string(),
1355            None,
1356        );
1357        let err = match result {
1358            Ok(_) => panic!("expected client-cert read error"),
1359            Err(e) => e,
1360        };
1361        let msg = format!("{err}");
1362        assert!(
1363            msg.contains("read --client-cert"),
1364            "expected client-cert read error, got {msg:?}"
1365        );
1366    }
1367
1368    #[test]
1369    fn peer_count_matches_peer_list() {
1370        let cfg = build_config(
1371            vec![
1372                "http://a.example".to_string(),
1373                "http://b.example".to_string(),
1374                "http://c.example".to_string(),
1375            ],
1376            2,
1377            500,
1378        );
1379        assert_eq!(cfg.peer_count(), 3);
1380    }
1381
1382    // --- urlencoding_encode coverage (Wave 3) ---
1383
1384    #[test]
1385    fn urlencoding_encode_passthrough_safe_chars() {
1386        // ASCII alpha-numeric + RFC3986 unreserved (-_.~) pass through.
1387        let encoded = urlencoding_encode("abcXYZ-09_.~");
1388        assert_eq!(encoded, "abcXYZ-09_.~");
1389    }
1390
1391    #[test]
1392    fn urlencoding_encode_percent_encodes_reserved_and_high_bits() {
1393        // Space, colon, plus, slash all get percent-encoded.
1394        let encoded = urlencoding_encode("2026-04-26T12:00:00+00:00 / x");
1395        assert!(
1396            encoded.contains("%3A"),
1397            "expected colon to be percent-encoded: {encoded}"
1398        );
1399        assert!(
1400            encoded.contains("%2B"),
1401            "expected + to be percent-encoded: {encoded}"
1402        );
1403        assert!(
1404            encoded.contains("%2F"),
1405            "expected / to be percent-encoded: {encoded}"
1406        );
1407        assert!(
1408            encoded.contains("%20"),
1409            "expected space to be percent-encoded: {encoded}"
1410        );
1411        // Hyphen IS in the unreserved set → must NOT be percent-encoded.
1412        assert!(
1413            !encoded.contains("%2D"),
1414            "hyphen must pass through unencoded: {encoded}"
1415        );
1416    }
1417
1418    #[test]
1419    fn urlencoding_encode_empty_string() {
1420        assert_eq!(urlencoding_encode(""), "");
1421    }
1422
1423    // --- broadcast_store_quorum id-drift path (Wave 3) ---
1424    //
1425    // The `IdDrift` arm in post_once + broadcast_store_quorum (lines around
1426    // 243-244 / 362-366) was uncovered. A peer that returns a 200 with an
1427    // `ids` array NOT containing the expected memory id should be classified
1428    // as IdDrift, not Ack.
1429
1430    async fn id_drift_handler(
1431        AxumJson(_body): AxumJson<serde_json::Value>,
1432    ) -> (StatusCode, AxumJson<serde_json::Value>) {
1433        // 200 OK but ids[0] disagrees with the memory the leader sent.
1434        (
1435            StatusCode::OK,
1436            AxumJson(serde_json::json!({"ids": ["some-other-id"], "applied": 1})),
1437        )
1438    }
1439
1440    async fn spawn_id_drift_peer() -> String {
1441        let app = Router::new().route("/api/v1/sync/push", post(id_drift_handler));
1442        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1443        let addr = listener.local_addr().unwrap();
1444        tokio::spawn(async move {
1445            axum::serve(listener, app).await.ok();
1446        });
1447        format!("http://{addr}")
1448    }
1449
1450    #[tokio::test]
1451    async fn id_drift_peer_does_not_count_as_ack() {
1452        // Two peers, both return 200 but with `ids: [other-id]`. Quorum
1453        // can't be met because neither counts as a peer ack — only the
1454        // local commit registers.
1455        let url1 = spawn_id_drift_peer().await;
1456        let url2 = spawn_id_drift_peer().await;
1457        let cfg = build_config(vec![url1, url2], 2, 1000);
1458        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
1459            .await
1460            .unwrap();
1461        let result = finalise_quorum(&tracker);
1462        // With W=2, N=3 (local + 2 peers), local + 0 peer-acks = 1 < 2.
1463        let err = result.unwrap_err();
1464        match err {
1465            QuorumError::QuorumNotMet {
1466                got,
1467                needed,
1468                reason,
1469            } => {
1470                assert_eq!(got, 1, "only local should count");
1471                assert_eq!(needed, 2);
1472                // IdDrift / Timeout / InFlight are all valid here. The
1473                // tracker classifies based on whether ANY peer reported a
1474                // drift (IdDrift), the deadline elapsed first (Timeout),
1475                // or all peers reported but the deadline still hadn't
1476                // passed when finalise was called (InFlight). The
1477                // important invariant is just "peer with drifted ids does
1478                // NOT count toward quorum".
1479                assert!(
1480                    matches!(
1481                        reason,
1482                        QuorumFailureReason::IdDrift
1483                            | QuorumFailureReason::Timeout
1484                            | QuorumFailureReason::InFlight
1485                    ),
1486                    "expected IdDrift / Timeout / InFlight, got {reason:?}"
1487                );
1488            }
1489            other => panic!("expected QuorumNotMet, got {other:?}"),
1490        }
1491    }
1492
1493    // -----------------------------------------------------------------
1494    // W9 (v0.6.3) — catchup_once + spawn_catchup_loop coverage.
1495    //
1496    // Lines 1406-1525 of `federation.rs` were uncovered through W3 because
1497    // they require a mock peer that serves `/api/v1/sync/since`, plus a
1498    // real `Db` to track the sync_state vector clock between ticks. We
1499    // reuse the existing in-process axum mock-peer pattern (see
1500    // `spawn_mock_peer` above) and a `:memory:` rusqlite handle.
1501    // -----------------------------------------------------------------
1502
1503    /// Behaviours the `/api/v1/sync/since` mock peer can take. Each variant
1504    /// is a single canned response shape — we don't need long-running
1505    /// stateful peers for catchup coverage because `catchup_once` is a
1506    /// one-shot function.
1507    #[derive(Clone)]
1508    enum SinceMockBehaviour {
1509        /// Return a 200 with `{ "memories": <list> }` on every call.
1510        ReturnMemories(Vec<Memory>),
1511        /// Return a 500 server error.
1512        Error500,
1513        /// Sleep `delay` then return memories (used for client-timeout test).
1514        Hang(Duration),
1515        /// Return 200 but with a non-JSON body so `resp.json()` fails.
1516        MalformedBody,
1517    }
1518
1519    #[derive(Clone)]
1520    struct SinceMockState {
1521        behaviour: SinceMockBehaviour,
1522        hits: Arc<AtomicUsize>,
1523        last_since: Arc<Mutex<Option<String>>>,
1524        last_peer: Arc<Mutex<Option<String>>>,
1525    }
1526
1527    async fn since_handler(
1528        axum::extract::Query(q): axum::extract::Query<std::collections::HashMap<String, String>>,
1529        axum::extract::State(state): axum::extract::State<SinceMockState>,
1530    ) -> axum::response::Response {
1531        use axum::response::IntoResponse;
1532        state.hits.fetch_add(1, Ordering::Relaxed);
1533        {
1534            let mut s = state.last_since.lock().await;
1535            *s = q.get("since").cloned();
1536        }
1537        {
1538            let mut p = state.last_peer.lock().await;
1539            *p = q.get("peer").cloned();
1540        }
1541        match &state.behaviour {
1542            SinceMockBehaviour::ReturnMemories(mems) => {
1543                let body = serde_json::json!({"memories": mems});
1544                (StatusCode::OK, AxumJson(body)).into_response()
1545            }
1546            SinceMockBehaviour::Error500 => (
1547                StatusCode::INTERNAL_SERVER_ERROR,
1548                AxumJson(serde_json::json!({"error":"oops"})),
1549            )
1550                .into_response(),
1551            SinceMockBehaviour::Hang(d) => {
1552                tokio::time::sleep(*d).await;
1553                (
1554                    StatusCode::OK,
1555                    AxumJson(serde_json::json!({"memories": []})),
1556                )
1557                    .into_response()
1558            }
1559            SinceMockBehaviour::MalformedBody => {
1560                // 200 OK but the body is not JSON — `resp.json::<Value>()`
1561                // will return an Err on the parse step.
1562                (
1563                    [(axum::http::header::CONTENT_TYPE, crate::MIME_JSON)],
1564                    "this is not json {{{",
1565                )
1566                    .into_response()
1567            }
1568        }
1569    }
1570
1571    /// Spawn a `/api/v1/sync/since` mock and return its base URL plus the
1572    /// hit-counter and last-query-param tracker.
1573    async fn spawn_since_peer(
1574        behaviour: SinceMockBehaviour,
1575    ) -> (
1576        String,
1577        Arc<AtomicUsize>,
1578        Arc<Mutex<Option<String>>>,
1579        Arc<Mutex<Option<String>>>,
1580    ) {
1581        let hits = Arc::new(AtomicUsize::new(0));
1582        let last_since = Arc::new(Mutex::new(None));
1583        let last_peer = Arc::new(Mutex::new(None));
1584        let state = SinceMockState {
1585            behaviour,
1586            hits: hits.clone(),
1587            last_since: last_since.clone(),
1588            last_peer: last_peer.clone(),
1589        };
1590        let app = Router::new()
1591            .route("/api/v1/sync/since", axum::routing::get(since_handler))
1592            .with_state(state);
1593        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1594        let addr = listener.local_addr().unwrap();
1595        tokio::spawn(async move {
1596            axum::serve(listener, app).await.ok();
1597        });
1598        (format!("http://{addr}"), hits, last_since, last_peer)
1599    }
1600
1601    /// Build an in-memory `Db` matching `handlers::Db` shape. Catchup only
1602    /// uses `lock().await.0` (the `Connection`), so the path / TTL / pragma
1603    /// fields can be defaults.
1604    fn build_test_db() -> crate::handlers::Db {
1605        let conn = crate::db::open(std::path::Path::new(":memory:")).unwrap();
1606        let path = std::path::PathBuf::from(":memory:");
1607        Arc::new(Mutex::new((
1608            conn,
1609            path,
1610            crate::config::ResolvedTtl::default(),
1611            true,
1612        )))
1613    }
1614
1615    /// Build a `FederationConfig` whose peer's `id` matches the segment we
1616    /// pull from sync_state — `peer-0`. This mirrors the production
1617    /// invariant: the catchup loop keys vector-clock entries by peer.id.
1618    /// We intentionally use the W9-shape (id = "peer-0") here rather than
1619    /// the W3-shape ("peer-0:<url>") because `catchup_once`'s url-trim path
1620    /// depends on the trailing `/api/v1/sync/push` and the id stays opaque
1621    /// either way — but the simpler shape is also closer to production.
1622    fn build_catchup_cfg(peer_url: &str, timeout_ms: u64) -> FederationConfig {
1623        let client = reqwest::Client::builder()
1624            .timeout(Duration::from_millis(timeout_ms))
1625            .build()
1626            .unwrap();
1627        FederationConfig {
1628            policy: QuorumPolicy::new(
1629                2,
1630                1,
1631                Duration::from_millis(timeout_ms),
1632                Duration::from_secs(30),
1633            )
1634            .unwrap(),
1635            peers: vec![PeerEndpoint {
1636                id: "peer-0".to_string(),
1637                sync_push_url: format!("{peer_url}/api/v1/sync/push"),
1638            }],
1639            client,
1640            sender_agent_id: "ai:catchup-test".to_string(),
1641            api_key: None,
1642            signing_key: None,
1643            #[cfg(feature = "sal")]
1644            dlq_sink: None,
1645        }
1646    }
1647
1648    /// Memory factory dedicated to catchup tests — every memory gets a
1649    /// unique title so `insert_if_newer`'s ON CONFLICT(title, namespace)
1650    /// path doesn't collapse them into one row. Timestamp is a fixed
1651    /// progression so the test asserts deterministic ordering.
1652    fn catchup_memory(title: &str, updated_at: &str) -> Memory {
1653        Memory {
1654            id: format!("cat-{title}"),
1655            tier: crate::models::Tier::Mid,
1656            namespace: "catchup".to_string(),
1657            title: title.to_string(),
1658            content: format!("content for {title}"),
1659            tags: vec!["catchup".to_string()],
1660            priority: 5,
1661            confidence: 1.0,
1662            // `validate_memory` enforces a source-allowlist (user, claude,
1663            // hook, api, cli, import, consolidation, system, chaos, notify).
1664            // Use "system" so catchup_once's `validate_memory(&mem).is_err()`
1665            // skip-branch isn't tripped — that's what we're trying NOT to
1666            // exercise in the happy-path tests below.
1667            source: "system".to_string(),
1668            access_count: 0,
1669            created_at: updated_at.to_string(),
1670            updated_at: updated_at.to_string(),
1671            last_accessed_at: None,
1672            expires_at: None,
1673            // #910 — mark scope=collective so the test's post-catchup
1674            // `store.get(&CallerContext::for_agent("test"), ...)` round-
1675            // trip doesn't trip the SAL-level scope=private filter.
1676            // Real-world catchup uses `for_admin` and bypasses the
1677            // filter; the test fixtures need `scope=collective` to
1678            // round-trip via tenant-scoped reads.
1679            metadata: serde_json::json!({
1680                "agent_id": "ai:peer-0",
1681                "scope": "collective",
1682            }),
1683            reflection_depth: 0,
1684            memory_kind: crate::models::MemoryKind::Observation,
1685            entity_id: None,
1686            persona_version: None,
1687            citations: Vec::new(),
1688            source_uri: None,
1689            source_span: None,
1690            confidence_source: crate::models::ConfidenceSource::CallerProvided,
1691            confidence_signals: None,
1692            confidence_decayed_at: None,
1693            version: 1,
1694        }
1695    }
1696
1697    // ---- catchup_once: pulls `since`, advances state ----
1698
1699    #[tokio::test]
1700    async fn test_catchup_once_pulls_since_cursor_advances_state() {
1701        // First-time catchup with empty sync_state: we expect the request
1702        // to land WITHOUT a `since` query param, and after the call
1703        // sync_state should be advanced to the latest memory's timestamp.
1704        let mems = vec![
1705            catchup_memory("a", "2026-04-26T10:00:00Z"),
1706            catchup_memory("b", "2026-04-26T10:00:01Z"),
1707            catchup_memory("c", "2026-04-26T10:00:02Z"),
1708            catchup_memory("d", "2026-04-26T10:00:03Z"),
1709            catchup_memory("e", "2026-04-26T10:00:04Z"),
1710        ];
1711        let latest_ts = mems.last().unwrap().updated_at.clone();
1712        let (url, hits, last_since, last_peer) =
1713            spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems.clone())).await;
1714        let cfg = build_catchup_cfg(&url, 2000);
1715        let db = build_test_db();
1716
1717        catchup_once(&cfg, &db).await;
1718
1719        assert_eq!(hits.load(Ordering::Relaxed), 1, "peer hit exactly once");
1720        // First-time call → no `since` query param.
1721        assert!(
1722            last_since.lock().await.is_none(),
1723            "first catchup must omit since"
1724        );
1725        // Local agent id is forwarded.
1726        assert_eq!(last_peer.lock().await.as_deref(), Some("ai:catchup-test"));
1727        // sync_state advanced to the latest memory's timestamp.
1728        let lock = db.lock().await;
1729        let clock =
1730            crate::db::sync_state_load(&lock.0, "ai:catchup-test").expect("load sync state");
1731        assert_eq!(
1732            clock.entries.get("peer-0").map(String::as_str),
1733            Some(latest_ts.as_str()),
1734            "sync state advanced to latest pulled memory's updated_at"
1735        );
1736        // All 5 memories landed.
1737        let count: i64 = lock
1738            .0
1739            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1740            .unwrap();
1741        assert_eq!(count, 5, "all five memories inserted");
1742    }
1743
1744    // ---- catchup_once: empty array no-op ----
1745
1746    #[tokio::test]
1747    async fn test_catchup_once_no_new_memories_no_op() {
1748        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
1749        let cfg = build_catchup_cfg(&url, 2000);
1750        let db = build_test_db();
1751
1752        catchup_once(&cfg, &db).await;
1753
1754        assert_eq!(hits.load(Ordering::Relaxed), 1);
1755        let lock = db.lock().await;
1756        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1757        assert!(
1758            clock.entries.get("peer-0").is_none(),
1759            "empty response must not advance sync_state"
1760        );
1761        let count: i64 = lock
1762            .0
1763            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1764            .unwrap();
1765        assert_eq!(count, 0);
1766    }
1767
1768    // ---- catchup_once: 5xx error swallowed, state untouched ----
1769
1770    #[tokio::test]
1771    async fn test_catchup_once_peer_500_error_logged_no_panic() {
1772        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::Error500).await;
1773        let cfg = build_catchup_cfg(&url, 2000);
1774        let db = build_test_db();
1775
1776        // Must NOT panic. The function logs at debug! and continues.
1777        catchup_once(&cfg, &db).await;
1778
1779        assert_eq!(hits.load(Ordering::Relaxed), 1);
1780        let lock = db.lock().await;
1781        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1782        assert!(
1783            clock.entries.get("peer-0").is_none(),
1784            "500 must not advance sync state"
1785        );
1786    }
1787
1788    // ---- catchup_once: timeout swallowed ----
1789
1790    #[tokio::test]
1791    async fn test_catchup_once_peer_timeout_handled() {
1792        // Mock hangs for 2s, client timeout is 200ms → reqwest returns Err,
1793        // catchup logs at debug! and skips this peer.
1794        let (url, hits, _, _) =
1795            spawn_since_peer(SinceMockBehaviour::Hang(Duration::from_secs(2))).await;
1796        let cfg = build_catchup_cfg(&url, 200);
1797        let db = build_test_db();
1798
1799        let start = Instant::now();
1800        catchup_once(&cfg, &db).await;
1801        let elapsed = start.elapsed();
1802
1803        // Must return promptly after the client-timeout fires, not after
1804        // the full 2s mock-side hang.
1805        assert!(
1806            elapsed < Duration::from_millis(1500),
1807            "catchup_once should honour the client timeout, took {elapsed:?}"
1808        );
1809        assert_eq!(hits.load(Ordering::Relaxed), 1, "request was sent");
1810        let lock = db.lock().await;
1811        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1812        assert!(clock.entries.get("peer-0").is_none());
1813    }
1814
1815    // ---- catchup_once: malformed JSON body ----
1816
1817    #[tokio::test]
1818    async fn test_catchup_once_malformed_response_handled() {
1819        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::MalformedBody).await;
1820        let cfg = build_catchup_cfg(&url, 2000);
1821        let db = build_test_db();
1822
1823        // No panic — the function `tracing::warn!`s and skips the peer.
1824        catchup_once(&cfg, &db).await;
1825
1826        assert_eq!(hits.load(Ordering::Relaxed), 1);
1827        let lock = db.lock().await;
1828        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1829        assert!(
1830            clock.entries.get("peer-0").is_none(),
1831            "malformed body must not advance sync state"
1832        );
1833    }
1834
1835    // ---- catchup_once: only newer memories overwrite local ----
1836
1837    #[tokio::test]
1838    async fn test_catchup_once_inserts_only_newer_memories() {
1839        // Pre-seed local DB with a memory titled "shared" at t=10:00:01.
1840        // Mock peer returns:
1841        //   - "shared" at t=10:00:00  (older — must NOT clobber local)
1842        //   - "fresh"  at t=10:00:02  (new title — must insert)
1843        let db = build_test_db();
1844        {
1845            let lock = db.lock().await;
1846            let local = catchup_memory("shared", "2026-04-26T10:00:01Z");
1847            // Insert via the test path — this is the "we already have it
1848            // locally at a newer timestamp" precondition.
1849            crate::db::insert_if_newer(&lock.0, &local).unwrap();
1850            // Confirm pre-state.
1851            let cnt: i64 = lock
1852                .0
1853                .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1854                .unwrap();
1855            assert_eq!(cnt, 1, "pre-seeded shared row");
1856        }
1857
1858        let mut stale_shared = catchup_memory("shared", "2026-04-26T10:00:00Z");
1859        // Distinct content so the "did the older catchup body win?" assertion
1860        // is meaningful — base catchup_memory derives content from title.
1861        stale_shared.content = "stale-from-catchup-peer".to_string();
1862        stale_shared.id = "cat-shared-OLD".to_string();
1863        let stale_shared_content = stale_shared.content.clone();
1864        let new_fresh = catchup_memory("fresh", "2026-04-26T10:00:02Z");
1865        let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![
1866            stale_shared,
1867            new_fresh,
1868        ]))
1869        .await;
1870        let cfg = build_catchup_cfg(&url, 2000);
1871
1872        catchup_once(&cfg, &db).await;
1873
1874        let lock = db.lock().await;
1875        // Both rows now exist.
1876        let cnt: i64 = lock
1877            .0
1878            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
1879            .unwrap();
1880        assert_eq!(cnt, 2, "fresh row inserted, shared kept");
1881        // The "shared" row's content must still be the locally-seeded
1882        // version (older catchup body did NOT win).
1883        let shared_content: String = lock
1884            .0
1885            .query_row(
1886                "SELECT content FROM memories WHERE title = 'shared' AND namespace = 'catchup'",
1887                [],
1888                |r| r.get(0),
1889            )
1890            .unwrap();
1891        assert_ne!(
1892            shared_content, stale_shared_content,
1893            "older catchup memory must NOT overwrite newer local row"
1894        );
1895        // sync_state advanced to the LATEST timestamp seen, not to the
1896        // one we actually applied — function tracks `latest_ts` over the
1897        // whole batch.
1898        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
1899        assert_eq!(
1900            clock.entries.get("peer-0").map(String::as_str),
1901            Some("2026-04-26T10:00:02Z"),
1902        );
1903    }
1904
1905    // ---- spawn_catchup_loop: runs at interval (paused-time) ----
1906
1907    #[tokio::test(start_paused = true)]
1908    async fn test_spawn_catchup_loop_runs_at_interval() {
1909        // The loop sleeps 5s up-front then ticks every `interval`. With
1910        // paused time, advance past the initial sleep and one full tick
1911        // and assert the mock saw at least one hit.
1912        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
1913        let cfg = build_catchup_cfg(&url, 5000);
1914        let db = build_test_db();
1915
1916        let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(60));
1917
1918        // Advance past the 5s startup delay + give the first catchup_once
1919        // a slice of real wall-clock to actually execute the network call.
1920        // Paused time still yields() between awaits; the network IO is
1921        // not virtualized — so we step in chunks separated by yields.
1922        for _ in 0..6 {
1923            tokio::time::advance(Duration::from_secs(1)).await;
1924            tokio::task::yield_now().await;
1925        }
1926        // Allow the spawned reqwest::send to actually complete on the
1927        // real runtime — a small real-time wait covers in-process axum
1928        // round-trip latency without paused-time interference.
1929        for _ in 0..50 {
1930            if hits.load(Ordering::Relaxed) >= 1 {
1931                break;
1932            }
1933            tokio::task::yield_now().await;
1934            tokio::time::advance(Duration::from_millis(10)).await;
1935        }
1936
1937        assert!(
1938            hits.load(Ordering::Relaxed) >= 1,
1939            "first catchup tick must hit the mock peer (got {})",
1940            hits.load(Ordering::Relaxed),
1941        );
1942
1943        handle.abort();
1944    }
1945
1946    // ---- spawn_catchup_loop: aborts cleanly on handle drop ----
1947
1948    #[tokio::test]
1949    async fn test_spawn_catchup_loop_aborts_cleanly_on_handle_drop() {
1950        // Drop the JoinHandle (via abort) and confirm the task ends quickly
1951        // — no lingering tasks, no panics from being killed mid-tick.
1952        let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
1953        let cfg = build_catchup_cfg(&url, 2000);
1954        let db = build_test_db();
1955
1956        let handle = spawn_catchup_loop(cfg, db, Duration::from_secs(crate::SECS_PER_HOUR as u64));
1957        // Don't let it run a full 5s startup-sleep. Abort and confirm
1958        // the join future resolves promptly with a Cancelled error.
1959        handle.abort();
1960        let result = tokio::time::timeout(Duration::from_millis(500), handle).await;
1961        let join = result.expect("aborted handle must resolve within 500ms");
1962        assert!(
1963            join.is_err() && join.unwrap_err().is_cancelled(),
1964            "handle.abort() must surface as is_cancelled() == true"
1965        );
1966    }
1967
1968    // ---- mTLS client-cert flow: build_config happy path ----
1969
1970    #[test]
1971    fn test_build_config_mtls_with_valid_files() {
1972        // Use the existing rcgen-generated test fixtures (PEM cert +
1973        // PKCS#8 key). The build path concatenates them into one PEM
1974        // and feeds that to `reqwest::Identity::from_pem`. We only need
1975        // to assert the client builds — TLS handshake itself isn't part
1976        // of this path's contract.
1977        let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1978            .join("tests/fixtures/tls/valid_cert.pem");
1979        let key = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1980            .join("tests/fixtures/tls/valid_key_pkcs8.pem");
1981        // Sanity: fixtures exist on disk.
1982        assert!(cert.exists(), "missing test fixture: {cert:?}");
1983        assert!(key.exists(), "missing test fixture: {key:?}");
1984
1985        let result = FederationConfig::build(
1986            2,
1987            &["http://peer.example".to_string()],
1988            Duration::from_millis(500),
1989            Some(&cert),
1990            Some(&key),
1991            None,
1992            "ai:builder".to_string(),
1993            None,
1994        );
1995        let cfg = match result {
1996            Ok(Some(c)) => c,
1997            Ok(None) => panic!("expected Some(FederationConfig), got None"),
1998            Err(e) => panic!("expected Ok, got Err: {e}"),
1999        };
2000        assert_eq!(cfg.peer_count(), 1);
2001    }
2002
2003    // ---- mTLS client-cert flow: missing key file errors ----
2004
2005    #[test]
2006    fn test_build_config_mtls_with_missing_files_returns_error() {
2007        // Cert path exists, key path doesn't → the second `read` errors
2008        // with "read --client-key:". This exercises the second arm of
2009        // the `(Some(cert), Some(key))` branch that the existing
2010        // `config_build_rejects_missing_client_cert_path` test (which
2011        // makes BOTH paths missing) doesn't reach.
2012        let cert = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2013            .join("tests/fixtures/tls/valid_cert.pem");
2014        let bogus_key = std::path::PathBuf::from("/definitely/missing/key.pem");
2015        assert!(cert.exists(), "missing test fixture: {cert:?}");
2016
2017        let result = FederationConfig::build(
2018            2,
2019            &["http://peer.example".to_string()],
2020            Duration::from_millis(500),
2021            Some(&cert),
2022            Some(&bogus_key),
2023            None,
2024            "ai:builder".to_string(),
2025            None,
2026        );
2027        let err = match result {
2028            Ok(_) => panic!("expected client-key read error"),
2029            Err(e) => e,
2030        };
2031        let msg = format!("{err}");
2032        assert!(
2033            msg.contains("read --client-key"),
2034            "expected client-key read error, got {msg:?}"
2035        );
2036    }
2037
2038    // -----------------------------------------------------------------
2039    // W12-G (v0.6.3) — federation.rs remaining edges (89.87% → 94%+).
2040    //
2041    // Targets the residual uncovered surface after W3 + W9 F9:
2042    //   - post_and_classify direct: persistent retry-fail and id-drift
2043    //     skip-retry paths.
2044    //   - bulk_catchup_push edge cases not previously reached
2045    //     (no-peers shortcut, mixed pass+fail outcomes).
2046    //   - Quorum-policy edges: W=1 single-peer-ack already returns,
2047    //     QuorumPolicy::majority convenience constructor, FederationConfig
2048    //     duplicate detection on trailing-slash and case differences.
2049    //   - Each broadcast_*_quorum has only the all-Ack and all-Fail
2050    //     paths — exercise the `Hang` (timeout-mid-loop) classification
2051    //     for the remaining variants so the inner `Ok(None) | Err(_)`
2052    //     break arm is hit on every flavour.
2053    //   - catchup_once: 5xx classified as "Ok(r) where !success" arm
2054    //     (F9 covers it once but with peer.id == "peer-0"; the
2055    //     ServerError + non-empty body path is already covered).
2056    //     New: peer URL whose `sync_push_url` does NOT carry the
2057    //     `/api/v1/sync/push` suffix — the trim_end_matches no-ops
2058    //     and the `since` URL is built from the raw base.
2059    //   - QuorumNotMetPayload: `from_err` on a peer-acks-empty result
2060    //     after the deadline (Unreachable variant via real broadcast).
2061    //
2062    // All tests reuse the in-process axum mock-peer infrastructure
2063    // (`spawn_mock_peer`, `spawn_since_peer`) and do not require disk.
2064    // -----------------------------------------------------------------
2065
2066    /// W12-G #1: `post_and_classify` returns `Fail` after retry also fails,
2067    /// and the failure string carries BOTH attempts' reasons (`first:` /
2068    /// `retry:` prefixes). Hits the `Fail(format!("first: {}; retry: {}"))`
2069    /// arm at lines ~437-440 directly — the outer broadcast tests only
2070    /// assert that quorum-not-met surfaces, not the format of the error.
2071    #[tokio::test]
2072    async fn post_and_classify_persistent_fail_concatenates_both_reasons() {
2073        let (url, count) = spawn_mock_peer(MockBehaviour::Fail).await;
2074        let client = reqwest::Client::builder()
2075            .timeout(Duration::from_millis(2000))
2076            .build()
2077            .unwrap();
2078        let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
2079        let target = format!("{url}/api/v1/sync/push");
2080
2081        let outcome =
2082            post_and_classify(&client, &target, &body, "mem-x", Some("mem-x"), None, None).await;
2083        match outcome {
2084            AckOutcome::Fail(reason) => {
2085                assert!(
2086                    reason.contains("first:") && reason.contains("retry:"),
2087                    "expected both attempts in reason, got {reason:?}"
2088                );
2089                // 5xx → both attempts should have classified as `http 500`.
2090                assert!(
2091                    reason.contains("http 500"),
2092                    "expected 5xx in reason, got {reason:?}"
2093                );
2094            }
2095            other => panic!("expected AckOutcome::Fail, got {other:?}"),
2096        }
2097        assert_eq!(
2098            count.load(Ordering::Relaxed),
2099            2,
2100            "first attempt + one retry = exactly two POSTs"
2101        );
2102    }
2103
2104    /// W12-G #2: `post_and_classify` does NOT retry on `IdDrift`. A peer
2105    /// that semantically disagrees on the id is not a transient failure;
2106    /// retrying would just observe the same disagreement. Hits the
2107    /// outer-match `IdDrift => IdDrift` arm at line ~410 (no inner retry
2108    /// dispatch) — distinct from the `Fail` arm that performs the retry.
2109    #[tokio::test]
2110    async fn post_and_classify_id_drift_does_not_retry() {
2111        // Hand-rolled mock that always 200's with a divergent id.
2112        let count = Arc::new(AtomicUsize::new(0));
2113        let cnt_clone = count.clone();
2114        let app = Router::new().route(
2115            "/api/v1/sync/push",
2116            post(move |AxumJson(_b): AxumJson<serde_json::Value>| {
2117                let c = cnt_clone.clone();
2118                async move {
2119                    c.fetch_add(1, Ordering::Relaxed);
2120                    (
2121                        StatusCode::OK,
2122                        AxumJson(serde_json::json!({"ids":["other-id"],"applied":1})),
2123                    )
2124                }
2125            }),
2126        );
2127        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2128        let addr = listener.local_addr().unwrap();
2129        tokio::spawn(async move {
2130            axum::serve(listener, app).await.ok();
2131        });
2132        let url = format!("http://{addr}/api/v1/sync/push");
2133
2134        let client = reqwest::Client::builder()
2135            .timeout(Duration::from_millis(2000))
2136            .build()
2137            .unwrap();
2138        let body = serde_json::json!({"sender_agent_id":"ai:test","memories":[]});
2139        let outcome =
2140            post_and_classify(&client, &url, &body, "mem-x", Some("mem-x"), None, None).await;
2141        assert!(
2142            matches!(outcome, AckOutcome::IdDrift),
2143            "expected IdDrift, got {outcome:?}"
2144        );
2145        assert_eq!(
2146            count.load(Ordering::Relaxed),
2147            1,
2148            "IdDrift must NOT trigger the retry path (only one POST)"
2149        );
2150    }
2151
2152    /// W12-G #3: `bulk_catchup_push` with no peers returns immediately
2153    /// without spawning. Hits the `if memories.is_empty() || config.peers
2154    /// .is_empty()` shortcut — the existing
2155    /// `bulk_catchup_push_empty_inputs_are_noop` covers `memories.is_empty()`
2156    /// only.
2157    #[tokio::test]
2158    async fn bulk_catchup_push_no_peers_is_noop() {
2159        let client = reqwest::Client::builder()
2160            .timeout(Duration::from_millis(500))
2161            .build()
2162            .unwrap();
2163        let cfg = FederationConfig {
2164            policy: QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30))
2165                .unwrap(),
2166            peers: Vec::new(),
2167            client,
2168            sender_agent_id: "ai:no-peers".to_string(),
2169            api_key: None,
2170            signing_key: None,
2171            #[cfg(feature = "sal")]
2172            dlq_sink: None,
2173        };
2174        // Non-empty memories list — the shortcut should still fire because
2175        // the peer list is empty.
2176        let mems = vec![sample_memory()];
2177        let errors = bulk_catchup_push(&cfg, &mems).await;
2178        assert!(
2179            errors.is_empty(),
2180            "no-peers catchup must return empty error vec immediately, got {errors:?}"
2181        );
2182    }
2183
2184    /// W12-G #4: `bulk_catchup_push` with mixed peer outcomes (one Ack,
2185    /// one Fail). The Ack peer must NOT appear in the error vec; the
2186    /// Fail peer MUST appear with its `peer.id` and an http-500 reason.
2187    /// Validates the per-peer error propagation more precisely than the
2188    /// existing `bulk_catchup_push_reports_peer_failures` — that test
2189    /// uses two failing peers.
2190    #[tokio::test]
2191    async fn bulk_catchup_push_mixed_outcomes_only_failing_peer_in_errors() {
2192        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2193        let (url2, count2) = spawn_mock_peer(MockBehaviour::Fail).await;
2194        let cfg = build_config(vec![url1, url2], 2, 2000);
2195        let mems = vec![sample_memory()];
2196        let errors = bulk_catchup_push(&cfg, &mems).await;
2197        assert_eq!(
2198            errors.len(),
2199            1,
2200            "exactly one failing peer should be in errors, got {errors:?}"
2201        );
2202        let (peer_id, reason) = &errors[0];
2203        // build_config assigns `peer-0:<url>` and `peer-1:<url>`. The
2204        // failing peer is the second one we registered.
2205        assert!(
2206            peer_id.starts_with("peer-1"),
2207            "failing peer should be peer-1, got {peer_id}"
2208        );
2209        assert!(
2210            reason.contains("http 500"),
2211            "expected http 500 reason, got {reason}"
2212        );
2213        // Both peers were called regardless.
2214        assert_eq!(count1.load(Ordering::Relaxed), 1);
2215        assert_eq!(count2.load(Ordering::Relaxed), 1);
2216    }
2217
2218    /// W12-G #5: W=1 quorum is met by the local commit alone — no peer
2219    /// ack needed. Even when every peer fails, the broadcast still
2220    /// returns Ok and `finalise_quorum` returns `Ok(1)`. Exercises the
2221    /// `is_quorum_met` early-exit path with `acks.len() == 0`.
2222    #[tokio::test]
2223    async fn quorum_w1_local_commit_alone_is_sufficient() {
2224        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2225        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2226        // W=1, N=3 — local commit is enough on its own.
2227        let cfg = build_config(vec![url1, url2], 1, 1000);
2228        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
2229            .await
2230            .unwrap();
2231        let count = finalise_quorum(&tracker).expect("W=1 must succeed on local commit alone");
2232        assert_eq!(count, 1, "W=1 quorum returns local-only count");
2233    }
2234
2235    /// W12-G #6: `QuorumPolicy::majority` builds the convenience config
2236    /// with `W = ceil((N+1)/2)`. N=3 → W=2; N=5 → W=3. The existing
2237    /// suite uses `QuorumPolicy::new` directly everywhere — `majority`
2238    /// goes uncovered.
2239    #[test]
2240    fn quorum_policy_majority_builds_with_ceil_n_plus_1_div_2() {
2241        let p3 = QuorumPolicy::majority(3).expect("N=3 majority builds");
2242        // public field for tests: re-derive via finalise round-trip if
2243        // the internal `w` is private. Instead use a lightweight
2244        // tracker-based check.
2245        let mut t = AckTracker::new(p3, Instant::now());
2246        t.record_local();
2247        // With W=2, local-only is NOT yet quorum.
2248        assert!(
2249            !t.is_quorum_met(Instant::now()),
2250            "majority-of-3 needs more than local"
2251        );
2252        t.record_peer_ack("peer-a");
2253        assert!(
2254            t.is_quorum_met(Instant::now()),
2255            "local + 1 peer ack = 2 = majority of 3"
2256        );
2257
2258        let p5 = QuorumPolicy::majority(5).expect("N=5 majority builds");
2259        let mut t5 = AckTracker::new(p5, Instant::now());
2260        t5.record_local();
2261        t5.record_peer_ack("a");
2262        assert!(
2263            !t5.is_quorum_met(Instant::now()),
2264            "majority-of-5 needs 3 acks"
2265        );
2266        t5.record_peer_ack("b");
2267        assert!(t5.is_quorum_met(Instant::now()), "local + 2 peers = 3");
2268    }
2269
2270    /// W12-G #7: `QuorumPolicy::majority(0)` rejects with InvalidPolicy.
2271    /// Hits the `n == 0` guard via the convenience constructor (the
2272    /// existing `quorum_not_met_payload_invalid_policy_branch` builds
2273    /// the error directly without going through `QuorumPolicy::new`).
2274    #[test]
2275    fn quorum_policy_majority_rejects_zero() {
2276        let err = QuorumPolicy::majority(0).expect_err("n=0 must be rejected");
2277        match err {
2278            QuorumError::InvalidPolicy { detail } => {
2279                assert!(
2280                    detail.contains("n must be"),
2281                    "expected n>=1 message, got {detail}"
2282                );
2283            }
2284            other => panic!("expected InvalidPolicy, got {other:?}"),
2285        }
2286    }
2287
2288    /// W12-G #8: `FederationConfig::build` rejects duplicate peers
2289    /// where the URLs differ only in trailing-slash. Existing test
2290    /// (`config_build_rejects_duplicate_peer_urls`) uses identical
2291    /// strings; this exercises the normalization branch
2292    /// (`trim_end_matches('/').to_ascii_lowercase()`).
2293    #[test]
2294    fn config_build_rejects_duplicate_peers_differing_only_in_trailing_slash() {
2295        let result = FederationConfig::build(
2296            2,
2297            &[
2298                "http://peer.example".to_string(),
2299                "http://peer.example/".to_string(),
2300            ],
2301            Duration::from_millis(500),
2302            None,
2303            None,
2304            None,
2305            "ai:dup-test".to_string(),
2306            None,
2307        );
2308        let err = match result {
2309            Ok(_) => panic!("trailing-slash dup must be rejected"),
2310            Err(e) => e,
2311        };
2312        let msg = format!("{err}");
2313        assert!(
2314            msg.contains("duplicate peer URL"),
2315            "expected duplicate-peer error, got {msg}"
2316        );
2317    }
2318
2319    /// W12-G #9: `FederationConfig::build` rejects duplicate peers where
2320    /// the URLs differ only in scheme/host casing. Mirrors the
2321    /// `to_ascii_lowercase` half of the normalization.
2322    #[test]
2323    fn config_build_rejects_duplicate_peers_differing_only_in_case() {
2324        let result = FederationConfig::build(
2325            2,
2326            &[
2327                "http://Peer.Example".to_string(),
2328                "http://peer.example".to_string(),
2329            ],
2330            Duration::from_millis(500),
2331            None,
2332            None,
2333            None,
2334            "ai:dup-case-test".to_string(),
2335            None,
2336        );
2337        let err = match result {
2338            Ok(_) => panic!("case-only dup must be rejected"),
2339            Err(e) => e,
2340        };
2341        let msg = format!("{err}");
2342        assert!(
2343            msg.contains("duplicate peer URL"),
2344            "expected duplicate-peer error, got {msg}"
2345        );
2346    }
2347
2348    /// W12-G #10: archive_quorum classifies a hanging peer as
2349    /// non-acking — the existing tests for archive_quorum use Ack and
2350    /// Fail only. With Hang behaviour and a tight 200ms timeout, the
2351    /// `Ok(None) | Err(_) => break` arm fires in the inner timeout
2352    /// match. (Ditto for restore/link/consolidate — covered together
2353    /// via a sweep below to keep this test focused.)
2354    #[tokio::test]
2355    async fn archive_quorum_hanging_peer_times_out_to_break_arm() {
2356        let (url1, _) = spawn_mock_peer(MockBehaviour::Hang).await;
2357        let (url2, _) = spawn_mock_peer(MockBehaviour::Hang).await;
2358        // W=2 with two hanging peers + 200ms timeout. The local commit
2359        // is the only source of acks; quorum cannot be met.
2360        let cfg = build_config(vec![url1, url2], 2, 200);
2361        let start = Instant::now();
2362        let tracker = broadcast_archive_quorum(&cfg, "mem-arch-id").await.unwrap();
2363        let elapsed = start.elapsed();
2364        // Loop must give up at the deadline, not hang for the full 10s
2365        // peer sleep.
2366        assert!(
2367            elapsed < Duration::from_secs(2),
2368            "archive_quorum must exit at deadline, took {elapsed:?}"
2369        );
2370        let err = finalise_quorum(&tracker).unwrap_err();
2371        assert!(
2372            matches!(err, QuorumError::QuorumNotMet { .. }),
2373            "expected QuorumNotMet, got {err:?}"
2374        );
2375    }
2376
2377    /// W12-G #11: `QuorumNotMetPayload::from_err` round-trip on a real
2378    /// `Unreachable` outcome from the broadcast loop. Existing direct
2379    /// tests build the QuorumError by hand; this end-to-end path has
2380    /// the broadcast actually classify the failure reason.
2381    #[tokio::test]
2382    async fn quorum_not_met_payload_unreachable_round_trip_from_broadcast() {
2383        // Two peers both Fail (not Hang) — we want the deadline to
2384        // elapse with zero peer acks. The broadcast finalises with
2385        // `Unreachable` because acks.is_empty() AND past deadline.
2386        let (url1, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2387        let (url2, _) = spawn_mock_peer(MockBehaviour::Fail).await;
2388        // Tight timeout so the deadline beats the 250ms backoff retry.
2389        let cfg = build_config(vec![url1, url2], 2, 100);
2390        let tracker = broadcast_store_quorum(&cfg, &sample_memory())
2391            .await
2392            .unwrap();
2393        // Wait past the deadline before finalising — this guarantees
2394        // `now > deadline` in finalise() so the Unreachable branch is
2395        // selected (rather than InFlight).
2396        tokio::time::sleep(Duration::from_millis(150)).await;
2397        let err = finalise_quorum(&tracker).unwrap_err();
2398        let payload = QuorumNotMetPayload::from_err(&err);
2399        assert_eq!(payload.error, "quorum_not_met");
2400        assert_eq!(payload.got, 1, "only local commit");
2401        assert_eq!(payload.needed, 2);
2402        assert!(
2403            payload.reason == "unreachable" || payload.reason == "timeout",
2404            "expected unreachable/timeout, got {}",
2405            payload.reason
2406        );
2407    }
2408
2409    /// W12-G #12: `catchup_once` against a peer with an unusual base URL
2410    /// (no `/api/v1/sync/push` suffix) — `trim_end_matches` no-ops, so
2411    /// the constructed `since` URL appends `/api/v1/sync/since` to the
2412    /// raw base. Exercises the trim-noop branch at the start of
2413    /// catchup_once.
2414    #[tokio::test]
2415    async fn catchup_once_peer_url_without_push_suffix_still_builds_since() {
2416        let (url, hits, _, last_peer) =
2417            spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![])).await;
2418        // Build a config whose peer.sync_push_url does NOT end in
2419        // `/api/v1/sync/push`. The trim_end_matches in catchup_once is
2420        // a no-op for this shape, so the base URL is the raw `url`.
2421        let client = reqwest::Client::builder()
2422            .timeout(Duration::from_millis(2000))
2423            .build()
2424            .unwrap();
2425        let cfg = FederationConfig {
2426            policy: QuorumPolicy::new(2, 1, Duration::from_millis(2000), Duration::from_secs(30))
2427                .unwrap(),
2428            peers: vec![PeerEndpoint {
2429                id: "peer-0".to_string(),
2430                // No /api/v1/sync/push suffix — verifies the trim is
2431                // tolerant of unexpected shapes.
2432                sync_push_url: url.clone(),
2433            }],
2434            client,
2435            sender_agent_id: "ai:no-suffix".to_string(),
2436            api_key: None,
2437            signing_key: None,
2438            #[cfg(feature = "sal")]
2439            dlq_sink: None,
2440        };
2441        let db = build_test_db();
2442        catchup_once(&cfg, &db).await;
2443        // The mock saw a hit at /api/v1/sync/since with the local agent id.
2444        assert_eq!(hits.load(Ordering::Relaxed), 1);
2445        assert_eq!(
2446            last_peer.lock().await.as_deref(),
2447            Some("ai:no-suffix"),
2448            "local agent id should be forwarded as ?peer="
2449        );
2450    }
2451
2452    /// W12-G #13: `catchup_once` skips memories that fail
2453    /// `validate_memory` (e.g. invalid `source` enum). The valid memory
2454    /// IS applied; sync_state advances to the latest TS seen. Exercises
2455    /// the `if crate::validate::validate_memory(&mem).is_err() { continue; }`
2456    /// branch which the F9 happy-path tests don't trigger.
2457    #[tokio::test]
2458    async fn catchup_once_skips_invalid_memory_but_applies_valid_neighbour() {
2459        // valid memory uses source="system" (whitelisted by validate_memory).
2460        let valid = catchup_memory("ok-mem", "2026-04-26T10:00:00Z");
2461        // invalid memory has source not in the allowlist (validate fails).
2462        let mut bad = catchup_memory("bad-source", "2026-04-26T10:00:01Z");
2463        bad.source = "made-up-source-not-in-allowlist".to_string();
2464        let mems = vec![valid.clone(), bad];
2465
2466        let (url, hits, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
2467        let cfg = build_catchup_cfg(&url, 2000);
2468        let db = build_test_db();
2469        catchup_once(&cfg, &db).await;
2470
2471        assert_eq!(hits.load(Ordering::Relaxed), 1);
2472        let lock = db.lock().await;
2473        // Only the valid memory was inserted.
2474        let count: i64 = lock
2475            .0
2476            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2477            .unwrap();
2478        assert_eq!(count, 1, "only the valid memory should land");
2479        let title: String = lock
2480            .0
2481            .query_row(
2482                "SELECT title FROM memories WHERE namespace='catchup' LIMIT 1",
2483                [],
2484                |r| r.get(0),
2485            )
2486            .unwrap();
2487        assert_eq!(title, "ok-mem");
2488        // sync_state advanced to the latest TS of the APPLIED rows
2489        // only — the validate-fail `continue` happens before the
2490        // `latest_ts` bump, so the invalid 10:00:01 row does NOT
2491        // contribute. Net: latest_ts == valid memory's timestamp.
2492        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2493        assert_eq!(
2494            clock.entries.get("peer-0").map(String::as_str),
2495            Some("2026-04-26T10:00:00Z"),
2496            "sync_state tracks latest_ts of validate-passing rows"
2497        );
2498    }
2499
2500    /// L11 (v0.7.0.1) — federation-replicate-then-read agent_id preservation.
2501    ///
2502    /// Scenario (NHI-D-fed-agentid-mutation):
2503    ///   1. openclaw-1 writes memory M with `metadata.agent_id="ai:alice@plan-c"`.
2504    ///   2. openclaw-2's catchup loop fetches M via `GET /api/v1/sync/since`.
2505    ///   3. openclaw-2 inserts M locally via `db::insert_if_newer`.
2506    ///   4. Read-back on openclaw-2 must surface the SAME `agent_id` —
2507    ///      "ai:alice@plan-c" — not openclaw-2's daemon identity, not the
2508    ///      receiver-side anonymous fallback.
2509    ///
2510    /// The contract is documented in CLAUDE.md §Agent Identity (NHI):
2511    /// > Once a memory is stored, `metadata.agent_id` is preserved across
2512    /// > update, dedup (UPSERT), MCP `memory_update`, HTTP `PUT /memories/{id}`,
2513    /// > import, sync, and consolidate.
2514    ///
2515    /// Pre-fix, the regression manifested when the same memory was also
2516    /// pushed through `POST /api/v1/memories` (the `create_memory` handler)
2517    /// — the HTTP resolver ignored `metadata.agent_id` and clobbered it with
2518    /// the per-request anonymous fallback. This test pins the catchup path
2519    /// directly so future refactors of `insert_if_newer` can't silently
2520    /// regress the federation contract.
2521    #[tokio::test]
2522    async fn l11_catchup_preserves_original_agent_id_through_replication() {
2523        // Build a peer-side memory carrying alice's claim.
2524        let mut alice_mem = catchup_memory("alice-note", "2026-05-10T10:00:00Z");
2525        alice_mem.metadata = serde_json::json!({
2526            "agent_id": "ai:alice@plan-c",
2527            "shared": "alice wrote this"
2528        });
2529
2530        let (url, hits, _, _) =
2531            spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![alice_mem.clone()])).await;
2532        let cfg = build_catchup_cfg(&url, 2000);
2533        let db = build_test_db();
2534
2535        catchup_once(&cfg, &db).await;
2536
2537        assert_eq!(hits.load(Ordering::Relaxed), 1, "catchup should hit once");
2538
2539        // Read back the replicated row and assert agent_id is intact.
2540        let lock = db.lock().await;
2541        let count: i64 = lock
2542            .0
2543            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2544            .unwrap();
2545        assert_eq!(count, 1, "alice's row must land on the receiver");
2546
2547        let (raw_metadata,): (String,) = lock
2548            .0
2549            .query_row(
2550                "SELECT metadata FROM memories WHERE title='alice-note'",
2551                [],
2552                |r| Ok((r.get(0)?,)),
2553            )
2554            .unwrap();
2555        let stored: serde_json::Value = serde_json::from_str(&raw_metadata).unwrap();
2556        assert_eq!(
2557            stored.get("agent_id").and_then(serde_json::Value::as_str),
2558            Some("ai:alice@plan-c"),
2559            "agent_id must survive federation replication verbatim — \
2560             observed rewrite to receiver identity is the L11 NHI-D \
2561             regression"
2562        );
2563        // Non-agent_id metadata fields must also round-trip.
2564        assert_eq!(
2565            stored.get("shared").and_then(serde_json::Value::as_str),
2566            Some("alice wrote this"),
2567            "sibling metadata fields must round-trip alongside agent_id"
2568        );
2569    }
2570
2571    /// W12-G #14: `AckTracker::record_peer_ack` is idempotent — recording
2572    /// the same peer id twice does not double-count. Exercised
2573    /// indirectly by the broadcast layer (the tracker is a HashSet under
2574    /// the hood) but never asserted directly.
2575    #[test]
2576    fn ack_tracker_record_peer_ack_is_idempotent() {
2577        let policy = QuorumPolicy::new(3, 2, Duration::from_secs(1), Duration::from_secs(30))
2578            .expect("policy");
2579        let mut t = AckTracker::new(policy, Instant::now());
2580        t.record_local();
2581        t.record_peer_ack("peer-a");
2582        t.record_peer_ack("peer-a"); // dup — must dedupe
2583        // 2 acks (local + 1 distinct peer) = 2 = W → quorum met.
2584        assert!(t.is_quorum_met(Instant::now()));
2585        // Adding a third distinct peer does not regress quorum.
2586        t.record_peer_ack("peer-b");
2587        assert!(t.is_quorum_met(Instant::now()));
2588    }
2589
2590    /// W12-G #15a: `catchup_once` against a peer whose 200 body lacks
2591    /// a `memories` key — `body.get("memories")` returns None and the
2592    /// loop `continue`s without applying anything or advancing
2593    /// sync_state. Hits the `None => continue` arm at line ~1478
2594    /// (the existing F9 tests always include the `memories` array).
2595    #[tokio::test]
2596    async fn catchup_once_body_without_memories_key_is_skipped() {
2597        // Hand-rolled handler returning `{"applied": 0}` (no memories key).
2598        let app = Router::new().route(
2599            "/api/v1/sync/since",
2600            axum::routing::get(|| async {
2601                (
2602                    StatusCode::OK,
2603                    AxumJson(serde_json::json!({"applied":0,"note":"empty cluster"})),
2604                )
2605            }),
2606        );
2607        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2608        let addr = listener.local_addr().unwrap();
2609        tokio::spawn(async move {
2610            axum::serve(listener, app).await.ok();
2611        });
2612        let url = format!("http://{addr}");
2613        let cfg = build_catchup_cfg(&url, 2000);
2614        let db = build_test_db();
2615        catchup_once(&cfg, &db).await;
2616        let lock = db.lock().await;
2617        let count: i64 = lock
2618            .0
2619            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2620            .unwrap();
2621        assert_eq!(count, 0, "no memories key → no inserts");
2622        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
2623        assert!(
2624            clock.entries.get("peer-0").is_none(),
2625            "no memories key → sync_state untouched"
2626        );
2627    }
2628
2629    /// W12-G #15b: `catchup_once` against a peer that returns a 200 with
2630    /// a `memories` array containing an unparseable element. The
2631    /// individual element is skipped (`serde_json::from_value` Err) and
2632    /// the rest of the batch is applied. Hits lines 1492-1494.
2633    #[tokio::test]
2634    async fn catchup_once_unparseable_individual_memory_is_skipped() {
2635        // `memories[0]` is a valid Memory, `memories[1]` is a JSON object
2636        // with the wrong shape (missing required fields).
2637        let valid_mem = serde_json::to_value(catchup_memory("ok", "2026-04-26T10:00:00Z")).unwrap();
2638        let bad_mem = serde_json::json!({"id":"oops","not_a_memory_field": true});
2639        let app = Router::new().route(
2640            "/api/v1/sync/since",
2641            axum::routing::get(move || {
2642                let valid = valid_mem.clone();
2643                let bad = bad_mem.clone();
2644                async move {
2645                    (
2646                        StatusCode::OK,
2647                        AxumJson(serde_json::json!({"memories": [valid, bad]})),
2648                    )
2649                }
2650            }),
2651        );
2652        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2653        let addr = listener.local_addr().unwrap();
2654        tokio::spawn(async move {
2655            axum::serve(listener, app).await.ok();
2656        });
2657        let url = format!("http://{addr}");
2658        let cfg = build_catchup_cfg(&url, 2000);
2659        let db = build_test_db();
2660        catchup_once(&cfg, &db).await;
2661        let lock = db.lock().await;
2662        // Only the parseable memory landed.
2663        let count: i64 = lock
2664            .0
2665            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
2666            .unwrap();
2667        assert_eq!(count, 1, "only parseable memory inserted");
2668    }
2669
2670    /// W12-G #16: id-drift on `broadcast_delete_quorum` exercises the
2671    /// `IdDrift => record_id_drift` arm at line ~591 (the existing
2672    /// `id_drift_peer_does_not_count_as_ack` only hits the store path).
2673    #[tokio::test]
2674    async fn delete_quorum_id_drift_peer_records_drift_not_ack() {
2675        let url1 = spawn_id_drift_peer().await;
2676        let url2 = spawn_id_drift_peer().await;
2677        let cfg = build_config(vec![url1, url2], 2, 1000);
2678        let tracker = broadcast_delete_quorum(&cfg, "mem-del-x").await.unwrap();
2679        // local + 0 peer acks = 1 < W=2 → not met.
2680        let err = finalise_quorum(&tracker).unwrap_err();
2681        assert!(
2682            matches!(err, QuorumError::QuorumNotMet { got: 1, .. }),
2683            "expected QuorumNotMet got=1, got {err:?}"
2684        );
2685        // Both peers reported drift.
2686        assert_eq!(
2687            tracker.id_drift_count(),
2688            2,
2689            "both peers should be recorded as drift"
2690        );
2691    }
2692
2693    /// W12-G #17: id-drift on `broadcast_archive_quorum` exercises the
2694    /// IdDrift arm at line ~679.
2695    #[tokio::test]
2696    async fn archive_quorum_id_drift_peer_records_drift_not_ack() {
2697        let url1 = spawn_id_drift_peer().await;
2698        let cfg = build_config(vec![url1], 2, 1000);
2699        let tracker = broadcast_archive_quorum(&cfg, "mem-arch-x").await.unwrap();
2700        let err = finalise_quorum(&tracker).unwrap_err();
2701        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2702        assert_eq!(tracker.id_drift_count(), 1);
2703    }
2704
2705    /// W12-G #18: id-drift on `broadcast_restore_quorum` exercises the
2706    /// IdDrift arm at line ~768.
2707    #[tokio::test]
2708    async fn restore_quorum_id_drift_peer_records_drift_not_ack() {
2709        let url1 = spawn_id_drift_peer().await;
2710        let cfg = build_config(vec![url1], 2, 1000);
2711        let tracker = broadcast_restore_quorum(&cfg, "mem-res-x").await.unwrap();
2712        let err = finalise_quorum(&tracker).unwrap_err();
2713        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2714        assert_eq!(tracker.id_drift_count(), 1);
2715    }
2716
2717    /// W12-G #19: id-drift on `broadcast_link_quorum` exercises the
2718    /// IdDrift arm at line ~851.
2719    #[tokio::test]
2720    async fn link_quorum_id_drift_peer_records_drift_not_ack() {
2721        let url1 = spawn_id_drift_peer().await;
2722        let cfg = build_config(vec![url1], 2, 1000);
2723        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2724        let err = finalise_quorum(&tracker).unwrap_err();
2725        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2726        assert_eq!(tracker.id_drift_count(), 1);
2727    }
2728
2729    /// W12-G #20: id-drift on `broadcast_consolidate_quorum` exercises
2730    /// the IdDrift arm at line ~935.
2731    #[tokio::test]
2732    async fn consolidate_quorum_id_drift_peer_records_drift_not_ack() {
2733        let url1 = spawn_id_drift_peer().await;
2734        let cfg = build_config(vec![url1], 2, 1000);
2735        let new_mem = sample_memory();
2736        let tracker = broadcast_consolidate_quorum(&cfg, &new_mem, &[])
2737            .await
2738            .unwrap();
2739        let err = finalise_quorum(&tracker).unwrap_err();
2740        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2741        assert_eq!(tracker.id_drift_count(), 1);
2742    }
2743
2744    /// W12-G #21: id-drift on `broadcast_pending_quorum` exercises the
2745    /// IdDrift arm at line ~1024.
2746    #[tokio::test]
2747    async fn pending_quorum_id_drift_peer_records_drift_not_ack() {
2748        let url1 = spawn_id_drift_peer().await;
2749        let cfg = build_config(vec![url1], 2, 1000);
2750        let tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2751            .await
2752            .unwrap();
2753        let err = finalise_quorum(&tracker).unwrap_err();
2754        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2755        assert_eq!(tracker.id_drift_count(), 1);
2756    }
2757
2758    /// W12-G #22: id-drift on `broadcast_pending_decision_quorum`
2759    /// exercises the IdDrift arm at line ~1112.
2760    #[tokio::test]
2761    async fn pending_decision_quorum_id_drift_peer_records_drift_not_ack() {
2762        let url1 = spawn_id_drift_peer().await;
2763        let cfg = build_config(vec![url1], 2, 1000);
2764        let tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2765            .await
2766            .unwrap();
2767        let err = finalise_quorum(&tracker).unwrap_err();
2768        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2769        assert_eq!(tracker.id_drift_count(), 1);
2770    }
2771
2772    /// W12-G #23: id-drift on `broadcast_namespace_meta_quorum`
2773    /// exercises the IdDrift arm at line ~1201.
2774    #[tokio::test]
2775    async fn namespace_meta_quorum_id_drift_peer_records_drift_not_ack() {
2776        let url1 = spawn_id_drift_peer().await;
2777        let cfg = build_config(vec![url1], 2, 1000);
2778        let tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
2779            .await
2780            .unwrap();
2781        let err = finalise_quorum(&tracker).unwrap_err();
2782        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2783        assert_eq!(tracker.id_drift_count(), 1);
2784    }
2785
2786    /// W12-G #24: id-drift on `broadcast_namespace_meta_clear_quorum`
2787    /// exercises the IdDrift arm at line ~1294.
2788    #[tokio::test]
2789    async fn namespace_meta_clear_quorum_id_drift_peer_records_drift_not_ack() {
2790        let url1 = spawn_id_drift_peer().await;
2791        let cfg = build_config(vec![url1], 2, 1000);
2792        let namespaces = vec!["app/team".to_string()];
2793        let tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
2794            .await
2795            .unwrap();
2796        let err = finalise_quorum(&tracker).unwrap_err();
2797        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
2798        assert_eq!(tracker.id_drift_count(), 1);
2799    }
2800
2801    /// W12-G #25: post-quorum detach for `broadcast_delete_quorum`
2802    /// fanout exercises the post-quorum spawn block at lines 608-616
2803    /// (the `if !joins.is_empty()` arm). With W=2 N=3 and one peer
2804    /// hanging, quorum is met by the two ack peers and the detached
2805    /// task drains the still-running join.
2806    #[tokio::test]
2807    async fn delete_quorum_post_quorum_detach_drains_remaining_peer() {
2808        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2809        let (url2, count2) = spawn_mock_peer(MockBehaviour::Ack).await;
2810        let (url3, count3) = spawn_mock_peer(MockBehaviour::Fail).await;
2811        let cfg = build_config(vec![url1, url2, url3], 2, 2000);
2812        let _tracker = broadcast_delete_quorum(&cfg, "mem-detach").await.unwrap();
2813        // Wait long enough for the detached failing peer to finish its
2814        // first attempt + 250ms backoff + retry.
2815        for _ in 0..100 {
2816            if count1.load(Ordering::Relaxed) >= 1
2817                && count2.load(Ordering::Relaxed) >= 1
2818                && count3.load(Ordering::Relaxed) >= 1
2819            {
2820                break;
2821            }
2822            tokio::time::sleep(Duration::from_millis(20)).await;
2823        }
2824        // Failing peer must have been called by the detach (it
2825        // wouldn't have been if the detach was aborted on quorum-met).
2826        assert!(
2827            count3.load(Ordering::Relaxed) >= 1,
2828            "failing peer must be reached by the detached fanout"
2829        );
2830    }
2831
2832    /// W12-G #15: `AckTracker::finalise` returns `InFlight` when called
2833    /// pre-deadline with insufficient acks. Distinct from Timeout
2834    /// (post-deadline w/ partial) and Unreachable (post-deadline w/ none).
2835    /// Validates the third reason variant directly.
2836    #[test]
2837    fn ack_tracker_finalise_pre_deadline_returns_in_flight() {
2838        // Long timeout so we are pre-deadline at finalise().
2839        let policy = QuorumPolicy::new(3, 2, Duration::from_secs(60), Duration::from_secs(30))
2840            .expect("policy");
2841        let now = Instant::now();
2842        let mut t = AckTracker::new(policy, now);
2843        t.record_local();
2844        // No peer acks yet — finalise pre-deadline should be InFlight.
2845        let err = t.finalise(now).unwrap_err();
2846        match err {
2847            QuorumError::QuorumNotMet {
2848                got,
2849                needed,
2850                reason,
2851            } => {
2852                assert_eq!(got, 1);
2853                assert_eq!(needed, 2);
2854                assert_eq!(
2855                    reason,
2856                    QuorumFailureReason::InFlight,
2857                    "pre-deadline insufficient-ack must classify as InFlight"
2858                );
2859            }
2860            other => panic!("expected QuorumNotMet, got {other:?}"),
2861        }
2862    }
2863
2864    // ---------------------------------------------------------------------
2865    // L0.7-4 Tier C — broadcast_*_quorum IdDrift + transient-retry coverage
2866    // ---------------------------------------------------------------------
2867    //
2868    // The existing tests cover broadcast_store_quorum's IdDrift/retry
2869    // paths but not the equivalents in archive/delete/restore/link/
2870    // consolidate/pending/decision/namespace-meta. Each broadcast
2871    // function duplicates the post-quorum detach logic so the
2872    // IdDrift / join-error / partial-quorum WARN branches are unique
2873    // per function — closing the gap requires hitting each one.
2874
2875    #[tokio::test]
2876    async fn delete_quorum_transient_peer_failure_retried_once() {
2877        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2878        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2879        let cfg = build_config(vec![url1, url2], 2, 2000);
2880        let _tracker = broadcast_delete_quorum(&cfg, "mem-del-retry")
2881            .await
2882            .unwrap();
2883        for _ in 0..200 {
2884            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2885                break;
2886            }
2887            tokio::time::sleep(Duration::from_millis(10)).await;
2888        }
2889        assert_eq!(
2890            count2.load(Ordering::Relaxed),
2891            2,
2892            "transient failure must retry"
2893        );
2894    }
2895
2896    #[tokio::test]
2897    async fn archive_quorum_transient_peer_failure_retried_once() {
2898        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2899        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2900        let cfg = build_config(vec![url1, url2], 2, 2000);
2901        let _tracker = broadcast_archive_quorum(&cfg, "mem-arc-retry")
2902            .await
2903            .unwrap();
2904        for _ in 0..200 {
2905            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2906                break;
2907            }
2908            tokio::time::sleep(Duration::from_millis(10)).await;
2909        }
2910        assert_eq!(count2.load(Ordering::Relaxed), 2);
2911    }
2912
2913    #[tokio::test]
2914    async fn restore_quorum_transient_peer_failure_retried_once() {
2915        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2916        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2917        let cfg = build_config(vec![url1, url2], 2, 2000);
2918        let _tracker = broadcast_restore_quorum(&cfg, "mem-res-retry")
2919            .await
2920            .unwrap();
2921        for _ in 0..200 {
2922            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2923                break;
2924            }
2925            tokio::time::sleep(Duration::from_millis(10)).await;
2926        }
2927        assert_eq!(count2.load(Ordering::Relaxed), 2);
2928    }
2929
2930    #[tokio::test]
2931    async fn link_quorum_transient_peer_failure_retried_once() {
2932        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2933        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2934        let cfg = build_config(vec![url1, url2], 2, 2000);
2935        let _tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
2936        for _ in 0..200 {
2937            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2938                break;
2939            }
2940            tokio::time::sleep(Duration::from_millis(10)).await;
2941        }
2942        assert_eq!(count2.load(Ordering::Relaxed), 2);
2943    }
2944
2945    #[tokio::test]
2946    async fn consolidate_quorum_transient_peer_failure_retried_once() {
2947        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2948        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2949        let cfg = build_config(vec![url1, url2], 2, 2000);
2950        let mem = sample_memory();
2951        let sources = vec!["src-1".to_string(), "src-2".to_string()];
2952        let _tracker = broadcast_consolidate_quorum(&cfg, &mem, &sources)
2953            .await
2954            .unwrap();
2955        for _ in 0..200 {
2956            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2957                break;
2958            }
2959            tokio::time::sleep(Duration::from_millis(10)).await;
2960        }
2961        assert_eq!(count2.load(Ordering::Relaxed), 2);
2962    }
2963
2964    #[tokio::test]
2965    async fn pending_quorum_transient_peer_failure_retried_once() {
2966        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2967        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2968        let cfg = build_config(vec![url1, url2], 2, 2000);
2969        let _tracker = broadcast_pending_quorum(&cfg, &sample_pending())
2970            .await
2971            .unwrap();
2972        for _ in 0..200 {
2973            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2974                break;
2975            }
2976            tokio::time::sleep(Duration::from_millis(10)).await;
2977        }
2978        assert_eq!(count2.load(Ordering::Relaxed), 2);
2979    }
2980
2981    #[tokio::test]
2982    async fn pending_decision_quorum_transient_peer_failure_retried_once() {
2983        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
2984        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
2985        let cfg = build_config(vec![url1, url2], 2, 2000);
2986        let _tracker = broadcast_pending_decision_quorum(&cfg, &sample_decision())
2987            .await
2988            .unwrap();
2989        for _ in 0..200 {
2990            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
2991                break;
2992            }
2993            tokio::time::sleep(Duration::from_millis(10)).await;
2994        }
2995        assert_eq!(count2.load(Ordering::Relaxed), 2);
2996    }
2997
2998    #[tokio::test]
2999    async fn namespace_meta_quorum_transient_peer_failure_retried_once() {
3000        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3001        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
3002        let cfg = build_config(vec![url1, url2], 2, 2000);
3003        let _tracker = broadcast_namespace_meta_quorum(&cfg, &sample_namespace_meta())
3004            .await
3005            .unwrap();
3006        for _ in 0..200 {
3007            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
3008                break;
3009            }
3010            tokio::time::sleep(Duration::from_millis(10)).await;
3011        }
3012        assert_eq!(count2.load(Ordering::Relaxed), 2);
3013    }
3014
3015    #[tokio::test]
3016    async fn namespace_meta_clear_quorum_transient_peer_failure_retried_once() {
3017        let (url1, count1) = spawn_mock_peer(MockBehaviour::Ack).await;
3018        let (url2, count2) = spawn_mock_peer(MockBehaviour::FailThenAck { fail_until: 1 }).await;
3019        let cfg = build_config(vec![url1, url2], 2, 2000);
3020        let namespaces = vec!["ns/x".to_string()];
3021        let _tracker = broadcast_namespace_meta_clear_quorum(&cfg, &namespaces)
3022            .await
3023            .unwrap();
3024        for _ in 0..200 {
3025            if count1.load(Ordering::Relaxed) >= 1 && count2.load(Ordering::Relaxed) >= 2 {
3026                break;
3027            }
3028            tokio::time::sleep(Duration::from_millis(10)).await;
3029        }
3030        assert_eq!(count2.load(Ordering::Relaxed), 2);
3031    }
3032
3033    // ---- IdDrift variants for non-store broadcast functions ----
3034
3035    #[tokio::test]
3036    async fn delete_quorum_id_drift_does_not_count_as_ack() {
3037        let url1 = spawn_id_drift_peer().await;
3038        let url2 = spawn_id_drift_peer().await;
3039        let cfg = build_config(vec![url1, url2], 2, 1000);
3040        let tracker = broadcast_delete_quorum(&cfg, "mem-del-drift")
3041            .await
3042            .unwrap();
3043        let err = finalise_quorum(&tracker).unwrap_err();
3044        match err {
3045            QuorumError::QuorumNotMet { got, .. } => assert_eq!(got, 1),
3046            other => panic!("expected QuorumNotMet, got {other:?}"),
3047        }
3048    }
3049
3050    #[tokio::test]
3051    async fn archive_quorum_id_drift_does_not_count_as_ack() {
3052        let url1 = spawn_id_drift_peer().await;
3053        let url2 = spawn_id_drift_peer().await;
3054        let cfg = build_config(vec![url1, url2], 2, 1000);
3055        let tracker = broadcast_archive_quorum(&cfg, "mem-arc-drift")
3056            .await
3057            .unwrap();
3058        let err = finalise_quorum(&tracker).unwrap_err();
3059        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3060    }
3061
3062    #[tokio::test]
3063    async fn link_quorum_id_drift_does_not_count_as_ack() {
3064        let url1 = spawn_id_drift_peer().await;
3065        let url2 = spawn_id_drift_peer().await;
3066        let cfg = build_config(vec![url1, url2], 2, 1000);
3067        let tracker = broadcast_link_quorum(&cfg, &sample_link()).await.unwrap();
3068        let err = finalise_quorum(&tracker).unwrap_err();
3069        assert!(matches!(err, QuorumError::QuorumNotMet { .. }));
3070    }
3071
3072    // ---------------------------------------------------------------------
3073    // L0.7-4 Tier C — catchup_once_with_store SAL path coverage
3074    // ---------------------------------------------------------------------
3075    //
3076    // The non-SAL path through catchup_once is covered extensively above;
3077    // the SAL store branch (lines 184-218 of receive.rs) is uncovered.
3078    // These tests exercise the `Some(store)` path through a SqliteStore
3079    // handle so the store.apply_remote_memory() dispatch + sync_state
3080    // observe at end of batch are both hit.
3081
3082    #[cfg(feature = "sal")]
3083    #[tokio::test]
3084    async fn catchup_once_with_store_applies_via_sal_handle() {
3085        use super::receive::catchup_once_with_store;
3086        use crate::store::MemoryStore;
3087
3088        let mem = catchup_memory("sal-applied", "2026-04-26T10:00:00Z");
3089        let (url, hits, _, _) =
3090            spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![mem.clone()])).await;
3091        let cfg = build_catchup_cfg(&url, 2000);
3092        let db = build_test_db();
3093        // Build a SqliteStore on the same DB path the federation Db
3094        // owns. Since build_test_db returns an in-memory db that is
3095        // distinct from any SqliteStore-opened DB, we use a tempdir
3096        // for the SAL store and a separate in-memory db for the
3097        // Federation Db. The catchup path writes via the store; the
3098        // vector-clock advancement happens on the Federation Db.
3099        let dir = tempfile::tempdir().expect("tempdir");
3100        let store_path = dir.path().join("store.db");
3101        let store: Arc<dyn MemoryStore> = Arc::new(
3102            crate::store::sqlite::SqliteStore::open(&store_path).expect("open SqliteStore"),
3103        );
3104        catchup_once_with_store(&cfg, &db, Some(&store)).await;
3105
3106        assert_eq!(hits.load(Ordering::Relaxed), 1, "peer must be hit once");
3107        // The mem must have been applied via the SAL store handle —
3108        // read it back through the store's get() method.
3109        let ctx = crate::store::CallerContext::for_agent("test");
3110        let got = store
3111            .get(&ctx, &mem.id)
3112            .await
3113            .expect("SAL store should have the catchup memory");
3114        assert_eq!(got.title, "sal-applied");
3115
3116        // sync_state should have advanced to the memory's timestamp on
3117        // the Federation Db (sync_state is always tracked via the
3118        // local rusqlite handle even on SAL builds).
3119        let lock = db.lock().await;
3120        let clock = crate::db::sync_state_load(&lock.0, "ai:catchup-test").unwrap();
3121        assert_eq!(
3122            clock.entries.get("peer-0").map(String::as_str),
3123            Some("2026-04-26T10:00:00Z"),
3124        );
3125    }
3126
3127    /// `catchup_once_with_store` with `None` store falls back to the
3128    /// legacy rusqlite insert_if_newer path. Pin parity so the
3129    /// `else` branch (line 219-247 of receive.rs) is exercised by
3130    /// the SAL build.
3131    #[cfg(feature = "sal")]
3132    #[tokio::test]
3133    async fn catchup_once_with_store_none_uses_legacy_rusqlite() {
3134        use super::receive::catchup_once_with_store;
3135        let mem = catchup_memory("legacy-applied", "2026-04-26T10:00:00Z");
3136        let (url, hits, _, _) =
3137            spawn_since_peer(SinceMockBehaviour::ReturnMemories(vec![mem])).await;
3138        let cfg = build_catchup_cfg(&url, 2000);
3139        let db = build_test_db();
3140        catchup_once_with_store(&cfg, &db, None).await;
3141        assert_eq!(hits.load(Ordering::Relaxed), 1);
3142        let lock = db.lock().await;
3143        let count: i64 = lock
3144            .0
3145            .query_row("SELECT COUNT(*) FROM memories", [], |r| r.get(0))
3146            .unwrap();
3147        assert_eq!(count, 1, "legacy path must insert the row locally");
3148    }
3149
3150    /// SAL store path with an invalid memory in the batch — the
3151    /// `validate_memory` skip-branch must trigger and the valid
3152    /// neighbour must still apply via the store handle.
3153    #[cfg(feature = "sal")]
3154    #[tokio::test]
3155    async fn catchup_once_with_store_skips_invalid_memory_via_sal_path() {
3156        use super::receive::catchup_once_with_store;
3157        let valid = catchup_memory("sal-valid", "2026-04-26T10:00:00Z");
3158        let mut bad = catchup_memory("sal-bad", "2026-04-26T10:00:01Z");
3159        bad.source = "not-in-allowlist".to_string();
3160        let mems = vec![valid.clone(), bad];
3161
3162        let (url, _, _, _) = spawn_since_peer(SinceMockBehaviour::ReturnMemories(mems)).await;
3163        let cfg = build_catchup_cfg(&url, 2000);
3164        let db = build_test_db();
3165        let dir = tempfile::tempdir().expect("tempdir");
3166        let store: Arc<dyn crate::store::MemoryStore> = Arc::new(
3167            crate::store::sqlite::SqliteStore::open(dir.path().join("store.db"))
3168                .expect("open SqliteStore"),
3169        );
3170        catchup_once_with_store(&cfg, &db, Some(&store)).await;
3171        // Only the valid memory should be in the SAL store.
3172        let ctx = crate::store::CallerContext::for_agent("test");
3173        assert!(
3174            store.get(&ctx, &valid.id).await.is_ok(),
3175            "valid memory must land via SAL store"
3176        );
3177    }
3178}