Skip to main content

ai_memory/identity/
replay.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! H5 (v0.7.0 round-2) — Ed25519 verify-link replay protection.
5//!
6//! `POST /api/v1/links/verify` accepts the *same* `(link_id, signature)`
7//! pair on every call by construction — Ed25519 signatures are
8//! re-verifiable in perpetuity, that's the whole point of the
9//! algorithm. The replay window only appears when an operator wires
10//! the verify endpoint into a higher-level protocol (proof-of-claim
11//! workflow, federation handshake, etc.) where the verify call itself
12//! is an authentication primitive: the attacker captures a single
13//! successful `verify_link` request and replays it indefinitely.
14//!
//! The mitigation is straightforward: every verify request carries a
//! caller-supplied `verification_nonce` (UUID v4 expected — we don't
//! enforce the format, only uniqueness). The daemon hashes
//! `(link_id, signature, nonce)` into a 32-byte SHA-256 fingerprint
//! and checks it against a bounded in-memory FIFO cache. First-time
//! fingerprints get cached and the verify proceeds; repeats produce
//! 409 Conflict.
21//!
//! # Memory bound
//!
//! The cache is a `Mutex` around a `HashSet<[u8; 32]>` (membership)
//! plus a `VecDeque<[u8; 32]>` (insertion order) with a 100 000-entry
//! ceiling (`SEEN_VERIFICATIONS_CAPACITY`; 10 000 before #1033). Each
//! fingerprint is stored in both containers, so at full capacity
//! that's:
//!
//!   100 000 entries × 32 bytes hash × 2 containers
//!   ≈ 6.4 MB heap-resident
//!
//! plus HashSet/VecDeque slack and Mutex overhead. Eviction is FIFO —
//! when the deque is full and a new fingerprint comes in, the oldest
//! entry is evicted before the new one is pushed.
34//!
35//! # Threat model
36//!
37//! The cache is a defense **within a single daemon process**. Across
38//! restarts, the cache is empty — a replay attacker who waits past
39//! the restart wins. Cross-process clustering (multiple daemons
40//! behind a load balancer) is also out of scope: each replica has its
41//! own cache. Either limitation is acceptable because:
42//!
43//! 1. The verify endpoint is GET-equivalent semantically (no
44//!    persistent state changes), and operators wiring it into an
45//!    auth flow already need to layer their own freshness checks on
46//!    top — the nonce check raises the cost of trivial replay
47//!    without claiming to be a complete authentication primitive.
48//! 2. A Redis or DB-backed cache would be appropriate for a true
49//!    distributed deployment; we punt that to v0.8.
50
51use std::collections::{HashSet, VecDeque};
52use std::path::{Path, PathBuf};
53use std::sync::Mutex;
54use std::sync::atomic::{AtomicU64, Ordering};
55
56use sha2::{Digest, Sha256};
57
/// Hard ceiling on the number of fingerprints the verify-link replay
/// cache (`ReplayCache`) retains.
///
/// Eviction is FIFO (oldest inserted fingerprint first), not LRU: a
/// replayed fingerprint does not refresh its position in the queue.
///
/// Memory: every 32-byte fingerprint is stored twice (once in the
/// membership `HashSet`, once in the ordering `VecDeque`), so the raw
/// payload at the ceiling is 100 000 × 32 B × 2 ≈ 6.4 MB, plus
/// container slack.
///
/// v0.7.0 #1033 increased the ceiling from the original 10 000 to
/// 100 000 entries to raise the cost of the eviction-flush attack (an
/// attacker who can submit 10 000+ unique nonces per second evicts
/// legitimate replay fingerprints under the pre-#1033 bound — see the
/// issue for the threat model). Operators who page on the eviction
/// metric (`evictions_since_boot`) and need a true distributed cache
/// should escalate to Redis-backed storage in v0.8.
pub const SEEN_VERIFICATIONS_CAPACITY: usize = 100_000;
69
/// v0.7.0 #1033 — backing storage for the verify-link replay cache.
///
/// Two containers hold the same set of fingerprints:
/// - `seen` answers "have we seen this fingerprint" with a hash probe
///   (pre-#1033 the check was a `VecDeque::iter().any(...)` linear
///   scan, O(N) per insert at the ceiling — magnified CPU under a
///   flood);
/// - `order` remembers insertion order so the oldest entry can be
///   evicted first (FIFO).
///
/// The two are kept in lockstep by the cache's insert path: every
/// fingerprint added to `seen` is also pushed to the back of `order`,
/// and every fingerprint popped from the front of `order` is removed
/// from `seen`.
#[derive(Debug, Default)]
struct ReplayCacheInner {
    /// Membership index over the cached fingerprints.
    seen: HashSet<[u8; 32]>,
    /// The same fingerprints in insertion order; front = oldest.
    order: VecDeque<[u8; 32]>,
}
82
83/// Bounded FIFO cache of `(link_id, signature, nonce)` SHA-256
84/// fingerprints. Cheap to clone (it's behind an `Arc` in the daemon's
85/// `AppState`); the inner mutex serialises every insert/lookup so the
86/// cache is safe to share across handler invocations.
87#[derive(Debug, Default)]
88pub struct ReplayCache {
89    inner: Mutex<ReplayCacheInner>,
90    /// v0.7.0 #1033 — cumulative count of FIFO evictions since process
91    /// boot. Non-zero values are a paging signal: either the cache
92    /// ceiling is too low for the operator's verify-flow load OR an
93    /// attacker is flooding unique nonces to evict legitimate
94    /// fingerprints (the issue's flush-attack vector). Surface via
95    /// metrics or `evictions_since_boot()` for ops dashboards.
96    evictions: AtomicU64,
97}
98
99impl ReplayCache {
100    /// Fresh empty cache at the documented capacity.
101    #[must_use]
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    /// Fingerprint `(link_id, signature, nonce)` and check membership.
107    /// Returns `true` if the fingerprint has been seen before — the
108    /// caller should reject the request as a replay. Returns `false`
109    /// on the first seen value AND inserts it as a side effect.
110    ///
111    /// The caller is responsible for producing the nonce (random UUID
112    /// expected) and for choosing whether to bypass this check when
113    /// the request omits the nonce field (back-compat mode).
114    pub fn record_and_check(&self, link_id: &str, signature: &[u8], nonce: &str) -> ReplayDecision {
115        let fp = Self::fingerprint(link_id, signature, nonce);
116        let mut guard = match self.inner.lock() {
117            Ok(g) => g,
118            // A poisoned mutex means a prior insert panicked; we'd
119            // rather degrade open (no replay protection) than crash
120            // the daemon. Surface via the return enum so the caller
121            // can log it.
122            Err(p) => p.into_inner(),
123        };
124        // v0.7.0 #1033 — O(1) HashSet membership check replaces the
125        // pre-#1033 O(N) linear scan over the VecDeque.
126        if guard.seen.contains(&fp) {
127            return ReplayDecision::Replay;
128        }
129        if guard.order.len() >= SEEN_VERIFICATIONS_CAPACITY {
130            // FIFO eviction: the oldest fingerprint is dropped to
131            // make room. Capacity is a hard ceiling, not a soft one.
132            // Keep `seen` + `order` in lockstep.
133            if let Some(evicted) = guard.order.pop_front() {
134                guard.seen.remove(&evicted);
135                self.evictions.fetch_add(1, Ordering::Relaxed);
136            }
137        }
138        guard.order.push_back(fp);
139        guard.seen.insert(fp);
140        ReplayDecision::Fresh
141    }
142
143    /// Number of currently-cached fingerprints. Useful for tests and
144    /// for a future metrics exporter.
145    #[must_use]
146    pub fn len(&self) -> usize {
147        self.inner.lock().map(|g| g.order.len()).unwrap_or(0)
148    }
149
150    /// Whether the cache is empty. Trivial helper to satisfy clippy
151    /// (`len_zero`) on the few call sites that care.
152    #[must_use]
153    pub fn is_empty(&self) -> bool {
154        self.len() == 0
155    }
156
157    /// v0.7.0 #1033 — cumulative number of FIFO evictions since
158    /// process boot. Non-zero values mean the cache hit its ceiling
159    /// and dropped older fingerprints to make room. Operators should
160    /// surface this via a metrics exporter and page on sustained
161    /// growth: either legitimate verify-flow load is exceeding the
162    /// documented ceiling (escalate to a true distributed cache) OR
163    /// an attacker is flooding unique nonces to evict legitimate
164    /// fingerprints (the issue's flush-attack vector — investigate
165    /// rate-limit at `/api/v1/links/verify`).
166    #[must_use]
167    pub fn evictions_since_boot(&self) -> u64 {
168        self.evictions.load(Ordering::Relaxed)
169    }
170
171    /// Compute the 32-byte SHA-256 fingerprint over the three-element
172    /// tuple. Public for tests; not exported via `pub mod`.
173    fn fingerprint(link_id: &str, signature: &[u8], nonce: &str) -> [u8; 32] {
174        let mut hasher = Sha256::new();
175        // Length prefix every component so concatenation is unambiguous
176        // — preempts the `("a", "bc")` vs `("ab", "c")` collision class.
177        let lid = link_id.as_bytes();
178        let sig = signature;
179        let non = nonce.as_bytes();
180        #[allow(clippy::cast_possible_truncation)]
181        hasher.update((lid.len() as u32).to_be_bytes());
182        hasher.update(lid);
183        #[allow(clippy::cast_possible_truncation)]
184        hasher.update((sig.len() as u32).to_be_bytes());
185        hasher.update(sig);
186        #[allow(clippy::cast_possible_truncation)]
187        hasher.update((non.len() as u32).to_be_bytes());
188        hasher.update(non);
189        hasher.finalize().into()
190    }
191}
192
/// Outcome of a replay-cache membership check, as returned by
/// `ReplayCache::record_and_check`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplayDecision {
    /// First time we've seen this `(link_id, signature, nonce)` tuple
    /// in the current daemon process. The fingerprint was inserted.
    Fresh,
    /// Identical fingerprint has been seen before. Caller must reject.
    Replay,
}
202
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn first_seen_returns_fresh() {
        let cache = ReplayCache::new();
        let d = cache.record_and_check("link-a", b"sig", "nonce-1");
        assert_eq!(d, ReplayDecision::Fresh);
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn exact_repeat_returns_replay() {
        let cache = ReplayCache::new();
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-1"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-1"),
            ReplayDecision::Replay
        );
        // Replay doesn't grow the cache.
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn different_nonces_for_same_link_and_sig_are_fresh() {
        // Verifying the SAME link with the SAME signature but a fresh
        // nonce on each call must always succeed — the nonce is a
        // per-request anti-replay token, not a per-link state.
        let cache = ReplayCache::new();
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-1"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-2"),
            ReplayDecision::Fresh
        );
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn different_links_with_same_nonce_are_fresh() {
        // A nonce collision across different link_ids is benign —
        // they hash to different fingerprints. (Operators are
        // advised to use UUID v4 nonces; we don't enforce.)
        let cache = ReplayCache::new();
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.record_and_check("link-b", b"sig", "nonce"),
            ReplayDecision::Fresh
        );
    }

    #[test]
    fn fifo_eviction_at_capacity() {
        let cache = ReplayCache::new();
        // Fill to capacity.
        for i in 0..SEEN_VERIFICATIONS_CAPACITY {
            assert_eq!(
                cache.record_and_check("link", b"sig", &format!("nonce-{i}")),
                ReplayDecision::Fresh
            );
        }
        assert_eq!(cache.len(), SEEN_VERIFICATIONS_CAPACITY);
        // One more push evicts the oldest entry (nonce-0).
        assert_eq!(
            cache.record_and_check("link", b"sig", "nonce-new"),
            ReplayDecision::Fresh
        );
        assert_eq!(cache.len(), SEEN_VERIFICATIONS_CAPACITY);
        // The evicted nonce-0 is now "unseen" again — replay
        // protection is best-effort, not unbounded.
        assert_eq!(
            cache.record_and_check("link", b"sig", "nonce-0"),
            ReplayDecision::Fresh
        );
    }

    #[test]
    fn length_prefixed_fingerprint_avoids_concatenation_collision() {
        // ("ab", "c") and ("a", "bc") would have the same byte
        // concatenation if we didn't length-prefix each field.
        let fp1 = ReplayCache::fingerprint("ab", b"c", "");
        let fp2 = ReplayCache::fingerprint("a", b"bc", "");
        assert_ne!(fp1, fp2);
    }

    #[test]
    fn is_empty_starts_true() {
        let cache = ReplayCache::new();
        assert!(cache.is_empty());
        let _ = cache.record_and_check("a", b"b", "c");
        assert!(!cache.is_empty());
    }

    // -----------------------------------------------------------------
    // v0.7.0 #1033 (Agent-5 #4) regression coverage
    // -----------------------------------------------------------------

    #[test]
    fn evictions_counter_starts_at_zero_1033() {
        // Fresh cache reports zero evictions.
        let cache = ReplayCache::new();
        assert_eq!(cache.evictions_since_boot(), 0);
        // Insert below the ceiling — no eviction.
        for i in 0..16 {
            let _ = cache.record_and_check("l", b"s", &format!("n{i}"));
        }
        assert_eq!(cache.evictions_since_boot(), 0);
    }

    #[test]
    fn evictions_counter_bumps_on_capacity_overflow_1033() {
        // Drive insertions to capacity + N and assert the eviction
        // counter sees exactly N bumps. Operators page on this metric
        // to detect the issue's eviction-flush attack vector — non-zero
        // values mean the cache hit its ceiling and dropped older
        // fingerprints.
        //
        // The ceiling is a compile-time constant with no test
        // override, so this test really does perform
        // SEEN_VERIFICATIONS_CAPACITY + 2 inserts (100 002 at the
        // current ceiling). The contract being pinned: the FIRST
        // eviction happens when `order.len() >= CAPACITY` AND a new
        // fingerprint arrives, so CAPACITY + 1 distinct fingerprints
        // yield exactly 1 eviction and CAPACITY + 2 yield exactly 2.
        let cache = ReplayCache::new();
        for i in 0..SEEN_VERIFICATIONS_CAPACITY {
            assert_eq!(
                cache.record_and_check("l", b"s", &format!("n{i}")),
                ReplayDecision::Fresh
            );
        }
        assert_eq!(
            cache.evictions_since_boot(),
            0,
            "no evictions at exactly capacity"
        );
        // One more push: the oldest entry is evicted.
        assert_eq!(
            cache.record_and_check("l", b"s", "n-new-1"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.evictions_since_boot(),
            1,
            "exactly one eviction at capacity+1"
        );
        // Another push: another eviction.
        assert_eq!(
            cache.record_and_check("l", b"s", "n-new-2"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.evictions_since_boot(),
            2,
            "two evictions at capacity+2"
        );
    }

    #[test]
    fn o1_lookup_under_sustained_load_1033() {
        // Pre-#1033 each `record_and_check` ran an O(N)
        // `VecDeque::iter().any(...)` scan — at 10 000-entry capacity
        // each insert cost ~10 000 fingerprint comparisons. The
        // HashSet membership replacement is O(1). We pin the
        // algorithmic contract by timing N inserts and asserting the
        // total stays well below a ceiling that would FAIL if the
        // implementation regressed to O(N).
        //
        // Concretely: 5 000 inserts in <500 ms total wall-clock on
        // any supported test host. Pre-#1033 the same workload was
        // observed at ~5 ms per insert in flame-graph traces (5 000
        // × 5 ms = 25 s total — well over the 500 ms ceiling). The
        // new shape is a HashSet probe + VecDeque push back per
        // insert; 500 ms is a generous bound that still catches a
        // regression.
        //
        // NOTE(review): this is a wall-clock assertion, so a heavily
        // loaded CI host could still trip it without any regression.
        let cache = ReplayCache::new();
        let start = std::time::Instant::now();
        for i in 0..5_000 {
            let _ = cache.record_and_check("link", b"sig", &format!("n{i}"));
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed < std::time::Duration::from_millis(500),
            "post-#1033: 5000 record_and_check calls MUST complete \
             in <500ms (HashSet lookup). Pre-#1033 O(N) shape would \
             take seconds; got {elapsed:?}"
        );
    }
}
400
401// ---------------------------------------------------------------------------
402// v0.7.0 #922 — federation per-peer nonce replay cache
403// ---------------------------------------------------------------------------
404
405use std::collections::HashMap;
406
/// v0.7.0 #922 — per-peer bound on cached nonce fingerprints. Eviction
/// within a peer's slot is FIFO (oldest fingerprint first).
///
/// v0.7.0 #1061 (Agent-2 #8) — known limitation: the per-peer cap
/// is 10000 fingerprints with FIFO eviction. An enrolled peer
/// (or an attacker with a past key compromise) can submit 10001
/// fresh-nonce signed pushes to evict `nonce-0`, then re-send the
/// captured `(body, sig, nonce-0)` tuple — no longer in cache,
/// accepted as fresh. With Ed25519 sigs that never expire, the
/// replay window stays open for the lifetime of the key.
///
/// The v0.7.0 mitigations are:
///   - Per-peer partitioning (#922): an attacker can only flood
///     THEIR OWN slot, not cross-peer entries (so the threat is
///     scoped to compromised-key scenarios, not broad DoS).
///   - Outer LRU + peer ceiling (#1038): bounds the total memory
///     footprint. Each fingerprint is held in both the slot's
///     `HashSet` and its `VecDeque`, so the raw payload is
///     10 000 × 32 B × 2 ≈ 640 KB per peer, i.e. ~640 MB at the
///     1024-peer ceiling, before container overhead.
///   - Cache capacity bumped 10× via #1033 (the 10000-per-peer slot
///     size is set here).
///
/// The deeper v0.8 fix (per Agent-2's recommendation) is to bind
/// nonce freshness to a strictly-monotonic peer-side counter (or
/// include a receiver clock-window) so any nonce older than the
/// highest-seen value for the peer is refused regardless of cache
/// membership. That requires a protocol change (peer-side
/// counter persistence + clock-skew handling) and is tracked as
/// a v0.8 federation hardening follow-up. For v0.7.0 the
/// flush-attack surface is documented as a KNOWN limitation
/// gated by per-peer-key compromise.
pub const FEDERATION_NONCE_CAPACITY_PER_PEER: usize = 10_000;
436
/// v0.7.0 #1038 (Agent-5 #5) — ceiling on the number of peer slots
/// held by the outer map of `FederationNonceCache`.
///
/// A full peer slot holds 10k 32-byte fingerprints in both its
/// `HashSet` and its `VecDeque`: ~320 KB per container, ~640 KB raw
/// per slot. A long-lived daemon that rotates peers (operator
/// adds + removes peers in `AI_MEMORY_FED_PEER_ATTESTATION`)
/// leaves old peer-id slots resident forever pre-#1038. The
/// ceiling caps the worst-case raw footprint at ~640 KB × 1024 =
/// ~640 MB before container overhead. Realistic deployments sit far
/// below that (operator-scale is ~10-100 peers; we leave 10×
/// headroom). Eviction picks the least-recently-touched peer when a
/// new peer pushes past the ceiling.
pub const FEDERATION_NONCE_MAX_PEERS: usize = 1024;
449
/// v0.7.0 #1033 (federation parity) — one peer's nonce fingerprints,
/// in the same `HashSet + VecDeque` shape as `ReplayCacheInner`, so
/// each peer's freshness check is a hash probe instead of the
/// pre-#1033 O(N) linear scan. The per-peer partitioning (already in
/// place pre-#1033) limits cross-peer eviction so an attacker can
/// only evict THEIR OWN entries — a weaker threat than the
/// un-partitioned ReplayCache, but the perf gain matters under
/// sustained federation load.
///
/// v0.7.0 #1038 — `last_touch` tracks the monotonic counter at the
/// last `record_and_check` for this peer. The outer LRU evicts the
/// slot with the smallest `last_touch` when at the
/// `FEDERATION_NONCE_MAX_PEERS` ceiling. Using a u64 counter
/// instead of `Instant` makes recency a plain integer comparison and
/// keeps clock reads out of the eviction path.
#[derive(Debug, Default)]
struct PeerNonceSlot {
    /// Membership index over this peer's cached fingerprints.
    seen: HashSet<[u8; 32]>,
    /// The same fingerprints in insertion order; front = oldest.
    order: VecDeque<[u8; 32]>,
    /// Touch-counter value stamped the last time this slot was updated.
    last_touch: u64,
}
471
472/// v0.7.0 #922 — per-peer bounded FIFO cache of `(peer_id, nonce)`.
473#[derive(Debug, Default)]
474pub struct FederationNonceCache {
475    inner: Mutex<HashMap<String, PeerNonceSlot>>,
476    /// v0.7.0 #1038 — monotonic touch counter. Advances on every
477    /// `record_and_check`; each peer slot stamps its `last_touch`
478    /// with the value at insert/update time. The outer LRU
479    /// eviction picks the slot with the smallest value.
480    touch_counter: std::sync::atomic::AtomicU64,
481    /// v0.7.0 #1038 — cumulative count of peer-slot evictions
482    /// since boot. Non-zero values mean the outer LRU dropped a
483    /// peer to make room — operator-visible via `peer_evictions_since_boot()`.
484    peer_evictions: std::sync::atomic::AtomicU64,
485    /// #1255 (MED, 2026-05-25) — when `Some`, every Fresh
486    /// fingerprint is persisted to the `federation_nonce_cache`
487    /// table in the ai-memory sqlite DB on this path AND the
488    /// cache hydrates from the same table on construction. When
489    /// `None` the cache is in-memory only and a daemon restart
490    /// opens a fresh replay window (pre-#1255 behaviour, preserved
491    /// for test harnesses and for any caller that opts out).
492    db_path: Option<PathBuf>,
493}
494
495/// #1690 — prune the on-disk `federation_nonce_cache` to the newest
496/// `per_peer_cap` rows per peer (deleting the rest), bounding the table
497/// to the same ceiling the in-memory cache enforces. Idempotent: on an
498/// already-bounded table it deletes zero rows. Extracted as a free fn so
499/// it is testable with a small cap without seeding 10k rows. The window
500/// `ROW_NUMBER() OVER (PARTITION BY peer_id ORDER BY last_touch DESC)`
501/// keeps the most-recently-touched rows; the WITHOUT-ROWID PK
502/// `(peer_id, fingerprint)` keys the delete.
503///
504/// # Errors
505/// Propagates the underlying `rusqlite` error on SQL failure.
506fn prune_nonce_cache_to_per_peer_cap(
507    conn: &rusqlite::Connection,
508    per_peer_cap: usize,
509) -> rusqlite::Result<usize> {
510    #[allow(clippy::cast_possible_wrap)]
511    let cap = per_peer_cap as i64;
512    conn.execute(
513        "DELETE FROM federation_nonce_cache
514         WHERE (peer_id, fingerprint) IN (
515             SELECT peer_id, fingerprint FROM (
516                 SELECT peer_id, fingerprint,
517                        ROW_NUMBER() OVER (
518                            PARTITION BY peer_id ORDER BY last_touch DESC
519                        ) AS rn
520                 FROM federation_nonce_cache
521             ) WHERE rn > ?1
522         )",
523        rusqlite::params![cap],
524    )
525}
526
527impl FederationNonceCache {
528    /// Fresh empty cache. In-memory only — the cache resets on every
529    /// daemon restart. Prefer [`Self::new_with_db_persistence`] in
530    /// production: pre-#1255 the in-memory-only cache opened a
531    /// replay window on every restart.
532    #[must_use]
533    pub fn new() -> Self {
534        Self::default()
535    }
536
537    /// #1255 (MED, 2026-05-25) — persistence-enabled constructor.
538    ///
539    /// Opens the ai-memory sqlite DB at `db_path` (runs migrations
540    /// to ensure the `federation_nonce_cache` table is present),
541    /// rehydrates the in-memory cache from the persisted rows
542    /// (oldest `last_touch` first so the in-process LRU ordering
543    /// matches the on-disk order), and arms the cache so every
544    /// subsequent `Fresh` fingerprint is persisted to disk.
545    ///
546    /// Construction errors out if the DB cannot be opened or
547    /// migrated — operators want loud failure here, not a silent
548    /// fallback to in-memory mode that re-opens the replay window.
549    ///
550    /// # Errors
551    ///
552    /// Returns an error if the DB cannot be opened or the load
553    /// query fails.
554    pub fn new_with_db_persistence(db_path: impl Into<PathBuf>) -> anyhow::Result<Self> {
555        let db_path = db_path.into();
556        let cache = Self {
557            inner: Mutex::new(HashMap::new()),
558            touch_counter: AtomicU64::new(0),
559            peer_evictions: AtomicU64::new(0),
560            db_path: Some(db_path.clone()),
561        };
562        cache.hydrate_from_disk(&db_path)?;
563        Ok(cache)
564    }
565
566    /// #1255 — read every persisted `(peer_id, fingerprint,
567    /// last_touch)` triple from disk and seed the in-memory cache.
568    /// Iterates oldest-touch first so the on-disk LRU ordering
569    /// becomes the in-process FIFO ordering for the per-peer
570    /// `VecDeque`s. The post-load `touch_counter` is bumped past
571    /// the largest observed `last_touch` so subsequent inserts
572    /// stay monotonic against the rehydrated state.
573    fn hydrate_from_disk(&self, db_path: &Path) -> anyhow::Result<()> {
574        // Use `crate::db::open` which runs migrations on first open.
575        // This guarantees the `federation_nonce_cache` table exists
576        // even on a pre-v51 DB (the v51 migration is replay-safe).
577        let conn = crate::db::open(db_path)
578            .map_err(|e| anyhow::anyhow!("FederationNonceCache: open ai-memory db: {e}"))?;
579        let mut stmt = conn.prepare(
580            "SELECT peer_id, fingerprint, last_touch
581             FROM federation_nonce_cache
582             ORDER BY last_touch ASC",
583        )?;
584        let mut max_touch: u64 = 0;
585        let rows = stmt.query_map([], |row| {
586            let peer_id: String = row.get(0)?;
587            let fp_bytes: Vec<u8> = row.get(1)?;
588            let last_touch: i64 = row.get(2)?;
589            Ok((peer_id, fp_bytes, last_touch))
590        })?;
591        let mut guard = self
592            .inner
593            .lock()
594            .map_err(|_| anyhow::anyhow!("FederationNonceCache: hydration mutex poisoned"))?;
595        for row in rows {
596            let (peer_id, fp_bytes, last_touch) = row?;
597            // Coerce the 32-byte fingerprint back from the blob.
598            // Rows with non-32-byte blobs are skipped + warn-logged —
599            // they cannot have been produced by any v0.7.x writer,
600            // so they are forensic noise we don't want to crash on.
601            let fp: [u8; 32] = match fp_bytes.as_slice().try_into() {
602                Ok(fp) => fp,
603                Err(_) => {
604                    tracing::warn!(
605                        target: "ai_memory::identity::replay",
606                        peer_id = %peer_id,
607                        len = fp_bytes.len(),
608                        "FederationNonceCache: skipping persisted row with non-32-byte \
609                         fingerprint blob (forensic noise; not produced by any v0.7.x writer)",
610                    );
611                    continue;
612                }
613            };
614            #[allow(clippy::cast_sign_loss)]
615            let touch_u64 = last_touch.max(0) as u64;
616            if touch_u64 > max_touch {
617                max_touch = touch_u64;
618            }
619            let slot = guard.entry(peer_id).or_default();
620            // Honour the per-peer cap on hydration: oldest rows are
621            // dropped silently when the on-disk persistence holds
622            // more rows than `FEDERATION_NONCE_CAPACITY_PER_PEER`.
623            // (Shouldn't happen in practice — the persistence layer
624            // mirrors the in-memory cap — but defensive on operator
625            // hand-rolled DBs.)
626            if slot.order.len() >= FEDERATION_NONCE_CAPACITY_PER_PEER {
627                if let Some(evicted) = slot.order.pop_front() {
628                    slot.seen.remove(&evicted);
629                }
630            }
631            slot.order.push_back(fp);
632            slot.seen.insert(fp);
633            slot.last_touch = touch_u64;
634        }
635        drop(guard);
636        // `rows` was consumed by the `for` loop above; dropping `stmt`
637        // releases its borrow on `conn` before the prune `execute`.
638        drop(stmt);
639
640        // #1690 — repair legacy on-disk bloat from the pre-delete-on-evict
641        // era. Before the eviction-prune fix, the table grew without
642        // bound; delete-on-evict stops FURTHER growth but never shrinks
643        // rows belonging to peers that have since gone silent, so such a
644        // DB would re-scan all of them on every boot. The in-memory cap
645        // above already bounds RAM (per-peer overflow is dropped on
646        // load), so this one-time prune converges the DISK to the same
647        // bound: keep the newest `FEDERATION_NONCE_CAPACITY_PER_PEER`
648        // rows per peer, delete the rest. Idempotent — on an
649        // already-bounded table it deletes zero rows. WITHOUT ROWID PK is
650        // (peer_id, fingerprint), so the window prune keys on that pair.
651        match prune_nonce_cache_to_per_peer_cap(&conn, FEDERATION_NONCE_CAPACITY_PER_PEER) {
652            Ok(0) => {}
653            Ok(n) => tracing::info!(
654                target: "ai_memory::identity::replay",
655                "FederationNonceCache: pruned {n} over-cap disk row(s) on hydration \
656                 (#1690 legacy-bloat repair); disk now bounded to the per-peer cap"
657            ),
658            Err(e) => tracing::warn!(
659                target: "ai_memory::identity::replay",
660                err = %e,
661                "FederationNonceCache: hydration over-cap prune failed (non-fatal; in-memory \
662                 cache still bounded)"
663            ),
664        }
665
666        // Advance the in-process touch counter past every observed
667        // last_touch so the next insert is monotonic.
668        self.touch_counter
669            .store(max_touch.saturating_add(1), Ordering::Relaxed);
670        Ok(())
671    }
672
673    /// #1255 — persist one `(peer_id, fingerprint, last_touch)`
674    /// triple to disk. Called from the Fresh arm of
675    /// `record_and_check` when `db_path.is_some()`. The INSERT OR
676    /// REPLACE shape keeps the row's `last_touch` in lockstep with
677    /// the in-memory cache on every re-touch path (currently the
678    /// `record_and_check` Fresh path only inserts; re-touch on
679    /// existing fingerprints surfaces as `Replay` and skips the
680    /// persistence call, which is fine — the original row remains).
681    /// Persistence errors are warn-logged and swallowed: an
682    /// operator-disk-full or transient db lock failure should not
683    /// be a 500 on every federated push. The in-memory cap still
684    /// holds, so a persistence outage degrades gracefully to
685    /// pre-#1255 behaviour (replay window opens on next restart).
686    fn persist_fingerprint_and_evict(
687        &self,
688        peer_id: &str,
689        fp: &[u8; 32],
690        last_touch: u64,
691        evicted_fp: Option<&[u8; 32]>,
692        evicted_peer: Option<&str>,
693    ) {
694        let Some(path) = self.db_path.as_deref() else {
695            return;
696        };
697        // `crate::db::open` runs migrations + is cheap on a warm
698        // SQLite WAL connection; the persistence rate is bounded by
699        // federated-POST throughput (sub-Hz on any realistic mesh).
700        let conn = match crate::db::open(path) {
701            Ok(c) => c,
702            Err(e) => {
703                tracing::warn!(
704                    target: "ai_memory::identity::replay",
705                    peer_id = %peer_id,
706                    path = %path.display(),
707                    err = %e,
708                    "FederationNonceCache: persist open failed; in-memory cache still holds \
709                     (#1255 graceful degradation)",
710                );
711                return;
712            }
713        };
714        // `i64::try_from` is safe because `touch_counter` advances
715        // at most once per record_and_check; a daemon would need to
716        // sustain >2^63 federated pushes/sec to overflow, which is
717        // not a real shape.
718        #[allow(clippy::cast_possible_wrap)]
719        let last_touch_i64 = last_touch as i64;
720        let now = chrono::Utc::now().to_rfc3339();
721        if let Err(e) = conn.execute(
722            "INSERT OR REPLACE INTO federation_nonce_cache
723             (peer_id, fingerprint, last_touch, inserted_at)
724             VALUES (?1, ?2, ?3, ?4)",
725            rusqlite::params![peer_id, fp.as_slice(), last_touch_i64, now],
726        ) {
727            tracing::warn!(
728                target: "ai_memory::identity::replay",
729                peer_id = %peer_id,
730                err = %e,
731                "FederationNonceCache: persist insert failed; in-memory cache still holds \
732                 (#1255 graceful degradation)",
733            );
734        }
735        // #1690 — delete-on-evict: prune the disk rows the in-memory LRU
736        // just evicted so the table stays bounded by the same ceiling as
737        // the in-memory cache. Best-effort on the SAME connection as the
738        // INSERT above; a failed DELETE only leaves a dead row (the
739        // in-memory cap still bounds the replay check), so it is
740        // warn-logged and swallowed like the INSERT.
741        if let Some(efp) = evicted_fp {
742            if let Err(e) = conn.execute(
743                "DELETE FROM federation_nonce_cache WHERE peer_id = ?1 AND fingerprint = ?2",
744                rusqlite::params![peer_id, efp.as_slice()],
745            ) {
746                tracing::warn!(
747                    target: "ai_memory::identity::replay",
748                    peer_id = %peer_id,
749                    err = %e,
750                    "FederationNonceCache: evicted-fingerprint delete failed; disk row lingers \
751                     (#1690 graceful degradation)",
752                );
753            }
754        }
755        if let Some(ep) = evicted_peer {
756            if let Err(e) = conn.execute(
757                "DELETE FROM federation_nonce_cache WHERE peer_id = ?1",
758                rusqlite::params![ep],
759            ) {
760                tracing::warn!(
761                    target: "ai_memory::identity::replay",
762                    evicted_peer = %ep,
763                    err = %e,
764                    "FederationNonceCache: evicted-peer delete failed; disk rows linger \
765                     (#1690 graceful degradation)",
766                );
767            }
768        }
769    }
770
771    /// Check + record `(peer_id, nonce)`.
772    pub fn record_and_check(&self, peer_id: &str, nonce: &str) -> ReplayDecision {
773        use std::sync::atomic::Ordering;
774        let fp = Self::fingerprint(peer_id, nonce);
775        let mut guard = match self.inner.lock() {
776            Ok(g) => g,
777            Err(p) => p.into_inner(),
778        };
779        // #1690 — capture in-memory evictions so the disk mirror is
780        // pruned in lockstep (delete-on-evict). Without this the disk
781        // `federation_nonce_cache` table grew unbounded: every Fresh
782        // nonce INSERTs a row, but the in-memory LRU evictions (per-peer
783        // FIFO + outer peer-slot LRU) never deleted the disk counterpart,
784        // so a long-lived high-throughput peer accumulated millions of
785        // dead rows while the in-memory cache stayed bounded. Both
786        // deletes ride the SAME connection-open as the persist below
787        // (see `persist_fingerprint_and_evict`), so the hot-path disk
788        // I/O rate is unchanged.
789        let mut evicted_peer: Option<String> = None;
790        // v0.7.0 #1038 — bound the outer HashMap to
791        // `FEDERATION_NONCE_MAX_PEERS`. When the incoming peer is a
792        // NEW entry AND the map is at the ceiling, evict the
793        // least-recently-touched peer (LRU) before inserting.
794        // Skip the eviction when the peer already exists (re-touch
795        // is free).
796        if !guard.contains_key(peer_id) && guard.len() >= FEDERATION_NONCE_MAX_PEERS {
797            // Find the smallest `last_touch` to pick the LRU peer.
798            if let Some((evict_id, _)) = guard
799                .iter()
800                .min_by_key(|(_, s)| s.last_touch)
801                .map(|(k, s)| (k.clone(), s.last_touch))
802            {
803                guard.remove(&evict_id);
804                self.peer_evictions.fetch_add(1, Ordering::Relaxed);
805                tracing::warn!(
806                    target: "ai_memory::identity::replay",
807                    evicted_peer = %evict_id,
808                    "FederationNonceCache: at peer ceiling ({}); evicted LRU peer slot to make \
809                     room. Operator-visible via peer_evictions_since_boot() (#1038).",
810                    FEDERATION_NONCE_MAX_PEERS,
811                );
812                evicted_peer = Some(evict_id);
813            }
814        }
815        let touch = self.touch_counter.fetch_add(1, Ordering::Relaxed);
816        let slot = guard.entry(peer_id.to_string()).or_default();
817        slot.last_touch = touch;
818        // v0.7.0 #1033 — O(1) HashSet membership replaces O(N) scan.
819        if slot.seen.contains(&fp) {
820            return ReplayDecision::Replay;
821        }
822        let mut evicted_fp: Option<[u8; 32]> = None;
823        if slot.order.len() >= FEDERATION_NONCE_CAPACITY_PER_PEER {
824            // Keep `seen` + `order` in lockstep on FIFO eviction.
825            if let Some(evicted) = slot.order.pop_front() {
826                slot.seen.remove(&evicted);
827                evicted_fp = Some(evicted);
828            }
829        }
830        slot.order.push_back(fp);
831        slot.seen.insert(fp);
832        // Release the inner mutex before doing disk I/O so a slow
833        // SQLite WAL fsync doesn't block sibling
834        // `record_and_check` calls. The persistence call itself
835        // opens its own connection (no shared state).
836        drop(guard);
837        // #1255 — persist the new fingerprint to disk so a daemon
838        // restart doesn't re-open the replay window. #1690 — and prune
839        // the rows the in-memory cache just evicted so the disk mirror
840        // stays bounded. Both ride one connection-open; failures are
841        // warn-logged and swallowed (graceful degradation to the
842        // in-memory-only pre-#1255 posture).
843        self.persist_fingerprint_and_evict(
844            peer_id,
845            &fp,
846            touch,
847            evicted_fp.as_ref(),
848            evicted_peer.as_deref(),
849        );
850        ReplayDecision::Fresh
851    }
852
853    /// v0.7.0 #1038 — cumulative number of peer-slot evictions
854    /// (outer LRU). Non-zero means peer churn caused the outer
855    /// HashMap to hit `FEDERATION_NONCE_MAX_PEERS` and drop an
856    /// older peer's slot. Operators page on sustained growth.
857    #[must_use]
858    pub fn peer_evictions_since_boot(&self) -> u64 {
859        self.peer_evictions
860            .load(std::sync::atomic::Ordering::Relaxed)
861    }
862
863    /// Distinct peers with at least one cached fingerprint.
864    #[must_use]
865    pub fn peer_count(&self) -> usize {
866        self.inner.lock().map(|g| g.len()).unwrap_or(0)
867    }
868
869    /// Cached fingerprints for `peer_id`.
870    #[must_use]
871    pub fn len_for_peer(&self, peer_id: &str) -> usize {
872        self.inner
873            .lock()
874            .map(|g| g.get(peer_id).map_or(0, |s| s.order.len()))
875            .unwrap_or(0)
876    }
877
878    fn fingerprint(peer_id: &str, nonce: &str) -> [u8; 32] {
879        let mut hasher = Sha256::new();
880        let pid = peer_id.as_bytes();
881        let non = nonce.as_bytes();
882        #[allow(clippy::cast_possible_truncation)]
883        hasher.update((pid.len() as u32).to_be_bytes());
884        hasher.update(pid);
885        #[allow(clippy::cast_possible_truncation)]
886        hasher.update((non.len() as u32).to_be_bytes());
887        hasher.update(non);
888        hasher.finalize().into()
889    }
890}
891
892#[cfg(test)]
893mod federation_nonce_cache_tests {
894    use super::*;
895
896    #[test]
897    fn first_seen_returns_fresh() {
898        let cache = FederationNonceCache::new();
899        assert_eq!(cache.record_and_check("p", "n"), ReplayDecision::Fresh);
900        assert_eq!(cache.len_for_peer("p"), 1);
901    }
902
903    #[test]
904    fn exact_repeat_returns_replay() {
905        let cache = FederationNonceCache::new();
906        assert_eq!(cache.record_and_check("p", "n"), ReplayDecision::Fresh);
907        assert_eq!(cache.record_and_check("p", "n"), ReplayDecision::Replay);
908        assert_eq!(cache.len_for_peer("p"), 1);
909    }
910
911    #[test]
912    fn different_peers_can_use_same_nonce() {
913        let cache = FederationNonceCache::new();
914        assert_eq!(cache.record_and_check("a", "s"), ReplayDecision::Fresh);
915        assert_eq!(cache.record_and_check("b", "s"), ReplayDecision::Fresh);
916        assert_eq!(cache.peer_count(), 2);
917    }
918
919    #[test]
920    fn fifo_eviction_at_per_peer_capacity() {
921        let cache = FederationNonceCache::new();
922        for i in 0..FEDERATION_NONCE_CAPACITY_PER_PEER {
923            assert_eq!(
924                cache.record_and_check("p", &format!("n-{i}")),
925                ReplayDecision::Fresh
926            );
927        }
928        assert_eq!(cache.len_for_peer("p"), FEDERATION_NONCE_CAPACITY_PER_PEER);
929        assert_eq!(cache.record_and_check("p", "n-new"), ReplayDecision::Fresh);
930        assert_eq!(cache.record_and_check("p", "n-0"), ReplayDecision::Fresh);
931    }
932
933    #[test]
934    fn peer_count_evictions_counter_starts_at_zero_1038() {
935        // v0.7.0 #1038 — fresh cache reports zero peer-slot evictions.
936        let cache = FederationNonceCache::new();
937        assert_eq!(cache.peer_evictions_since_boot(), 0);
938        // Insert below the peer ceiling — no eviction.
939        for i in 0..32 {
940            let _ = cache.record_and_check(&format!("peer-{i}"), "n");
941        }
942        assert_eq!(cache.peer_count(), 32);
943        assert_eq!(cache.peer_evictions_since_boot(), 0);
944    }
945
946    #[test]
947    fn outer_lru_evicts_least_recently_touched_at_ceiling_1038() {
948        // v0.7.0 #1038 (Agent-5 #5) — when the FederationNonceCache
949        // HashMap hits FEDERATION_NONCE_MAX_PEERS, a NEW peer's
950        // insert evicts the least-recently-touched peer slot.
951        // Pre-#1038 the HashMap was unbounded; a daemon that rotated
952        // peers (operator config churn) accumulated ~320 KB per
953        // ever-enrolled peer indefinitely.
954        let cache = FederationNonceCache::new();
955        // Fill to exactly the peer ceiling.
956        for i in 0..FEDERATION_NONCE_MAX_PEERS {
957            let _ = cache.record_and_check(&format!("peer-{i}"), "n");
958        }
959        assert_eq!(cache.peer_count(), FEDERATION_NONCE_MAX_PEERS);
960        assert_eq!(cache.peer_evictions_since_boot(), 0);
961        // Touch peer-0 to make it the most-recently-touched
962        // (advances its last_touch); peer-1 is now the LRU
963        // candidate.
964        let _ = cache.record_and_check("peer-0", "n2");
965        // Push a NEW peer past the ceiling — peer-1 (the LRU)
966        // should be evicted.
967        assert_eq!(
968            cache.record_and_check("peer-new", "n"),
969            ReplayDecision::Fresh
970        );
971        assert_eq!(
972            cache.peer_count(),
973            FEDERATION_NONCE_MAX_PEERS,
974            "#1038: at ceiling the outer HashMap must stay at FEDERATION_NONCE_MAX_PEERS"
975        );
976        assert_eq!(
977            cache.peer_evictions_since_boot(),
978            1,
979            "#1038: exactly one peer-slot eviction must have fired"
980        );
981        // peer-1 (LRU) is gone — recording for it again returns
982        // Fresh (the cache forgot the prior fingerprints).
983        assert_eq!(cache.len_for_peer("peer-1"), 0);
984        // peer-0 (recently touched) is still present.
985        assert!(cache.len_for_peer("peer-0") > 0);
986    }
987
988    #[test]
989    fn re_touch_existing_peer_does_not_trigger_eviction_1038() {
990        // v0.7.0 #1038 — re-touching an existing peer at the
991        // ceiling MUST NOT trigger an eviction (LRU bookkeeping
992        // only fires on NEW peer inserts past the ceiling).
993        let cache = FederationNonceCache::new();
994        for i in 0..FEDERATION_NONCE_MAX_PEERS {
995            let _ = cache.record_and_check(&format!("peer-{i}"), "n");
996        }
997        let before = cache.peer_evictions_since_boot();
998        // Re-touch every existing peer — no NEW peer inserts.
999        for i in 0..FEDERATION_NONCE_MAX_PEERS {
1000            let _ = cache.record_and_check(&format!("peer-{i}"), &format!("n2-{i}"));
1001        }
1002        assert_eq!(
1003            cache.peer_evictions_since_boot(),
1004            before,
1005            "#1038: re-touching existing peers MUST NOT trigger LRU eviction"
1006        );
1007        assert_eq!(cache.peer_count(), FEDERATION_NONCE_MAX_PEERS);
1008    }
1009
1010    /// #1255 (MED, 2026-05-25) — regression: a nonce that landed in
1011    /// the cache before a daemon restart must STILL be rejected as
1012    /// a replay after the restart. Pre-#1255 every restart opened a
1013    /// fresh in-memory window, so any captured `(body, sig, nonce)`
1014    /// tuple could be replayed once the daemon bounced.
1015    ///
1016    /// Simulates the restart by dropping the first
1017    /// `FederationNonceCache` and constructing a second one against
1018    /// the SAME `db_path`. The hydration step on the second cache
1019    /// reloads every persisted fingerprint, so the same `(peer_id,
1020    /// nonce)` MUST surface as `Replay` on the second cache.
1021    #[test]
1022    fn issue_1255_nonce_persists_across_recreated_cache() {
1023        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
1024        let db_path = tmp.path().to_path_buf();
1025
1026        // First cache — accept the nonce as Fresh, persisting it.
1027        let cache_a = FederationNonceCache::new_with_db_persistence(&db_path)
1028            .expect("first cache must open the DB and run v51 migration");
1029        assert_eq!(
1030            cache_a.record_and_check("peer-1255", "n-1255"),
1031            ReplayDecision::Fresh,
1032            "#1255: first observation of (peer, nonce) is Fresh"
1033        );
1034        // A second observation in the SAME process is Replay
1035        // (in-memory cache holds independent of disk persistence).
1036        assert_eq!(
1037            cache_a.record_and_check("peer-1255", "n-1255"),
1038            ReplayDecision::Replay,
1039            "#1255: in-process re-observation is Replay (sanity)"
1040        );
1041        drop(cache_a);
1042
1043        // Second cache — simulate daemon restart against the same
1044        // DB. Hydration must replay the persisted fingerprint into
1045        // the in-memory set so the SAME (peer, nonce) is REJECTED.
1046        let cache_b = FederationNonceCache::new_with_db_persistence(&db_path)
1047            .expect("second cache must hydrate from the same DB");
1048        assert_eq!(
1049            cache_b.record_and_check("peer-1255", "n-1255"),
1050            ReplayDecision::Replay,
1051            "#1255: persistence is load-bearing — a daemon restart must NOT \
1052             reopen the replay window for a previously-seen nonce"
1053        );
1054        // A NEW (peer, nonce) under the second cache is Fresh — the
1055        // hydration didn't accidentally over-block.
1056        assert_eq!(
1057            cache_b.record_and_check("peer-1255", "n-different"),
1058            ReplayDecision::Fresh,
1059            "#1255: hydration must NOT over-block on unrelated nonces"
1060        );
1061        // The hydrated cache still tracks at least the one peer
1062        // from before (sanity on `len_for_peer`).
1063        assert!(
1064            cache_b.len_for_peer("peer-1255") >= 1,
1065            "#1255: hydrated cache must retain the persisted fingerprint count"
1066        );
1067    }
1068
1069    /// #1690 — delete-on-evict keeps the disk `federation_nonce_cache`
1070    /// table bounded by the same ceiling as the in-memory LRU. Pre-fix
1071    /// the table was INSERT-only, so a long-lived high-throughput peer
1072    /// grew it without bound while the in-memory cache stayed capped.
1073    /// Drives the private persist+evict path directly (the per-peer FIFO
1074    /// cap is 10k and the outer cap is 1024, both too large to trigger
1075    /// via `record_and_check` in a unit test) and counts disk rows.
1076    #[test]
1077    fn issue_1690_eviction_prunes_disk_mirror() {
1078        fn disk_row_count(path: &std::path::Path) -> i64 {
1079            let conn = crate::db::open(path).expect("open nonce db");
1080            conn.query_row("SELECT COUNT(*) FROM federation_nonce_cache", [], |r| {
1081                r.get(0)
1082            })
1083            .expect("count rows")
1084        }
1085
1086        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
1087        let db_path = tmp.path().to_path_buf();
1088        let cache = FederationNonceCache::new_with_db_persistence(&db_path)
1089            .expect("open + migrate nonce db");
1090
1091        // Two fresh fingerprints for one peer → two disk rows.
1092        let fp_a = FederationNonceCache::fingerprint("peer-x", "nonce-a");
1093        let fp_b = FederationNonceCache::fingerprint("peer-x", "nonce-b");
1094        cache.persist_fingerprint_and_evict("peer-x", &fp_a, 1, None, None);
1095        cache.persist_fingerprint_and_evict("peer-x", &fp_b, 2, None, None);
1096        assert_eq!(disk_row_count(&db_path), 2, "two inserts → two rows");
1097
1098        // Per-peer FIFO eviction: inserting fp_c while evicting fp_a must
1099        // delete fp_a's disk row → still 2 rows (fp_b + fp_c), not 3.
1100        let fp_c = FederationNonceCache::fingerprint("peer-x", "nonce-c");
1101        cache.persist_fingerprint_and_evict("peer-x", &fp_c, 3, Some(&fp_a), None);
1102        assert_eq!(
1103            disk_row_count(&db_path),
1104            2,
1105            "#1690: a per-peer FIFO eviction must delete the evicted disk row"
1106        );
1107
1108        // Add a second peer, then an outer-LRU peer eviction of peer-x
1109        // must wipe ALL of peer-x's disk rows.
1110        let fp_y = FederationNonceCache::fingerprint("peer-y", "n");
1111        cache.persist_fingerprint_and_evict("peer-y", &fp_y, 4, None, None);
1112        assert_eq!(disk_row_count(&db_path), 3, "peer-y row added → three rows");
1113        let fp_z = FederationNonceCache::fingerprint("peer-z", "n");
1114        cache.persist_fingerprint_and_evict("peer-z", &fp_z, 5, None, Some("peer-x"));
1115        assert_eq!(
1116            disk_row_count(&db_path),
1117            2,
1118            "#1690: an outer-LRU peer eviction must delete every disk row for that peer \
1119             (peer-x's 2 rows gone, peer-y + peer-z remain)"
1120        );
1121    }
1122
1123    #[test]
1124    fn issue_1690_prune_nonce_cache_keeps_newest_per_peer() {
1125        // #1690 — the hydration-time legacy-bloat repair keeps the newest
1126        // `cap` rows per peer and deletes the rest. Tested with a small
1127        // cap so we don't seed 10k rows.
1128        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
1129        let conn = crate::db::open(tmp.path()).expect("open + migrate");
1130        // peer-a: 4 rows (last_touch 1..4); peer-b: 2 rows (5..6).
1131        for (peer, touch) in [
1132            ("peer-a", 1),
1133            ("peer-a", 2),
1134            ("peer-a", 3),
1135            ("peer-a", 4),
1136            ("peer-b", 5),
1137            ("peer-b", 6),
1138        ] {
1139            let fp = FederationNonceCache::fingerprint(peer, &format!("n{touch}"));
1140            conn.execute(
1141                "INSERT INTO federation_nonce_cache (peer_id, fingerprint, last_touch, inserted_at)
1142                 VALUES (?1, ?2, ?3, '2026-01-01T00:00:00Z')",
1143                rusqlite::params![peer, fp.as_slice(), touch],
1144            )
1145            .unwrap();
1146        }
1147        // Prune to cap=2 per peer: peer-a drops its 2 oldest (touch 1,2),
1148        // peer-b is already within cap (untouched).
1149        let deleted = prune_nonce_cache_to_per_peer_cap(&conn, 2).expect("prune");
1150        assert_eq!(deleted, 2, "#1690: peer-a's 2 over-cap rows deleted");
1151        let a_left: i64 = conn
1152            .query_row(
1153                "SELECT COUNT(*) FROM federation_nonce_cache WHERE peer_id='peer-a'",
1154                [],
1155                |r| r.get(0),
1156            )
1157            .unwrap();
1158        let b_left: i64 = conn
1159            .query_row(
1160                "SELECT COUNT(*) FROM federation_nonce_cache WHERE peer_id='peer-b'",
1161                [],
1162                |r| r.get(0),
1163            )
1164            .unwrap();
1165        assert_eq!(a_left, 2, "peer-a bounded to cap");
1166        assert_eq!(b_left, 2, "peer-b within cap, untouched");
1167        // The newest peer-a rows (touch 3,4) survived; the oldest (1,2) went.
1168        let min_touch_a: i64 = conn
1169            .query_row(
1170                "SELECT MIN(last_touch) FROM federation_nonce_cache WHERE peer_id='peer-a'",
1171                [],
1172                |r| r.get(0),
1173            )
1174            .unwrap();
1175        assert_eq!(min_touch_a, 3, "the oldest rows were the ones pruned");
1176        // Idempotent: a second prune deletes nothing.
1177        assert_eq!(prune_nonce_cache_to_per_peer_cap(&conn, 2).unwrap(), 0);
1178    }
1179
1180    /// #1255 — graceful degradation: persistence open errors do NOT
1181    /// crash the cache. A broken DB path surfaces as a
1182    /// constructor-time `Err`; callers (today: only the production
1183    /// daemon bootstrap) get a clear error and fall back to either
1184    /// retrying with the right path OR booting with the in-memory
1185    /// constructor [`Self::new`].
1186    #[test]
1187    fn issue_1255_persistence_constructor_surfaces_open_errors() {
1188        // Point at a path that cannot exist as a sqlite DB (a directory).
1189        let dir = tempfile::TempDir::new().unwrap();
1190        // Passing the directory itself as a path. SQLite's `open_with_flags`
1191        // refuses to open a directory as a database file.
1192        let res = FederationNonceCache::new_with_db_persistence(dir.path().to_path_buf());
1193        assert!(
1194            res.is_err(),
1195            "#1255: a non-DB path must surface as a constructor Err so operators \
1196             see the persistence failure rather than silently falling back"
1197        );
1198    }
1199}