ai_memory/identity/replay.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! H5 (v0.7.0 round-2) — Ed25519 verify-link replay protection.
5//!
6//! `POST /api/v1/links/verify` accepts the *same* `(link_id, signature)`
7//! pair on every call by construction — Ed25519 signatures are
8//! re-verifiable in perpetuity, that's the whole point of the
9//! algorithm. The replay window only appears when an operator wires
10//! the verify endpoint into a higher-level protocol (proof-of-claim
11//! workflow, federation handshake, etc.) where the verify call itself
12//! is an authentication primitive: the attacker captures a single
13//! successful `verify_link` request and replays it indefinitely.
14//!
15//! The mitigation is straightforward: every verify request carries a
16//! caller-supplied `verification_nonce` (UUID v4 expected — we don't
//! enforce the format, only uniqueness). The daemon hashes
//! `(link_id, signature, nonce)` into a 32-byte SHA-256 fingerprint
//! and checks it against a bounded in-memory FIFO cache. First-time
//! fingerprints get cached and the verify proceeds; repeats produce
//! 409 Conflict.
21//!
22//! # Memory bound
23//!
//! The cache keeps each fingerprint twice behind a single `Mutex`: once
//! in a `HashSet<[u8; 32]>` for O(1) membership checks and once in a
//! `VecDeque<[u8; 32]>` that records insertion order. The ceiling is
//! `SEEN_VERIFICATIONS_CAPACITY` = 100 000 entries (raised from 10 000
//! in v0.7.0 #1033). At full capacity the fingerprint payload is:
//!
//! ```text
//! 100 000 entries × 2 copies × 32 bytes ≈ 6.4 MB heap-resident
//! ```
//!
//! Hash-table control bytes and collection growth slack come on top of
//! that figure. Eviction is FIFO — when the cache is full and a new
//! fingerprint comes in, the oldest entry is removed from both
//! collections before the new one is inserted.
34//!
35//! # Threat model
36//!
37//! The cache is a defense **within a single daemon process**. Across
38//! restarts, the cache is empty — a replay attacker who waits past
39//! the restart wins. Cross-process clustering (multiple daemons
40//! behind a load balancer) is also out of scope: each replica has its
41//! own cache. Either limitation is acceptable because:
42//!
43//! 1. The verify endpoint is GET-equivalent semantically (no
44//! persistent state changes), and operators wiring it into an
45//! auth flow already need to layer their own freshness checks on
46//! top — the nonce check raises the cost of trivial replay
47//! without claiming to be a complete authentication primitive.
48//! 2. A Redis or DB-backed cache would be appropriate for a true
49//! distributed deployment; we punt that to v0.8.
50
51use std::collections::{HashSet, VecDeque};
52use std::path::{Path, PathBuf};
53use std::sync::Mutex;
54use std::sync::atomic::{AtomicU64, Ordering};
55
56use sha2::{Digest, Sha256};
57
/// Hard ceiling on the number of fingerprints held by [`ReplayCache`].
///
/// Eviction is FIFO (oldest fingerprint first), not LRU: a replay hit
/// does not refresh an entry's position.
///
/// Memory: every cached fingerprint is stored twice (once in the
/// membership `HashSet`, once in the FIFO `VecDeque`), so at the ceiling
/// the fingerprint payload is 100 000 × 2 × 32 bytes ≈ 6.4 MB, plus
/// hash-table control bytes and collection growth slack.
///
/// v0.7.0 #1033 raised the ceiling from the original 10 000 to 100 000
/// entries to raise the cost of the eviction-flush attack (an attacker
/// who can submit 10 000+ unique nonces per second evicts legitimate
/// replay fingerprints under the pre-#1033 bound — see the issue for the
/// threat model). Operators who page on the eviction metric
/// (`ReplayCache::evictions_since_boot`) and need a true distributed
/// cache should escalate to Redis-backed storage in v0.8.
pub const SEEN_VERIFICATIONS_CAPACITY: usize = 100_000;
69
/// Backing storage for [`ReplayCache`] (v0.7.0 #1033).
///
/// Holds two views of the same set of fingerprints:
///
/// * `seen` answers "is this fingerprint cached?" with an O(1) hash
///   lookup. Before #1033 this was a linear `VecDeque::iter().any(...)`
///   scan — up to 10 000 32-byte comparisons per call at the old ceiling.
/// * `order` remembers insertion order so the oldest entry can be
///   evicted first.
///
/// The two are updated in lockstep: every `seen.insert(fp)` is paired
/// with `order.push_back(fp)`, and every `order.pop_front()` with
/// `seen.remove(&evicted)`.
#[derive(Debug, Default)]
struct ReplayCacheInner {
    seen: HashSet<[u8; 32]>,
    order: VecDeque<[u8; 32]>,
}
82
83/// Bounded FIFO cache of `(link_id, signature, nonce)` SHA-256
84/// fingerprints. Cheap to clone (it's behind an `Arc` in the daemon's
85/// `AppState`); the inner mutex serialises every insert/lookup so the
86/// cache is safe to share across handler invocations.
87#[derive(Debug, Default)]
88pub struct ReplayCache {
89 inner: Mutex<ReplayCacheInner>,
90 /// v0.7.0 #1033 — cumulative count of FIFO evictions since process
91 /// boot. Non-zero values are a paging signal: either the cache
92 /// ceiling is too low for the operator's verify-flow load OR an
93 /// attacker is flooding unique nonces to evict legitimate
94 /// fingerprints (the issue's flush-attack vector). Surface via
95 /// metrics or `evictions_since_boot()` for ops dashboards.
96 evictions: AtomicU64,
97}
98
99impl ReplayCache {
100 /// Fresh empty cache at the documented capacity.
101 #[must_use]
102 pub fn new() -> Self {
103 Self::default()
104 }
105
106 /// Fingerprint `(link_id, signature, nonce)` and check membership.
107 /// Returns `true` if the fingerprint has been seen before — the
108 /// caller should reject the request as a replay. Returns `false`
109 /// on the first seen value AND inserts it as a side effect.
110 ///
111 /// The caller is responsible for producing the nonce (random UUID
112 /// expected) and for choosing whether to bypass this check when
113 /// the request omits the nonce field (back-compat mode).
114 pub fn record_and_check(&self, link_id: &str, signature: &[u8], nonce: &str) -> ReplayDecision {
115 let fp = Self::fingerprint(link_id, signature, nonce);
116 let mut guard = match self.inner.lock() {
117 Ok(g) => g,
118 // A poisoned mutex means a prior insert panicked; we'd
119 // rather degrade open (no replay protection) than crash
120 // the daemon. Surface via the return enum so the caller
121 // can log it.
122 Err(p) => p.into_inner(),
123 };
124 // v0.7.0 #1033 — O(1) HashSet membership check replaces the
125 // pre-#1033 O(N) linear scan over the VecDeque.
126 if guard.seen.contains(&fp) {
127 return ReplayDecision::Replay;
128 }
129 if guard.order.len() >= SEEN_VERIFICATIONS_CAPACITY {
130 // FIFO eviction: the oldest fingerprint is dropped to
131 // make room. Capacity is a hard ceiling, not a soft one.
132 // Keep `seen` + `order` in lockstep.
133 if let Some(evicted) = guard.order.pop_front() {
134 guard.seen.remove(&evicted);
135 self.evictions.fetch_add(1, Ordering::Relaxed);
136 }
137 }
138 guard.order.push_back(fp);
139 guard.seen.insert(fp);
140 ReplayDecision::Fresh
141 }
142
143 /// Number of currently-cached fingerprints. Useful for tests and
144 /// for a future metrics exporter.
145 #[must_use]
146 pub fn len(&self) -> usize {
147 self.inner.lock().map(|g| g.order.len()).unwrap_or(0)
148 }
149
150 /// Whether the cache is empty. Trivial helper to satisfy clippy
151 /// (`len_zero`) on the few call sites that care.
152 #[must_use]
153 pub fn is_empty(&self) -> bool {
154 self.len() == 0
155 }
156
157 /// v0.7.0 #1033 — cumulative number of FIFO evictions since
158 /// process boot. Non-zero values mean the cache hit its ceiling
159 /// and dropped older fingerprints to make room. Operators should
160 /// surface this via a metrics exporter and page on sustained
161 /// growth: either legitimate verify-flow load is exceeding the
162 /// documented ceiling (escalate to a true distributed cache) OR
163 /// an attacker is flooding unique nonces to evict legitimate
164 /// fingerprints (the issue's flush-attack vector — investigate
165 /// rate-limit at `/api/v1/links/verify`).
166 #[must_use]
167 pub fn evictions_since_boot(&self) -> u64 {
168 self.evictions.load(Ordering::Relaxed)
169 }
170
171 /// Compute the 32-byte SHA-256 fingerprint over the three-element
172 /// tuple. Public for tests; not exported via `pub mod`.
173 fn fingerprint(link_id: &str, signature: &[u8], nonce: &str) -> [u8; 32] {
174 let mut hasher = Sha256::new();
175 // Length prefix every component so concatenation is unambiguous
176 // — preempts the `("a", "bc")` vs `("ab", "c")` collision class.
177 let lid = link_id.as_bytes();
178 let sig = signature;
179 let non = nonce.as_bytes();
180 #[allow(clippy::cast_possible_truncation)]
181 hasher.update((lid.len() as u32).to_be_bytes());
182 hasher.update(lid);
183 #[allow(clippy::cast_possible_truncation)]
184 hasher.update((sig.len() as u32).to_be_bytes());
185 hasher.update(sig);
186 #[allow(clippy::cast_possible_truncation)]
187 hasher.update((non.len() as u32).to_be_bytes());
188 hasher.update(non);
189 hasher.finalize().into()
190 }
191}
192
/// Outcome of a replay check, as returned by
/// [`ReplayCache::record_and_check`] (and by the federation nonce cache's
/// `record_and_check`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplayDecision {
    /// The fingerprint was not in the cache; it has now been inserted and
    /// the request may proceed.
    Fresh,
    /// The fingerprint was already cached. The caller must reject the
    /// request as a replay.
    Replay,
}
202
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn first_seen_returns_fresh() {
        let cache = ReplayCache::new();
        let d = cache.record_and_check("link-a", b"sig", "nonce-1");
        assert_eq!(d, ReplayDecision::Fresh);
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn exact_repeat_returns_replay() {
        let cache = ReplayCache::new();
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-1"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-1"),
            ReplayDecision::Replay
        );
        // Replay doesn't grow the cache.
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn different_nonces_for_same_link_and_sig_are_fresh() {
        // Verifying the SAME link with the SAME signature but a fresh
        // nonce on each call must always succeed — the nonce is a
        // per-request anti-replay token, not a per-link state.
        let cache = ReplayCache::new();
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-1"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce-2"),
            ReplayDecision::Fresh
        );
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn different_links_with_same_nonce_are_fresh() {
        // A nonce collision across different link_ids is benign —
        // they hash to different fingerprints. (Operators are
        // advised to use UUID v4 nonces; we don't enforce.)
        let cache = ReplayCache::new();
        assert_eq!(
            cache.record_and_check("link-a", b"sig", "nonce"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.record_and_check("link-b", b"sig", "nonce"),
            ReplayDecision::Fresh
        );
    }

    #[test]
    fn fifo_eviction_at_capacity() {
        let cache = ReplayCache::new();
        // Fill to capacity.
        for i in 0..SEEN_VERIFICATIONS_CAPACITY {
            assert_eq!(
                cache.record_and_check("link", b"sig", &format!("nonce-{i}")),
                ReplayDecision::Fresh
            );
        }
        assert_eq!(cache.len(), SEEN_VERIFICATIONS_CAPACITY);
        // One more push evicts the oldest entry (nonce-0).
        assert_eq!(
            cache.record_and_check("link", b"sig", "nonce-new"),
            ReplayDecision::Fresh
        );
        assert_eq!(cache.len(), SEEN_VERIFICATIONS_CAPACITY);
        // The evicted nonce-0 is now "unseen" again — replay
        // protection is best-effort, not unbounded.
        assert_eq!(
            cache.record_and_check("link", b"sig", "nonce-0"),
            ReplayDecision::Fresh
        );
    }

    #[test]
    fn length_prefixed_fingerprint_avoids_concatenation_collision() {
        // ("ab", "c") and ("a", "bc") would have the same byte
        // concatenation if we didn't length-prefix each field.
        let fp1 = ReplayCache::fingerprint("ab", b"c", "");
        let fp2 = ReplayCache::fingerprint("a", b"bc", "");
        assert_ne!(fp1, fp2);
    }

    #[test]
    fn is_empty_starts_true() {
        let cache = ReplayCache::new();
        assert!(cache.is_empty());
        let _ = cache.record_and_check("a", b"b", "c");
        assert!(!cache.is_empty());
    }

    // -----------------------------------------------------------------
    // v0.7.0 #1033 (Agent-5 #4) regression coverage
    // -----------------------------------------------------------------

    #[test]
    fn evictions_counter_starts_at_zero_1033() {
        // Fresh cache reports zero evictions.
        let cache = ReplayCache::new();
        assert_eq!(cache.evictions_since_boot(), 0);
        // Insert below the ceiling — no eviction.
        for i in 0..16 {
            let _ = cache.record_and_check("l", b"s", &format!("n{i}"));
        }
        assert_eq!(cache.evictions_since_boot(), 0);
    }

    #[test]
    fn evictions_counter_bumps_on_capacity_overflow_1033() {
        // Drive insertions to capacity + N and assert the eviction
        // counter sees exactly N bumps. Operators page on this metric
        // to detect the issue's eviction-flush attack vector — non-zero
        // values mean the cache hit its ceiling and dropped older
        // fingerprints.
        //
        // The ceiling is a compile-time constant, so the only way to reach
        // the eviction branch through the public API is to really fill the
        // cache: this test performs SEEN_VERIFICATIONS_CAPACITY (100 000)
        // inserts. It asserts zero evictions at exactly capacity, then
        // exactly one additional eviction per further distinct fingerprint.
        let cache = ReplayCache::new();
        for i in 0..SEEN_VERIFICATIONS_CAPACITY {
            assert_eq!(
                cache.record_and_check("l", b"s", &format!("n{i}")),
                ReplayDecision::Fresh
            );
        }
        assert_eq!(
            cache.evictions_since_boot(),
            0,
            "no evictions at exactly capacity"
        );
        // One more push: the oldest entry is evicted.
        assert_eq!(
            cache.record_and_check("l", b"s", "n-new-1"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.evictions_since_boot(),
            1,
            "exactly one eviction at capacity+1"
        );
        // Another push: another eviction.
        assert_eq!(
            cache.record_and_check("l", b"s", "n-new-2"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.evictions_since_boot(),
            2,
            "two evictions at capacity+2"
        );
    }

    #[test]
    fn o1_lookup_under_sustained_load_1033() {
        // Pre-#1033 each `record_and_check` ran an O(N)
        // `VecDeque::iter().any(...)` scan over every cached fingerprint;
        // the HashSet membership replacement is O(1).
        //
        // This is a coarse wall-clock smoke check, not a precise
        // complexity proof: it times 5 000 inserts and asserts they finish
        // in under 500 ms. The bound is deliberately generous so slow or
        // unoptimised (debug) test hosts don't flake; at this cache size
        // it may not reliably distinguish an O(N) regression, so treat a
        // failure as a strong signal but a pass as weak evidence.
        let cache = ReplayCache::new();
        let start = std::time::Instant::now();
        for i in 0..5_000 {
            let _ = cache.record_and_check("link", b"sig", &format!("n{i}"));
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed < std::time::Duration::from_millis(500),
            "post-#1033: 5000 record_and_check calls MUST complete \
             in <500ms (HashSet lookup). Pre-#1033 O(N) shape would \
             take seconds; got {elapsed:?}"
        );
    }
}
400
401// ---------------------------------------------------------------------------
402// v0.7.0 #922 — federation per-peer nonce replay cache
403// ---------------------------------------------------------------------------
404
405use std::collections::HashMap;
406
/// v0.7.0 #922 — per-peer FIFO bound on the federation nonce cache:
/// each peer slot holds at most this many fingerprints, and the oldest
/// one is evicted first.
///
/// v0.7.0 #1061 (Agent-2 #8) — known limitation: with a cap of 10 000
/// and FIFO eviction, an enrolled peer (or an attacker with a past key
/// compromise) can send at most 10 000 further fresh-nonce signed pushes
/// after a captured `(body, sig, nonce-0)` tuple to evict `nonce-0`, then
/// re-send the captured tuple — it is no longer cached and is accepted as
/// fresh. With Ed25519 sigs that never expire, the replay window stays
/// open for the lifetime of the key.
///
/// The v0.7.0 mitigations are:
/// - Per-peer partitioning (#922): an attacker can only flood
///   THEIR OWN slot, not cross-peer entries (so the threat is
///   scoped to compromised-key scenarios, not broad DoS).
/// - Outer LRU + peer ceiling (#1038, `FEDERATION_NONCE_MAX_PEERS`):
///   bounds the number of resident peer slots. Each slot stores every
///   fingerprint twice (HashSet + VecDeque), so the fingerprint payload
///   at both ceilings is about 1 024 × 10 000 × 2 × 32 bytes ≈ 655 MB,
///   before hash-table and growth overhead.
/// - Cache capacity bumped 10× via #1033 (the 10 000-per-peer slot size
///   set here).
///
/// The deeper v0.8 fix (per Agent-2's recommendation) is to bind
/// nonce freshness to a strictly-monotonic peer-side counter (or
/// include a receiver clock-window) so any nonce older than the
/// highest-seen value for the peer is refused regardless of cache
/// membership. That requires a protocol change (peer-side
/// counter persistence + clock-skew handling) and is tracked as
/// a v0.8 federation hardening follow-up. For v0.7.0 the
/// flush-attack surface is documented as a KNOWN limitation
/// gated by per-peer-key compromise.
pub const FEDERATION_NONCE_CAPACITY_PER_PEER: usize = 10_000;
436
/// v0.7.0 #1038 (Agent-5 #5) — outer-HashMap LRU bound on the number of
/// peer slots held by `FederationNonceCache`.
///
/// Each peer slot stores up to 10 000 (`FEDERATION_NONCE_CAPACITY_PER_PEER`)
/// 32-byte fingerprints twice — once in its `HashSet`, once in its
/// `VecDeque` — so a full slot carries ~640 KB of fingerprint payload.
/// Without this ceiling, a long-lived daemon that rotates peers (operator
/// adds + removes peers in `AI_MEMORY_FED_PEER_ATTESTATION`) would keep
/// old peer-id slots resident forever (the pre-#1038 behaviour). At the
/// ceiling the worst-case payload is ~640 KB × 1 024 ≈ 655 MB, plus
/// hash-table and growth overhead. Operator scale is typically ~10-100
/// peers, so the ceiling leaves ~10× headroom over that.
///
/// When a new peer arrives and the map is already at the ceiling, the
/// least-recently-touched peer slot is evicted.
pub const FEDERATION_NONCE_MAX_PEERS: usize = 1024;
449
/// Per-peer replay state inside `FederationNonceCache`.
///
/// v0.7.0 #1033 (federation parity) — mirrors the `HashSet` + `VecDeque`
/// layout of `ReplayCacheInner`. `seen` gives an O(1) freshness check in
/// place of the pre-#1033 O(N) linear scan, and `order` preserves
/// insertion order for FIFO eviction. Because the cache is partitioned
/// per peer (already true before #1033), a flooding peer can only evict
/// ITS OWN entries. That is a weaker threat than flooding the
/// un-partitioned `ReplayCache`, but the O(1) check still matters under
/// sustained federation load.
///
/// v0.7.0 #1038 — `last_touch` is a stamp from the cache-wide monotonic
/// touch counter. It is refreshed on every `record_and_check` for this
/// peer and restored from disk on hydration. When the outer map is at
/// `FEDERATION_NONCE_MAX_PEERS`, the slot with the smallest stamp is the
/// one evicted. A plain `u64` counter keeps clock reads off the eviction
/// path.
#[derive(Debug, Default)]
struct PeerNonceSlot {
    seen: HashSet<[u8; 32]>,
    order: VecDeque<[u8; 32]>,
    last_touch: u64,
}
471
472/// v0.7.0 #922 — per-peer bounded FIFO cache of `(peer_id, nonce)`.
473#[derive(Debug, Default)]
474pub struct FederationNonceCache {
475 inner: Mutex<HashMap<String, PeerNonceSlot>>,
476 /// v0.7.0 #1038 — monotonic touch counter. Advances on every
477 /// `record_and_check`; each peer slot stamps its `last_touch`
478 /// with the value at insert/update time. The outer LRU
479 /// eviction picks the slot with the smallest value.
480 touch_counter: std::sync::atomic::AtomicU64,
481 /// v0.7.0 #1038 — cumulative count of peer-slot evictions
482 /// since boot. Non-zero values mean the outer LRU dropped a
483 /// peer to make room — operator-visible via `peer_evictions_since_boot()`.
484 peer_evictions: std::sync::atomic::AtomicU64,
485 /// #1255 (MED, 2026-05-25) — when `Some`, every Fresh
486 /// fingerprint is persisted to the `federation_nonce_cache`
487 /// table in the ai-memory sqlite DB on this path AND the
488 /// cache hydrates from the same table on construction. When
489 /// `None` the cache is in-memory only and a daemon restart
490 /// opens a fresh replay window (pre-#1255 behaviour, preserved
491 /// for test harnesses and for any caller that opts out).
492 db_path: Option<PathBuf>,
493}
494
495/// #1690 — prune the on-disk `federation_nonce_cache` to the newest
496/// `per_peer_cap` rows per peer (deleting the rest), bounding the table
497/// to the same ceiling the in-memory cache enforces. Idempotent: on an
498/// already-bounded table it deletes zero rows. Extracted as a free fn so
499/// it is testable with a small cap without seeding 10k rows. The window
500/// `ROW_NUMBER() OVER (PARTITION BY peer_id ORDER BY last_touch DESC)`
501/// keeps the most-recently-touched rows; the WITHOUT-ROWID PK
502/// `(peer_id, fingerprint)` keys the delete.
503///
504/// # Errors
505/// Propagates the underlying `rusqlite` error on SQL failure.
506fn prune_nonce_cache_to_per_peer_cap(
507 conn: &rusqlite::Connection,
508 per_peer_cap: usize,
509) -> rusqlite::Result<usize> {
510 #[allow(clippy::cast_possible_wrap)]
511 let cap = per_peer_cap as i64;
512 conn.execute(
513 "DELETE FROM federation_nonce_cache
514 WHERE (peer_id, fingerprint) IN (
515 SELECT peer_id, fingerprint FROM (
516 SELECT peer_id, fingerprint,
517 ROW_NUMBER() OVER (
518 PARTITION BY peer_id ORDER BY last_touch DESC
519 ) AS rn
520 FROM federation_nonce_cache
521 ) WHERE rn > ?1
522 )",
523 rusqlite::params![cap],
524 )
525}
526
527impl FederationNonceCache {
528 /// Fresh empty cache. In-memory only — the cache resets on every
529 /// daemon restart. Prefer [`Self::new_with_db_persistence`] in
530 /// production: pre-#1255 the in-memory-only cache opened a
531 /// replay window on every restart.
532 #[must_use]
533 pub fn new() -> Self {
534 Self::default()
535 }
536
537 /// #1255 (MED, 2026-05-25) — persistence-enabled constructor.
538 ///
539 /// Opens the ai-memory sqlite DB at `db_path` (runs migrations
540 /// to ensure the `federation_nonce_cache` table is present),
541 /// rehydrates the in-memory cache from the persisted rows
542 /// (oldest `last_touch` first so the in-process LRU ordering
543 /// matches the on-disk order), and arms the cache so every
544 /// subsequent `Fresh` fingerprint is persisted to disk.
545 ///
546 /// Construction errors out if the DB cannot be opened or
547 /// migrated — operators want loud failure here, not a silent
548 /// fallback to in-memory mode that re-opens the replay window.
549 ///
550 /// # Errors
551 ///
552 /// Returns an error if the DB cannot be opened or the load
553 /// query fails.
554 pub fn new_with_db_persistence(db_path: impl Into<PathBuf>) -> anyhow::Result<Self> {
555 let db_path = db_path.into();
556 let cache = Self {
557 inner: Mutex::new(HashMap::new()),
558 touch_counter: AtomicU64::new(0),
559 peer_evictions: AtomicU64::new(0),
560 db_path: Some(db_path.clone()),
561 };
562 cache.hydrate_from_disk(&db_path)?;
563 Ok(cache)
564 }
565
566 /// #1255 — read every persisted `(peer_id, fingerprint,
567 /// last_touch)` triple from disk and seed the in-memory cache.
568 /// Iterates oldest-touch first so the on-disk LRU ordering
569 /// becomes the in-process FIFO ordering for the per-peer
570 /// `VecDeque`s. The post-load `touch_counter` is bumped past
571 /// the largest observed `last_touch` so subsequent inserts
572 /// stay monotonic against the rehydrated state.
573 fn hydrate_from_disk(&self, db_path: &Path) -> anyhow::Result<()> {
574 // Use `crate::db::open` which runs migrations on first open.
575 // This guarantees the `federation_nonce_cache` table exists
576 // even on a pre-v51 DB (the v51 migration is replay-safe).
577 let conn = crate::db::open(db_path)
578 .map_err(|e| anyhow::anyhow!("FederationNonceCache: open ai-memory db: {e}"))?;
579 let mut stmt = conn.prepare(
580 "SELECT peer_id, fingerprint, last_touch
581 FROM federation_nonce_cache
582 ORDER BY last_touch ASC",
583 )?;
584 let mut max_touch: u64 = 0;
585 let rows = stmt.query_map([], |row| {
586 let peer_id: String = row.get(0)?;
587 let fp_bytes: Vec<u8> = row.get(1)?;
588 let last_touch: i64 = row.get(2)?;
589 Ok((peer_id, fp_bytes, last_touch))
590 })?;
591 let mut guard = self
592 .inner
593 .lock()
594 .map_err(|_| anyhow::anyhow!("FederationNonceCache: hydration mutex poisoned"))?;
595 for row in rows {
596 let (peer_id, fp_bytes, last_touch) = row?;
597 // Coerce the 32-byte fingerprint back from the blob.
598 // Rows with non-32-byte blobs are skipped + warn-logged —
599 // they cannot have been produced by any v0.7.x writer,
600 // so they are forensic noise we don't want to crash on.
601 let fp: [u8; 32] = match fp_bytes.as_slice().try_into() {
602 Ok(fp) => fp,
603 Err(_) => {
604 tracing::warn!(
605 target: "ai_memory::identity::replay",
606 peer_id = %peer_id,
607 len = fp_bytes.len(),
608 "FederationNonceCache: skipping persisted row with non-32-byte \
609 fingerprint blob (forensic noise; not produced by any v0.7.x writer)",
610 );
611 continue;
612 }
613 };
614 #[allow(clippy::cast_sign_loss)]
615 let touch_u64 = last_touch.max(0) as u64;
616 if touch_u64 > max_touch {
617 max_touch = touch_u64;
618 }
619 let slot = guard.entry(peer_id).or_default();
620 // Honour the per-peer cap on hydration: oldest rows are
621 // dropped silently when the on-disk persistence holds
622 // more rows than `FEDERATION_NONCE_CAPACITY_PER_PEER`.
623 // (Shouldn't happen in practice — the persistence layer
624 // mirrors the in-memory cap — but defensive on operator
625 // hand-rolled DBs.)
626 if slot.order.len() >= FEDERATION_NONCE_CAPACITY_PER_PEER {
627 if let Some(evicted) = slot.order.pop_front() {
628 slot.seen.remove(&evicted);
629 }
630 }
631 slot.order.push_back(fp);
632 slot.seen.insert(fp);
633 slot.last_touch = touch_u64;
634 }
635 drop(guard);
636 // `rows` was consumed by the `for` loop above; dropping `stmt`
637 // releases its borrow on `conn` before the prune `execute`.
638 drop(stmt);
639
640 // #1690 — repair legacy on-disk bloat from the pre-delete-on-evict
641 // era. Before the eviction-prune fix, the table grew without
642 // bound; delete-on-evict stops FURTHER growth but never shrinks
643 // rows belonging to peers that have since gone silent, so such a
644 // DB would re-scan all of them on every boot. The in-memory cap
645 // above already bounds RAM (per-peer overflow is dropped on
646 // load), so this one-time prune converges the DISK to the same
647 // bound: keep the newest `FEDERATION_NONCE_CAPACITY_PER_PEER`
648 // rows per peer, delete the rest. Idempotent — on an
649 // already-bounded table it deletes zero rows. WITHOUT ROWID PK is
650 // (peer_id, fingerprint), so the window prune keys on that pair.
651 match prune_nonce_cache_to_per_peer_cap(&conn, FEDERATION_NONCE_CAPACITY_PER_PEER) {
652 Ok(0) => {}
653 Ok(n) => tracing::info!(
654 target: "ai_memory::identity::replay",
655 "FederationNonceCache: pruned {n} over-cap disk row(s) on hydration \
656 (#1690 legacy-bloat repair); disk now bounded to the per-peer cap"
657 ),
658 Err(e) => tracing::warn!(
659 target: "ai_memory::identity::replay",
660 err = %e,
661 "FederationNonceCache: hydration over-cap prune failed (non-fatal; in-memory \
662 cache still bounded)"
663 ),
664 }
665
666 // Advance the in-process touch counter past every observed
667 // last_touch so the next insert is monotonic.
668 self.touch_counter
669 .store(max_touch.saturating_add(1), Ordering::Relaxed);
670 Ok(())
671 }
672
673 /// #1255 — persist one `(peer_id, fingerprint, last_touch)`
674 /// triple to disk. Called from the Fresh arm of
675 /// `record_and_check` when `db_path.is_some()`. The INSERT OR
676 /// REPLACE shape keeps the row's `last_touch` in lockstep with
677 /// the in-memory cache on every re-touch path (currently the
678 /// `record_and_check` Fresh path only inserts; re-touch on
679 /// existing fingerprints surfaces as `Replay` and skips the
680 /// persistence call, which is fine — the original row remains).
681 /// Persistence errors are warn-logged and swallowed: an
682 /// operator-disk-full or transient db lock failure should not
683 /// be a 500 on every federated push. The in-memory cap still
684 /// holds, so a persistence outage degrades gracefully to
685 /// pre-#1255 behaviour (replay window opens on next restart).
686 fn persist_fingerprint_and_evict(
687 &self,
688 peer_id: &str,
689 fp: &[u8; 32],
690 last_touch: u64,
691 evicted_fp: Option<&[u8; 32]>,
692 evicted_peer: Option<&str>,
693 ) {
694 let Some(path) = self.db_path.as_deref() else {
695 return;
696 };
697 // `crate::db::open` runs migrations + is cheap on a warm
698 // SQLite WAL connection; the persistence rate is bounded by
699 // federated-POST throughput (sub-Hz on any realistic mesh).
700 let conn = match crate::db::open(path) {
701 Ok(c) => c,
702 Err(e) => {
703 tracing::warn!(
704 target: "ai_memory::identity::replay",
705 peer_id = %peer_id,
706 path = %path.display(),
707 err = %e,
708 "FederationNonceCache: persist open failed; in-memory cache still holds \
709 (#1255 graceful degradation)",
710 );
711 return;
712 }
713 };
714 // `i64::try_from` is safe because `touch_counter` advances
715 // at most once per record_and_check; a daemon would need to
716 // sustain >2^63 federated pushes/sec to overflow, which is
717 // not a real shape.
718 #[allow(clippy::cast_possible_wrap)]
719 let last_touch_i64 = last_touch as i64;
720 let now = chrono::Utc::now().to_rfc3339();
721 if let Err(e) = conn.execute(
722 "INSERT OR REPLACE INTO federation_nonce_cache
723 (peer_id, fingerprint, last_touch, inserted_at)
724 VALUES (?1, ?2, ?3, ?4)",
725 rusqlite::params![peer_id, fp.as_slice(), last_touch_i64, now],
726 ) {
727 tracing::warn!(
728 target: "ai_memory::identity::replay",
729 peer_id = %peer_id,
730 err = %e,
731 "FederationNonceCache: persist insert failed; in-memory cache still holds \
732 (#1255 graceful degradation)",
733 );
734 }
735 // #1690 — delete-on-evict: prune the disk rows the in-memory LRU
736 // just evicted so the table stays bounded by the same ceiling as
737 // the in-memory cache. Best-effort on the SAME connection as the
738 // INSERT above; a failed DELETE only leaves a dead row (the
739 // in-memory cap still bounds the replay check), so it is
740 // warn-logged and swallowed like the INSERT.
741 if let Some(efp) = evicted_fp {
742 if let Err(e) = conn.execute(
743 "DELETE FROM federation_nonce_cache WHERE peer_id = ?1 AND fingerprint = ?2",
744 rusqlite::params![peer_id, efp.as_slice()],
745 ) {
746 tracing::warn!(
747 target: "ai_memory::identity::replay",
748 peer_id = %peer_id,
749 err = %e,
750 "FederationNonceCache: evicted-fingerprint delete failed; disk row lingers \
751 (#1690 graceful degradation)",
752 );
753 }
754 }
755 if let Some(ep) = evicted_peer {
756 if let Err(e) = conn.execute(
757 "DELETE FROM federation_nonce_cache WHERE peer_id = ?1",
758 rusqlite::params![ep],
759 ) {
760 tracing::warn!(
761 target: "ai_memory::identity::replay",
762 evicted_peer = %ep,
763 err = %e,
764 "FederationNonceCache: evicted-peer delete failed; disk rows linger \
765 (#1690 graceful degradation)",
766 );
767 }
768 }
769 }
770
771 /// Check + record `(peer_id, nonce)`.
772 pub fn record_and_check(&self, peer_id: &str, nonce: &str) -> ReplayDecision {
773 use std::sync::atomic::Ordering;
774 let fp = Self::fingerprint(peer_id, nonce);
775 let mut guard = match self.inner.lock() {
776 Ok(g) => g,
777 Err(p) => p.into_inner(),
778 };
779 // #1690 — capture in-memory evictions so the disk mirror is
780 // pruned in lockstep (delete-on-evict). Without this the disk
781 // `federation_nonce_cache` table grew unbounded: every Fresh
782 // nonce INSERTs a row, but the in-memory LRU evictions (per-peer
783 // FIFO + outer peer-slot LRU) never deleted the disk counterpart,
784 // so a long-lived high-throughput peer accumulated millions of
785 // dead rows while the in-memory cache stayed bounded. Both
786 // deletes ride the SAME connection-open as the persist below
787 // (see `persist_fingerprint_and_evict`), so the hot-path disk
788 // I/O rate is unchanged.
789 let mut evicted_peer: Option<String> = None;
790 // v0.7.0 #1038 — bound the outer HashMap to
791 // `FEDERATION_NONCE_MAX_PEERS`. When the incoming peer is a
792 // NEW entry AND the map is at the ceiling, evict the
793 // least-recently-touched peer (LRU) before inserting.
794 // Skip the eviction when the peer already exists (re-touch
795 // is free).
796 if !guard.contains_key(peer_id) && guard.len() >= FEDERATION_NONCE_MAX_PEERS {
797 // Find the smallest `last_touch` to pick the LRU peer.
798 if let Some((evict_id, _)) = guard
799 .iter()
800 .min_by_key(|(_, s)| s.last_touch)
801 .map(|(k, s)| (k.clone(), s.last_touch))
802 {
803 guard.remove(&evict_id);
804 self.peer_evictions.fetch_add(1, Ordering::Relaxed);
805 tracing::warn!(
806 target: "ai_memory::identity::replay",
807 evicted_peer = %evict_id,
808 "FederationNonceCache: at peer ceiling ({}); evicted LRU peer slot to make \
809 room. Operator-visible via peer_evictions_since_boot() (#1038).",
810 FEDERATION_NONCE_MAX_PEERS,
811 );
812 evicted_peer = Some(evict_id);
813 }
814 }
815 let touch = self.touch_counter.fetch_add(1, Ordering::Relaxed);
816 let slot = guard.entry(peer_id.to_string()).or_default();
817 slot.last_touch = touch;
818 // v0.7.0 #1033 — O(1) HashSet membership replaces O(N) scan.
819 if slot.seen.contains(&fp) {
820 return ReplayDecision::Replay;
821 }
822 let mut evicted_fp: Option<[u8; 32]> = None;
823 if slot.order.len() >= FEDERATION_NONCE_CAPACITY_PER_PEER {
824 // Keep `seen` + `order` in lockstep on FIFO eviction.
825 if let Some(evicted) = slot.order.pop_front() {
826 slot.seen.remove(&evicted);
827 evicted_fp = Some(evicted);
828 }
829 }
830 slot.order.push_back(fp);
831 slot.seen.insert(fp);
832 // Release the inner mutex before doing disk I/O so a slow
833 // SQLite WAL fsync doesn't block sibling
834 // `record_and_check` calls. The persistence call itself
835 // opens its own connection (no shared state).
836 drop(guard);
837 // #1255 — persist the new fingerprint to disk so a daemon
838 // restart doesn't re-open the replay window. #1690 — and prune
839 // the rows the in-memory cache just evicted so the disk mirror
840 // stays bounded. Both ride one connection-open; failures are
841 // warn-logged and swallowed (graceful degradation to the
842 // in-memory-only pre-#1255 posture).
843 self.persist_fingerprint_and_evict(
844 peer_id,
845 &fp,
846 touch,
847 evicted_fp.as_ref(),
848 evicted_peer.as_deref(),
849 );
850 ReplayDecision::Fresh
851 }
852
853 /// v0.7.0 #1038 — cumulative number of peer-slot evictions
854 /// (outer LRU). Non-zero means peer churn caused the outer
855 /// HashMap to hit `FEDERATION_NONCE_MAX_PEERS` and drop an
856 /// older peer's slot. Operators page on sustained growth.
857 #[must_use]
858 pub fn peer_evictions_since_boot(&self) -> u64 {
859 self.peer_evictions
860 .load(std::sync::atomic::Ordering::Relaxed)
861 }
862
863 /// Distinct peers with at least one cached fingerprint.
864 #[must_use]
865 pub fn peer_count(&self) -> usize {
866 self.inner.lock().map(|g| g.len()).unwrap_or(0)
867 }
868
869 /// Cached fingerprints for `peer_id`.
870 #[must_use]
871 pub fn len_for_peer(&self, peer_id: &str) -> usize {
872 self.inner
873 .lock()
874 .map(|g| g.get(peer_id).map_or(0, |s| s.order.len()))
875 .unwrap_or(0)
876 }
877
878 fn fingerprint(peer_id: &str, nonce: &str) -> [u8; 32] {
879 let mut hasher = Sha256::new();
880 let pid = peer_id.as_bytes();
881 let non = nonce.as_bytes();
882 #[allow(clippy::cast_possible_truncation)]
883 hasher.update((pid.len() as u32).to_be_bytes());
884 hasher.update(pid);
885 #[allow(clippy::cast_possible_truncation)]
886 hasher.update((non.len() as u32).to_be_bytes());
887 hasher.update(non);
888 hasher.finalize().into()
889 }
890}
891
#[cfg(test)]
mod federation_nonce_cache_tests {
    // Unit tests for `FederationNonceCache`: replay detection, per-peer FIFO
    // eviction, the outer peer-slot LRU (#1038), disk persistence (#1255),
    // and delete-on-evict pruning of the disk mirror (#1690).
    use super::*;

    #[test]
    fn first_seen_returns_fresh() {
        let cache = FederationNonceCache::new();
        assert_eq!(cache.record_and_check("p", "n"), ReplayDecision::Fresh);
        assert_eq!(cache.len_for_peer("p"), 1);
    }

    #[test]
    fn exact_repeat_returns_replay() {
        let cache = FederationNonceCache::new();
        assert_eq!(cache.record_and_check("p", "n"), ReplayDecision::Fresh);
        assert_eq!(cache.record_and_check("p", "n"), ReplayDecision::Replay);
        assert_eq!(cache.len_for_peer("p"), 1);
    }

    #[test]
    fn different_peers_can_use_same_nonce() {
        let cache = FederationNonceCache::new();
        assert_eq!(cache.record_and_check("a", "s"), ReplayDecision::Fresh);
        assert_eq!(cache.record_and_check("b", "s"), ReplayDecision::Fresh);
        assert_eq!(cache.peer_count(), 2);
    }

    #[test]
    fn fifo_eviction_at_per_peer_capacity() {
        let cache = FederationNonceCache::new();
        for i in 0..FEDERATION_NONCE_CAPACITY_PER_PEER {
            assert_eq!(
                cache.record_and_check("p", &format!("n-{i}")),
                ReplayDecision::Fresh
            );
        }
        assert_eq!(cache.len_for_peer("p"), FEDERATION_NONCE_CAPACITY_PER_PEER);
        // At capacity, "n-new" evicts the oldest entry ("n-0"), so "n-0" is
        // no longer remembered and is accepted as Fresh again.
        assert_eq!(cache.record_and_check("p", "n-new"), ReplayDecision::Fresh);
        assert_eq!(cache.record_and_check("p", "n-0"), ReplayDecision::Fresh);
        // FIFO eviction keeps the per-peer set pinned at the cap.
        assert_eq!(cache.len_for_peer("p"), FEDERATION_NONCE_CAPACITY_PER_PEER);
    }

    #[test]
    fn peer_count_evictions_counter_starts_at_zero_1038() {
        // v0.7.0 #1038 — fresh cache reports zero peer-slot evictions.
        let cache = FederationNonceCache::new();
        assert_eq!(cache.peer_evictions_since_boot(), 0);
        // Insert below the peer ceiling — no eviction.
        for i in 0..32 {
            let _ = cache.record_and_check(&format!("peer-{i}"), "n");
        }
        assert_eq!(cache.peer_count(), 32);
        assert_eq!(cache.peer_evictions_since_boot(), 0);
    }

    #[test]
    fn outer_lru_evicts_least_recently_touched_at_ceiling_1038() {
        // v0.7.0 #1038 (Agent-5 #5) — when the FederationNonceCache
        // HashMap hits FEDERATION_NONCE_MAX_PEERS, a NEW peer's
        // insert evicts the least-recently-touched peer slot.
        // Pre-#1038 the HashMap was unbounded; a daemon that rotated
        // peers (operator config churn) accumulated ~320 KB per
        // ever-enrolled peer indefinitely.
        let cache = FederationNonceCache::new();
        // Fill to exactly the peer ceiling.
        for i in 0..FEDERATION_NONCE_MAX_PEERS {
            let _ = cache.record_and_check(&format!("peer-{i}"), "n");
        }
        assert_eq!(cache.peer_count(), FEDERATION_NONCE_MAX_PEERS);
        assert_eq!(cache.peer_evictions_since_boot(), 0);
        // Touch peer-0 to make it the most-recently-touched
        // (advances its last_touch); peer-1 is now the LRU
        // candidate.
        let _ = cache.record_and_check("peer-0", "n2");
        // Push a NEW peer past the ceiling — peer-1 (the LRU)
        // should be evicted.
        assert_eq!(
            cache.record_and_check("peer-new", "n"),
            ReplayDecision::Fresh
        );
        assert_eq!(
            cache.peer_count(),
            FEDERATION_NONCE_MAX_PEERS,
            "#1038: at ceiling the outer HashMap must stay at FEDERATION_NONCE_MAX_PEERS"
        );
        assert_eq!(
            cache.peer_evictions_since_boot(),
            1,
            "#1038: exactly one peer-slot eviction must have fired"
        );
        // peer-1 (LRU) is gone: it has no cached fingerprints left.
        assert_eq!(cache.len_for_peer("peer-1"), 0);
        // peer-0 (recently touched) is still present.
        assert!(cache.len_for_peer("peer-0") > 0);
        // Because the cache forgot peer-1's prior fingerprints, replaying
        // its original nonce is now accepted as Fresh (and re-creates its
        // slot, evicting the next LRU peer).
        assert_eq!(
            cache.record_and_check("peer-1", "n"),
            ReplayDecision::Fresh
        );
        assert_eq!(cache.len_for_peer("peer-1"), 1);
    }

    #[test]
    fn re_touch_existing_peer_does_not_trigger_eviction_1038() {
        // v0.7.0 #1038 — re-touching an existing peer at the
        // ceiling MUST NOT trigger an eviction (LRU bookkeeping
        // only fires on NEW peer inserts past the ceiling).
        let cache = FederationNonceCache::new();
        for i in 0..FEDERATION_NONCE_MAX_PEERS {
            let _ = cache.record_and_check(&format!("peer-{i}"), "n");
        }
        let before = cache.peer_evictions_since_boot();
        // Re-touch every existing peer — no NEW peer inserts.
        for i in 0..FEDERATION_NONCE_MAX_PEERS {
            let _ = cache.record_and_check(&format!("peer-{i}"), &format!("n2-{i}"));
        }
        assert_eq!(
            cache.peer_evictions_since_boot(),
            before,
            "#1038: re-touching existing peers MUST NOT trigger LRU eviction"
        );
        assert_eq!(cache.peer_count(), FEDERATION_NONCE_MAX_PEERS);
    }

    /// #1255 (MED, 2026-05-25) — regression: a nonce that landed in
    /// the cache before a daemon restart must STILL be rejected as
    /// a replay after the restart. Pre-#1255 every restart opened a
    /// fresh in-memory window, so any captured `(body, sig, nonce)`
    /// tuple could be replayed once the daemon bounced.
    ///
    /// Simulates the restart by dropping the first
    /// `FederationNonceCache` and constructing a second one against
    /// the SAME `db_path`. The hydration step on the second cache
    /// reloads every persisted fingerprint, so the same `(peer_id,
    /// nonce)` MUST surface as `Replay` on the second cache.
    #[test]
    fn issue_1255_nonce_persists_across_recreated_cache() {
        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
        let db_path = tmp.path().to_path_buf();

        // First cache — accept the nonce as Fresh, persisting it.
        let cache_a = FederationNonceCache::new_with_db_persistence(&db_path)
            .expect("first cache must open the DB and run v51 migration");
        assert_eq!(
            cache_a.record_and_check("peer-1255", "n-1255"),
            ReplayDecision::Fresh,
            "#1255: first observation of (peer, nonce) is Fresh"
        );
        // A second observation in the SAME process is Replay
        // (in-memory cache holds independent of disk persistence).
        assert_eq!(
            cache_a.record_and_check("peer-1255", "n-1255"),
            ReplayDecision::Replay,
            "#1255: in-process re-observation is Replay (sanity)"
        );
        drop(cache_a);

        // Second cache — simulate daemon restart against the same
        // DB. Hydration must replay the persisted fingerprint into
        // the in-memory set so the SAME (peer, nonce) is REJECTED.
        let cache_b = FederationNonceCache::new_with_db_persistence(&db_path)
            .expect("second cache must hydrate from the same DB");
        assert_eq!(
            cache_b.record_and_check("peer-1255", "n-1255"),
            ReplayDecision::Replay,
            "#1255: persistence is load-bearing — a daemon restart must NOT \
             reopen the replay window for a previously-seen nonce"
        );
        // A NEW (peer, nonce) under the second cache is Fresh — the
        // hydration didn't accidentally over-block.
        assert_eq!(
            cache_b.record_and_check("peer-1255", "n-different"),
            ReplayDecision::Fresh,
            "#1255: hydration must NOT over-block on unrelated nonces"
        );
        // The hydrated cache still tracks at least the one peer
        // from before (sanity on `len_for_peer`).
        assert!(
            cache_b.len_for_peer("peer-1255") >= 1,
            "#1255: hydrated cache must retain the persisted fingerprint count"
        );
    }

    /// #1690 — delete-on-evict keeps the disk `federation_nonce_cache`
    /// table bounded by the same ceiling as the in-memory LRU. Pre-fix
    /// the table was INSERT-only, so a long-lived high-throughput peer
    /// grew it without bound while the in-memory cache stayed capped.
    /// Drives the private persist+evict path directly (the per-peer FIFO
    /// cap is 10k and the outer cap is 1024, both too large to trigger
    /// via `record_and_check` in a unit test) and counts disk rows.
    #[test]
    fn issue_1690_eviction_prunes_disk_mirror() {
        fn disk_row_count(path: &std::path::Path) -> i64 {
            let conn = crate::db::open(path).expect("open nonce db");
            conn.query_row("SELECT COUNT(*) FROM federation_nonce_cache", [], |r| {
                r.get(0)
            })
            .expect("count rows")
        }

        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
        let db_path = tmp.path().to_path_buf();
        let cache = FederationNonceCache::new_with_db_persistence(&db_path)
            .expect("open + migrate nonce db");

        // Two fresh fingerprints for one peer → two disk rows.
        let fp_a = FederationNonceCache::fingerprint("peer-x", "nonce-a");
        let fp_b = FederationNonceCache::fingerprint("peer-x", "nonce-b");
        cache.persist_fingerprint_and_evict("peer-x", &fp_a, 1, None, None);
        cache.persist_fingerprint_and_evict("peer-x", &fp_b, 2, None, None);
        assert_eq!(disk_row_count(&db_path), 2, "two inserts → two rows");

        // Per-peer FIFO eviction: inserting fp_c while evicting fp_a must
        // delete fp_a's disk row → still 2 rows (fp_b + fp_c), not 3.
        let fp_c = FederationNonceCache::fingerprint("peer-x", "nonce-c");
        cache.persist_fingerprint_and_evict("peer-x", &fp_c, 3, Some(&fp_a), None);
        assert_eq!(
            disk_row_count(&db_path),
            2,
            "#1690: a per-peer FIFO eviction must delete the evicted disk row"
        );

        // Add a second peer, then an outer-LRU peer eviction of peer-x
        // must wipe ALL of peer-x's disk rows.
        let fp_y = FederationNonceCache::fingerprint("peer-y", "n");
        cache.persist_fingerprint_and_evict("peer-y", &fp_y, 4, None, None);
        assert_eq!(disk_row_count(&db_path), 3, "peer-y row added → three rows");
        let fp_z = FederationNonceCache::fingerprint("peer-z", "n");
        cache.persist_fingerprint_and_evict("peer-z", &fp_z, 5, None, Some("peer-x"));
        assert_eq!(
            disk_row_count(&db_path),
            2,
            "#1690: an outer-LRU peer eviction must delete every disk row for that peer \
             (peer-x's 2 rows gone, peer-y + peer-z remain)"
        );
    }

    #[test]
    fn issue_1690_prune_nonce_cache_keeps_newest_per_peer() {
        // #1690 — the hydration-time legacy-bloat repair keeps the newest
        // `cap` rows per peer and deletes the rest. Tested with a small
        // cap so we don't seed 10k rows.
        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
        let conn = crate::db::open(tmp.path()).expect("open + migrate");
        // peer-a: 4 rows (last_touch 1..4); peer-b: 2 rows (5..6).
        for (peer, touch) in [
            ("peer-a", 1),
            ("peer-a", 2),
            ("peer-a", 3),
            ("peer-a", 4),
            ("peer-b", 5),
            ("peer-b", 6),
        ] {
            let fp = FederationNonceCache::fingerprint(peer, &format!("n{touch}"));
            conn.execute(
                "INSERT INTO federation_nonce_cache (peer_id, fingerprint, last_touch, inserted_at)
                 VALUES (?1, ?2, ?3, '2026-01-01T00:00:00Z')",
                rusqlite::params![peer, fp.as_slice(), touch],
            )
            .unwrap();
        }
        // Prune to cap=2 per peer: peer-a drops its 2 oldest (touch 1,2),
        // peer-b is already within cap (untouched).
        let deleted = prune_nonce_cache_to_per_peer_cap(&conn, 2).expect("prune");
        assert_eq!(deleted, 2, "#1690: peer-a's 2 over-cap rows deleted");
        let a_left: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM federation_nonce_cache WHERE peer_id='peer-a'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let b_left: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM federation_nonce_cache WHERE peer_id='peer-b'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(a_left, 2, "peer-a bounded to cap");
        assert_eq!(b_left, 2, "peer-b within cap, untouched");
        // The newest peer-a rows (touch 3,4) survived; the oldest (1,2) went.
        let min_touch_a: i64 = conn
            .query_row(
                "SELECT MIN(last_touch) FROM federation_nonce_cache WHERE peer_id='peer-a'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(min_touch_a, 3, "the oldest rows were the ones pruned");
        // Idempotent: a second prune deletes nothing.
        assert_eq!(prune_nonce_cache_to_per_peer_cap(&conn, 2).unwrap(), 0);
    }

    /// #1255 — graceful degradation: persistence open errors do NOT
    /// crash the cache. A broken DB path surfaces as a
    /// constructor-time `Err`; callers (today: only the production
    /// daemon bootstrap) get a clear error and fall back to either
    /// retrying with the right path OR booting with the in-memory
    /// constructor [`Self::new`].
    #[test]
    fn issue_1255_persistence_constructor_surfaces_open_errors() {
        // Point at a path that cannot exist as a sqlite DB (a directory).
        let dir = tempfile::TempDir::new().unwrap();
        // Passing the directory itself as a path. SQLite's `open_with_flags`
        // refuses to open a directory as a database file.
        let res = FederationNonceCache::new_with_db_persistence(dir.path().to_path_buf());
        assert!(
            res.is_err(),
            "#1255: a non-DB path must surface as a constructor Err so operators \
             see the persistence failure rather than silently falling back"
        );
    }
}