ant_node/replication/commitment_state.rs
1//! Responder-side commitment builder + rotation state.
2//!
//! Phase 2b of the v12 storage-bound audit design. Builds, signs, and
//! caches a [`StorageCommitment`] over the responder's currently-stored
//! key set; serves audit lookups by `expected_commitment_hash`; retains
//! recently-gossiped commitments after rotation so an audit pinned to one
//! of them does not false-fail at the rotation boundary (v5/v12 §4 retention).
8//!
9//! Rotation strategy:
10//!
//! - `rotate(new_built)` atomically installs `new_built` as the current
//!   commitment. Older slots survive only while their hash is among the
//!   last `RETAINED_GOSSIPED_COMMITMENTS` gossiped hashes and that gossip
//!   stamp is younger than `GOSSIP_ANSWERABILITY_TTL`; a commitment that
//!   was built but never gossiped is dropped.
//! - `lookup_by_hash(hash)` scans the retained slots and returns an [`Arc`]
//!   to the matching `BuiltCommitment`, keeping it alive for the audit
//!   response regardless of subsequent rotation (mirrors the `ArcSwap`
//!   semantics specified in v6 §2: an in-flight reader holding its
//!   `Arc` is unaffected by a concurrent rotate).
19//!
20//! No persistent disk state. Trees are rebuilt from `LmdbStorage` at
21//! the next rotation tick. Memory cost is bounded by
22//! `2 × (key_count × ~64 bytes + signature_size)` — for 10k keys, ~1.3 MB.
23
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26
27use parking_lot::RwLock;
28use saorsa_pqc::api::sig::MlDsaSecretKey;
29
30use crate::ant_protocol::XorName;
31use crate::replication::commitment::{
32 commitment_hash, sign_commitment, CommitmentError, MerkleTree, StorageCommitment,
33};
34
35/// Auditor-side per-peer commitment state.
36///
37/// Holds two things that together implement v10/v12 §2 step 5 and §6:
38/// - `last_commitment`: the most recently received, verified, signed
39/// commitment from this peer. `None` if we've evicted it (TTL,
40/// sybil cap, peer-removed) or never received one.
41/// - `commitment_capable`: a **sticky** boolean that flips to `true`
42/// on the first successful gossip ingest and NEVER reverts. Used
43/// by holder-eligibility (§6) and bootstrap-claim shield: a peer
44/// that has at least once proven it speaks v12 is forever held to
45/// that standard. Without stickiness, a peer could flip the flag
46/// off by silencing its gossip and downgrade to the weaker legacy
47/// audit path.
48#[derive(Debug, Clone)]
49pub struct PeerCommitmentRecord {
50 /// Last verified commitment, or `None` if evicted/expired. PRIVATE so it can
51 /// only be mutated through [`Self::set_commitment`] / [`Self::clear_commitment`],
52 /// which keep `cached_hash` in lockstep (codex#2 — a stray
53 /// `record.last_commitment = …` would otherwise stale the cached hash). Read
54 /// it via [`Self::last_commitment`].
55 last_commitment: Option<StorageCommitment>,
56 /// `commitment_hash(last_commitment)`, cached so the per-cycle verifier
57 /// snapshot doesn't re-serialize + re-hash every peer's ~5 KiB commitment
58 /// each verification round (§13). Kept in sync via [`Self::set_commitment`]
59 /// / [`Self::clear_commitment`]; `None` exactly when `last_commitment` is
60 /// `None`.
61 cached_hash: Option<[u8; 32]>,
62 /// Sticky: true once this peer has gossiped a valid commitment.
63 /// Set on ingest. Never set back to false except by full
64 /// `PeerRemoved` cleanup.
65 pub commitment_capable: bool,
66 /// When `last_commitment` was received. Used for TTL on the
67 /// commitment itself (independent of the `commitment_capable`
68 /// stickiness — losing the commitment via TTL doesn't make us
69 /// forget the peer ever spoke v12).
70 pub received_at: Instant,
71 /// Last time we performed an ML-DSA signature verify for this
72 /// peer's commitment. Used to enforce the §2 step 3 rate limit
73 /// (at most one sig verify per peer per 60s).
74 pub last_sig_verify_at: Instant,
75}
76
77impl PeerCommitmentRecord {
78 /// Construct from a freshly-verified commitment. `commitment_capable`
79 /// is set to `true` here and must remain so for the lifetime of the
80 /// record.
81 #[must_use]
82 pub fn from_verified(commitment: StorageCommitment, now: Instant) -> Self {
83 let cached_hash = commitment_hash(&commitment);
84 Self {
85 last_commitment: Some(commitment),
86 cached_hash,
87 commitment_capable: true,
88 received_at: now,
89 last_sig_verify_at: now,
90 }
91 }
92
93 /// Mark commitment-capable without storing a commitment (used when
94 /// we've TTL-expired the commitment itself but want to remember the
95 /// peer has spoken v12 before).
96 #[must_use]
97 pub fn capable_but_no_commitment(now: Instant) -> Self {
98 Self {
99 last_commitment: None,
100 cached_hash: None,
101 commitment_capable: true,
102 received_at: now,
103 last_sig_verify_at: now,
104 }
105 }
106
107 /// The stored commitment, if any. Read-only view of the private field.
108 #[must_use]
109 pub fn last_commitment(&self) -> Option<&StorageCommitment> {
110 self.last_commitment.as_ref()
111 }
112
113 /// The cached `commitment_hash` of the stored commitment (§13) — `None`
114 /// when no commitment is held. Avoids re-serializing/re-hashing on every
115 /// verifier snapshot.
116 #[must_use]
117 pub fn commitment_hash(&self) -> Option<[u8; 32]> {
118 self.cached_hash
119 }
120
121 /// Replace the stored commitment and refresh the cached hash together, so
122 /// the two never drift.
123 pub fn set_commitment(&mut self, commitment: StorageCommitment, now: Instant) {
124 self.cached_hash = commitment_hash(&commitment);
125 self.last_commitment = Some(commitment);
126 self.received_at = now;
127 }
128
129 /// Drop the stored commitment and its cached hash together.
130 pub fn clear_commitment(&mut self) {
131 self.last_commitment = None;
132 self.cached_hash = None;
133 }
134}
135
136/// A fully-built commitment: signed wire blob, cached hash, Merkle tree
137/// for inclusion proofs, and a sorted leaf-index lookup for the auditor's
138/// `leaf_index` field.
139///
140/// Held inside an [`Arc`] so audit responders can grab a reference and
141/// build a reply without holding the [`ResponderCommitmentState`] read
142/// lock for the duration of the response.
143pub struct BuiltCommitment {
144 /// The signed wire blob.
145 commitment: StorageCommitment,
146 /// `commitment_hash(commitment)` — cached so audit lookups don't
147 /// re-serialize on every match.
148 cached_hash: [u8; 32],
149 /// The Merkle tree behind the commitment. `path_for(key)` produces the
150 /// inclusion proof and `key_index(key)` reconstructs a key's leaf index in
151 /// `O(log n)` — so no separate `sorted_keys` Vec is kept (it duplicated the
152 /// keys already in `tree.leaves`, §14).
153 tree: MerkleTree,
154}
155
156impl BuiltCommitment {
157 /// Build a commitment over `entries = [(key, bytes_hash), ...]` and
158 /// sign it with `secret_key`.
159 ///
160 /// `entries` does not need to be sorted (the inner [`MerkleTree`]
161 /// sorts internally); `sender_peer_id` is bound into the signature
162 /// and the commitment.
163 ///
164 /// # Errors
165 ///
166 /// Returns the wrapped [`CommitmentError`] on empty key sets,
167 /// over-cap key counts, duplicates, or signing failures.
168 pub fn build(
169 entries: Vec<(XorName, [u8; 32])>,
170 sender_peer_id: &[u8; 32],
171 secret_key: &MlDsaSecretKey,
172 sender_public_key: &[u8],
173 ) -> Result<Self, CommitmentError> {
174 let tree = MerkleTree::build(entries)?;
175 Self::build_from_tree(tree, sender_peer_id, secret_key, sender_public_key)
176 }
177
178 /// Sign and wrap an ALREADY-BUILT Merkle tree. Lets callers that already
179 /// built the tree (e.g. the rotation no-op-root check, §11) avoid rebuilding
180 /// it inside [`Self::build`].
181 ///
182 /// # Errors
183 ///
184 /// Propagates signing / serialization failures, identical to [`Self::build`].
185 pub fn build_from_tree(
186 tree: MerkleTree,
187 sender_peer_id: &[u8; 32],
188 secret_key: &MlDsaSecretKey,
189 sender_public_key: &[u8],
190 ) -> Result<Self, CommitmentError> {
191 let root = tree.root();
192 let key_count = tree.key_count();
193 let signature = sign_commitment(
194 secret_key,
195 &root,
196 key_count,
197 sender_peer_id,
198 sender_public_key,
199 )?;
200 let commitment = StorageCommitment {
201 root,
202 key_count,
203 sender_peer_id: *sender_peer_id,
204 sender_public_key: sender_public_key.to_vec(),
205 signature,
206 };
207 // `commitment_hash` only returns None on a postcard serialization
208 // failure, which for our fixed-size commitment cannot occur in
209 // practice (ML-DSA-65 signature is 3293 bytes). If it ever
210 // somehow does, surface as a SignatureFailed so callers don't
211 // need a new error variant for an unreachable case.
212 let cached_hash = commitment_hash(&commitment).ok_or_else(|| {
213 CommitmentError::SignatureFailed("commitment serialization failed".to_string())
214 })?;
215 Ok(Self {
216 commitment,
217 cached_hash,
218 tree,
219 })
220 }
221
222 /// The signed wire blob.
223 #[must_use]
224 pub fn commitment(&self) -> &StorageCommitment {
225 &self.commitment
226 }
227
228 /// The cached commitment hash. Equal to
229 /// [`crate::replication::commitment::commitment_hash`]
230 /// `(self.commitment())`.
231 #[must_use]
232 pub fn hash(&self) -> [u8; 32] {
233 self.cached_hash
234 }
235
236 /// The Merkle tree behind this commitment.
237 ///
238 /// Used by the subtree-audit responder to plan a proof (select the
239 /// nonce-determined branch and read its sibling cut-hashes).
240 #[must_use]
241 pub fn tree(&self) -> &MerkleTree {
242 &self.tree
243 }
244
245 /// Inclusion path + leaf index for `key`, if it is in this
246 /// commitment. Returns `None` if `key` is not committed.
247 #[must_use]
248 pub fn proof_for(&self, key: &XorName) -> Option<(Vec<[u8; 32]>, u32)> {
249 let idx = self.tree.key_index(key)?;
250 let path = self.tree.path_for(key)?;
251 // u32 cast safe because MerkleTree::build rejects > MAX_COMMITMENT_KEY_COUNT.
252 let leaf_index = u32::try_from(idx).unwrap_or(u32::MAX);
253 Some((path, leaf_index))
254 }
255
256 /// Whether `key` is committed in this tree. Allocation-free membership
257 /// check (binary search over the sorted leaf keys) — equivalent to
258 /// `proof_for(key).is_some()` but without building the inclusion path, for
259 /// hot callers (e.g. the pruner's `is_held` veto) that only need the
260 /// boolean.
261 #[must_use]
262 pub fn contains_key(&self, key: &XorName) -> bool {
263 self.tree.contains_key(key)
264 }
265}
266
/// How many recently-gossiped commitments a responder must remain able to
/// answer for (ADR-0002 "you stay answerable for what you publish").
///
/// An auditor can only pin a commitment it received via gossip, so keeping the
/// last two **actually-gossiped** commitments (in addition to the current one)
/// means an honest node can always answer any pin the auditor could have
/// formed. The depth is two rather than one to absorb the race in which the
/// auditor pins the commitment published just before the newest one.
///
/// Retention follows gossip emission, NOT the rotation timer: a node that
/// rebuilds its tree faster than it gossips never drops a commitment it
/// actually put on the wire, so it is never wrongly failed for "unknown
/// commitment hash".
const RETAINED_GOSSIPED_COMMITMENTS: usize = 2;
279
/// Lifetime of a gossiped commitment's answerability, measured from the last
/// time it was put on the wire.
///
/// Retention (and with it the pruner's `is_held` deletion veto) is anchored to
/// gossip emission rather than to the rotation timer or to distinct-hash
/// churn: a commitment record expires this long after its last
/// `mark_gossiped`, even if the node keeps re-gossiping nothing new (the
/// steady-state no-op rotation case) or stops being responsible for all its
/// keys.
///
/// The value is chosen to strictly dominate the longest realistic auditor pin
/// lifetime — well above the neighbor-sync gossip cadence and per-peer
/// cooldown (≤1 h) — while staying far below the prune hysteresis (days), so a
/// stale key is reclaimed promptly once it stops being gossiped. With
/// `RETAINED_GOSSIPED_COMMITMENTS = 2` it equals `(2 + 1) ×` the 1 h rotation
/// interval = 3 h.
pub(crate) const GOSSIP_ANSWERABILITY_TTL: Duration = Duration::from_secs(3 * 60 * 60);
294
295/// Responder retention state (ADR-0002).
296///
297/// Keeps the current (latest-rotated) commitment plus every commitment whose
298/// hash is among the last `RETAINED_GOSSIPED_COMMITMENTS` *gossiped* hashes.
299/// A built-but-never-gossiped commitment is dropped on the next rotation unless
300/// it gets gossiped. Rotation and gossip are the only paths that mutate this.
301pub struct ResponderCommitmentState {
302 inner: RwLock<Inner>,
303}
304
/// One commitment hash that was emitted on the wire, paired with the
/// [`Instant`] at which it was most recently gossiped.
///
/// `last_gossiped_at` is the answerability anchor: the record — and any slot
/// it keeps alive — expires `GOSSIP_ANSWERABILITY_TTL` after that instant,
/// independent of rotation ticks or distinct-hash churn.
#[derive(Clone, Copy)]
struct GossipedAt {
    hash: [u8; 32],
    last_gossiped_at: Instant,
}
314
315struct Inner {
316 /// Newest-first. When `has_current` is true, `slots[0]` is the current
317 /// (advertised) commitment; the rest — and, once retired, `slots[0]` too —
318 /// are retained only because their hash is still in `recently_gossiped` and
319 /// not yet expired.
320 slots: Vec<Arc<BuiltCommitment>>,
321 /// Whether `slots[0]` is the live, advertised current commitment. Set by
322 /// `rotate`; cleared by `retire_current` (and when the slot set empties).
323 /// When false, `current()` returns `None` — the node stops advertising and
324 /// re-gossiping the stale root, so it ages out by its gossip TTL — while
325 /// `lookup_by_hash` still answers any in-flight pin until then. This
326 /// decouples ADVERTISE (gossiped as current, refreshes the TTL) from ANSWER
327 /// (still resolvable during the TTL window).
328 has_current: bool,
329 /// The last `RETAINED_GOSSIPED_COMMITMENTS` commitments actually emitted on
330 /// the wire, newest-first, each stamped with when it was last gossiped. A
331 /// commitment is retained iff it is the live current one or its hash appears
332 /// here with an unexpired stamp.
333 recently_gossiped: Vec<GossipedAt>,
334}
335
336impl Default for ResponderCommitmentState {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
342impl ResponderCommitmentState {
343 /// Empty state: no commitments yet. Audits before the first rotation
344 /// see `None` lookups and the auditor falls back to the legacy plain
345 /// digest path.
346 #[must_use]
347 pub fn new() -> Self {
348 Self {
349 inner: RwLock::new(Inner {
350 slots: Vec::with_capacity(RETAINED_GOSSIPED_COMMITMENTS + 1),
351 has_current: false,
352 recently_gossiped: Vec::with_capacity(RETAINED_GOSSIPED_COMMITMENTS),
353 }),
354 }
355 }
356
357 /// Rotate: the freshly-rebuilt commitment becomes `current`. Slots that are
358 /// neither the new current nor among the last gossiped hashes are dropped
359 /// (a built-but-never-gossiped commitment does not linger).
360 pub fn rotate(&self, new_current: BuiltCommitment) {
361 let new_current = Arc::new(new_current);
362 let mut guard = self.inner.write();
363 guard.slots.insert(0, new_current);
364 guard.has_current = true;
365 prune_slots(&mut guard, Instant::now());
366 }
367
368 /// Retire the current commitment WITHOUT clearing retention: stop
369 /// advertising it (so `current()` returns `None`, the gossip-emit sites stop
370 /// re-emitting and re-stamping it, and it can age out by its gossip TTL),
371 /// while keeping it answerable via `lookup_by_hash` for any in-flight pin a
372 /// peer already formed — until that pin's gossip stamp expires.
373 ///
374 /// Called when the node has no key it is still responsible for: it must no
375 /// longer claim to hold that data going forward, but must not strand a peer
376 /// mid-audit on a root it gossiped moments ago. A never-gossiped current is
377 /// simply dropped (nothing to stay answerable for).
378 pub fn retire_current(&self) {
379 let mut guard = self.inner.write();
380 guard.has_current = false;
381 prune_slots(&mut guard, Instant::now());
382 }
383
384 /// Record that `hash` was emitted on the wire (gossiped). Keeps the last
385 /// `RETAINED_GOSSIPED_COMMITMENTS` gossiped hashes so the matching
386 /// commitments stay answerable (ADR-0002). Call at every gossip-emit site.
387 ///
388 /// Re-gossiping a hash already present **refreshes** its answerability
389 /// deadline to now and moves it to the front: every time the node actually
390 /// puts a root on the wire — including re-emitting the current root in the
391 /// steady-state no-op-rotation case — its retention legitimately extends.
392 /// Conversely a root that stops being gossiped expires
393 /// `GOSSIP_ANSWERABILITY_TTL` after its last emission, which is what lets
394 /// an out-of-range key age out even when the no-op guard freezes the
395 /// committed key set.
396 pub fn mark_gossiped(&self, hash: [u8; 32]) {
397 let now = Instant::now();
398 let mut guard = self.inner.write();
399 mark_gossiped_locked(&mut guard, hash, now);
400 }
401
402 /// Atomically snapshot the current commitment to advertise AND mark it
403 /// gossiped, under a single lock. Returns the commitment to put on the wire,
404 /// or `None` if there is no live current (never rotated, or retired).
405 ///
406 /// This is the ONLY correct way to gossip the current commitment: doing
407 /// `current()` then a separate `mark_gossiped()` is a TOCTOU — a concurrent
408 /// `retire_current`/`rotate` between the two could drop the slot, so the node
409 /// would emit a root the responder no longer retains (a peer pinning it would
410 /// get "unknown commitment hash" → false failure). Taking the snapshot and
411 /// the stamp in one critical section guarantees anything emitted is
412 /// simultaneously retained for its answerability TTL.
413 #[must_use]
414 pub fn current_for_gossip(&self) -> Option<Arc<BuiltCommitment>> {
415 let now = Instant::now();
416 let mut guard = self.inner.write();
417 if !guard.has_current {
418 return None;
419 }
420 let current = guard.slots.first().map(Arc::clone)?;
421 mark_gossiped_locked(&mut guard, current.cached_hash, now);
422 Some(current)
423 }
424
425 /// Expire retention purely by the wall clock, without building, signing, or
426 /// rotating anything. Call once per rotation tick so a gossiped commitment's
427 /// answerability deadline advances even when the rotation no-op guard
428 /// returns early (unchanged committed set) or when the node has no
429 /// responsible keys to commit to. This is the time-driven half of the
430 /// retention contract — without it, a frozen `recently_gossiped` entry would
431 /// keep a stale key `is_held` forever.
432 pub fn age_out(&self) {
433 let mut guard = self.inner.write();
434 prune_slots(&mut guard, Instant::now());
435 }
436
437 /// Look up a commitment by its hash. Returns `Some(arc)` if `hash`
438 /// matches any retained slot. The returned `Arc` keeps the
439 /// [`BuiltCommitment`] alive for as long as the caller holds it,
440 /// even if a concurrent `rotate` ages it out of the retention buffer.
441 #[must_use]
442 pub fn lookup_by_hash(&self, hash: &[u8; 32]) -> Option<Arc<BuiltCommitment>> {
443 let guard = self.inner.read();
444 for c in &guard.slots {
445 if &c.cached_hash == hash {
446 return Some(Arc::clone(c));
447 }
448 }
449 None
450 }
451
452 /// Whether `key` is committed under any retained slot (the current
453 /// commitment plus the last-2-gossiped ones) — i.e. whether a peer could
454 /// still pin a recently gossiped root and demand this key's bytes in a
455 /// round-2 byte challenge.
456 ///
457 /// This is the SAME predicate the round-2 responder uses to decide a key is
458 /// "committed" (`handle_subtree_byte_challenge` calls `built.proof_for(key)`
459 /// on the pinned slot, which is committed iff `contains_key`), folded over
460 /// every retained slot. The pruner consults it before deleting an
461 /// out-of-range key, so "the pruner will not delete it" and "the responder
462 /// still owes an answer for it" are provably the same boolean and cannot
463 /// drift. `slots` holds at most `RETAINED_GOSSIPED_COMMITMENTS` + 1
464 /// commitments, and `contains_key` is an allocation-free binary search, so
465 /// this is a short, allocation-free read.
466 #[must_use]
467 pub fn is_held(&self, key: &XorName) -> bool {
468 self.inner.read().slots.iter().any(|c| c.contains_key(key))
469 }
470
471 /// Snapshot the current commitment to ADVERTISE, if any. Used by the gossip
472 /// piggyback path: emit `state.current()` on the next outbound
473 /// `NeighborSyncRequest`/`Response`. Returns `None` once the current
474 /// commitment has been retired (the node has no responsible keys), so the
475 /// node stops re-gossiping a stale root even though `lookup_by_hash` may
476 /// still answer it during its remaining TTL.
477 #[must_use]
478 pub fn current(&self) -> Option<Arc<BuiltCommitment>> {
479 let guard = self.inner.read();
480 if guard.has_current {
481 guard.slots.first().map(Arc::clone)
482 } else {
483 None
484 }
485 }
486
487 /// Number of commitment slots currently retained (the current commitment
488 /// plus any still-answerable recently-gossiped ones). Used only for the
489 /// v12 `commitment_rotated` event's `retained_slots` field; carries no
490 /// behavioural meaning.
491 #[must_use]
492 pub fn retained_slot_count(&self) -> usize {
493 self.inner.read().slots.len()
494 }
495
496 /// Drop every retained slot. Called when the local store has
497 /// transitioned to empty: keeping the previously-advertised
498 /// commitment alive would invite audit failures (we can no longer
499 /// answer for any of the keys we committed to), and would leave
500 /// remote auditors pinning a hash this node will never satisfy
501 /// again. After clearing, the gossip piggyback path will emit
502 /// `commitment: None` until a fresh rotation occurs.
503 ///
504 /// This is the one sanctioned escape from the "callers MUST NOT
505 /// clear retention by any other mechanism" invariant — empty
506 /// storage means there is nothing to retain.
507 pub fn clear_all(&self) {
508 let mut guard = self.inner.write();
509 guard.slots.clear();
510 guard.has_current = false;
511 guard.recently_gossiped.clear();
512 }
513}
514
515/// Enforce retention as of `now`: first expire any gossip record older than
516/// `GOSSIP_ANSWERABILITY_TTL`, then keep the live current slot (only while
517/// `has_current`) and any slot whose hash is still among the unexpired
518/// recently-gossiped hashes; drop the rest. Idempotent; preserves newest-first
519/// order. This is the single place retention is enforced.
520///
521/// The current-slot exemption is conditional on `has_current`: once the current
522/// commitment is retired (no responsible keys), `slots[0]` is no longer exempt
523/// and ages out by its own gossip TTL exactly like any other retained slot —
524/// the fix that stops a stale, continuously-re-gossiped current from pinning its
525/// keys forever.
526/// Stamp `hash` as gossiped at `now` (newest-first, de-duplicated, bounded to
527/// `RETAINED_GOSSIPED_COMMITMENTS`) and re-run retention. Shared by
528/// `mark_gossiped` and `current_for_gossip` so the snapshot-and-stamp can be one
529/// critical section.
530fn mark_gossiped_locked(inner: &mut Inner, hash: [u8; 32], now: Instant) {
531 inner.recently_gossiped.retain(|g| g.hash != hash);
532 inner.recently_gossiped.insert(
533 0,
534 GossipedAt {
535 hash,
536 last_gossiped_at: now,
537 },
538 );
539 inner
540 .recently_gossiped
541 .truncate(RETAINED_GOSSIPED_COMMITMENTS);
542 prune_slots(inner, now);
543}
544
545fn prune_slots(inner: &mut Inner, now: Instant) {
546 // 1. TTL-expire gossip records first (the answerability anchor). A record
547 // whose last gossip is older than the window no longer keeps anything
548 // answerable, regardless of distinct-hash churn or rotation ticks.
549 inner
550 .recently_gossiped
551 .retain(|g| now.duration_since(g.last_gossiped_at) < GOSSIP_ANSWERABILITY_TTL);
552
553 // 2. Keep the live current slot (only while has_current) + any slot still
554 // covered by an unexpired record. Snapshot the live hashes first to avoid
555 // borrowing `inner` twice (both collections are at most
556 // RETAINED_GOSSIPED_COMMITMENTS + 1 long).
557 let live: Vec<[u8; 32]> = inner.recently_gossiped.iter().map(|g| g.hash).collect();
558 let has_current = inner.has_current;
559 let mut idx = 0usize;
560 inner.slots.retain(|c| {
561 let keep = (has_current && idx == 0) || live.contains(&c.cached_hash);
562 idx += 1;
563 keep
564 });
565 // If nothing remains, there is no current slot to advertise.
566 if inner.slots.is_empty() {
567 inner.has_current = false;
568 }
569}
570
571// ---------------------------------------------------------------------------
572// Tests
573// ---------------------------------------------------------------------------
574
575#[cfg(test)]
576#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
577mod tests {
578 use super::*;
579 use crate::replication::commitment::{commitment_hash, leaf_hash, verify_path};
580 use saorsa_pqc::api::sig::ml_dsa_65;
581
582 fn key(byte: u8) -> XorName {
583 let mut k = [0u8; 32];
584 k[0] = byte;
585 k
586 }
587
588 fn bh(byte: u8) -> [u8; 32] {
589 [byte ^ 0x5A; 32]
590 }
591
592 fn keypair() -> (saorsa_pqc::api::sig::MlDsaPublicKey, MlDsaSecretKey) {
593 ml_dsa_65().generate_keypair().unwrap()
594 }
595
596 #[test]
597 fn built_commitment_hash_matches_global_hash() {
598 let (pk, sk) = keypair();
599 let pk_bytes = pk.to_bytes();
600 let entries: Vec<_> = (1..=5u8).map(|i| (key(i), bh(i))).collect();
601 let built = BuiltCommitment::build(entries, &[0xAB; 32], &sk, &pk_bytes).unwrap();
602 let expected = commitment_hash(built.commitment()).unwrap();
603 assert_eq!(built.hash(), expected);
604 }
605
606 #[test]
607 fn built_commitment_proof_verifies_under_its_own_root() {
608 let (pk, sk) = keypair();
609 let pk_bytes = pk.to_bytes();
610 let entries: Vec<_> = (1..=8u8).map(|i| (key(i), bh(i))).collect();
611 let built = BuiltCommitment::build(entries.clone(), &[1; 32], &sk, &pk_bytes).unwrap();
612 let root = built.commitment().root;
613 let key_count = built.commitment().key_count;
614
615 for (k, _) in &entries {
616 let (path, leaf_index) = built.proof_for(k).expect("present");
617 // Find the bytes_hash for this key.
618 let bh_k = entries.iter().find(|(kk, _)| kk == k).unwrap().1;
619 let lh = leaf_hash(k, &bh_k);
620 assert!(
621 verify_path(&lh, &path, leaf_index as usize, key_count, &root),
622 "path verify failed for key {k:?}"
623 );
624 }
625 }
626
627 #[test]
628 fn proof_for_absent_key_is_none() {
629 let (pk, sk) = keypair();
630 let pk_bytes = pk.to_bytes();
631 let built = BuiltCommitment::build(
632 vec![(key(1), bh(1)), (key(2), bh(2))],
633 &[0; 32],
634 &sk,
635 &pk_bytes,
636 )
637 .unwrap();
638 assert!(built.proof_for(&key(99)).is_none());
639 }
640
641 #[test]
642 fn empty_state_returns_none() {
643 let state = ResponderCommitmentState::new();
644 assert!(state.current().is_none());
645 assert!(state.lookup_by_hash(&[0; 32]).is_none());
646 }
647
648 #[test]
649 fn clear_all_drops_every_slot() {
650 // Empty-storage transition: after clear_all, the gossip path
651 // must observe `current() == None` so it stops piggybacking a
652 // commitment the node can no longer answer audits against.
653 let (pk, sk) = keypair();
654 let pk_bytes = pk.to_bytes();
655 let state = ResponderCommitmentState::new();
656 let peer_id = *blake3::hash(&pk.to_bytes()).as_bytes();
657
658 let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &peer_id, &sk, &pk_bytes).unwrap();
659 let h1 = c1.hash();
660 state.rotate(c1);
661 state.mark_gossiped(h1); // gossiped → retained across the next rotation
662 let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &peer_id, &sk, &pk_bytes).unwrap();
663 let h2 = c2.hash();
664 state.rotate(c2);
665 state.mark_gossiped(h2);
666
667 assert!(state.current().is_some());
668 assert!(state.lookup_by_hash(&h1).is_some());
669
670 state.clear_all();
671
672 assert!(state.current().is_none());
673 assert!(state.lookup_by_hash(&h1).is_none());
674 }
675
676 #[test]
677 fn lookup_arc_outlives_subsequent_rotation() {
678 // INV-R2: an in-flight audit responder that grabbed an Arc must
679 // be able to finish building the response even after the state
680 // rotates that commitment out past the retention window.
681 let (pk, sk) = keypair();
682 let pk_bytes = pk.to_bytes();
683 let state = ResponderCommitmentState::new();
684
685 let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
686 let h1 = c1.hash();
687 state.rotate(c1);
688
689 let in_flight = state.lookup_by_hash(&h1).unwrap();
690
691 // c1 was never gossiped, so the next rotation (a new current) drops it
692 // from the retention buffer.
693 let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
694 state.rotate(c2);
695 assert!(state.lookup_by_hash(&h1).is_none());
696
697 // But the in-flight Arc still works (INV: Arc keeps it alive).
698 assert_eq!(in_flight.hash(), h1);
699 assert!(in_flight.proof_for(&key(1)).is_some());
700 }
701
702 #[test]
703 fn gossiped_commitment_stays_answerable_across_rotations() {
704 // ADR-0002: a commitment that was actually gossiped stays answerable
705 // even after rotation, until it falls out of the last-2-gossiped window.
706 let (pk, sk) = keypair();
707 let pk_bytes = pk.to_bytes();
708 let state = ResponderCommitmentState::new();
709
710 let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
711 let h1 = c1.hash();
712 state.rotate(c1);
713 state.mark_gossiped(h1); // we put c1 on the wire
714
715 // Rotate to c2 and gossip it. c1 is still within the last-2-gossiped.
716 let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
717 let h2 = c2.hash();
718 state.rotate(c2);
719 state.mark_gossiped(h2);
720 assert!(
721 state.lookup_by_hash(&h1).is_some(),
722 "c1 must stay answerable"
723 );
724 assert!(state.lookup_by_hash(&h2).is_some());
725
726 // Rotate to c3 and gossip it. Now the last-2-gossiped are {h3, h2};
727 // h1 has fallen out of the window and is dropped.
728 let c3 = BuiltCommitment::build(vec![(key(3), bh(3))], &[0; 32], &sk, &pk_bytes).unwrap();
729 let h3 = c3.hash();
730 state.rotate(c3);
731 state.mark_gossiped(h3);
732 assert!(
733 state.lookup_by_hash(&h1).is_none(),
734 "c1 aged out of gossip window"
735 );
736 assert!(state.lookup_by_hash(&h2).is_some());
737 assert!(state.lookup_by_hash(&h3).is_some());
738 }
739
740 #[test]
741 fn current_plus_last_two_gossiped_are_simultaneously_answerable() {
742 // ADR-0002 "Two, not one": the retention depth must keep BOTH of the
743 // last two gossiped commitments answerable at the same time, alongside
744 // the current one. This is the property that "absorbs the race where an
745 // auditor asks about the commitment a node published just before its
746 // newest one". The existing across-rotations test only ever checks two
747 // hashes at once; this one proves three DISTINCT commitments are live
748 // simultaneously and that the third-oldest gossiped root is dropped —
749 // i.e. RETAINED_GOSSIPED_COMMITMENTS is exactly 2, not 1 and not 3.
750 let (pk, sk) = keypair();
751 let pk_bytes = pk.to_bytes();
752 let state = ResponderCommitmentState::new();
753
754 // Gossip three commitments in order: c1, c2, c3. After this the current
755 // slot is c3 and the last-two-gossiped are {h3, h2}. But c2 and c1 also
756 // need to be checked relative to the window: once c3 is gossiped, the
757 // window is {h3, h2}; c1 (the 3rd-oldest gossiped) must be gone.
758 let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
759 let h1 = c1.hash();
760 state.rotate(c1);
761 state.mark_gossiped(h1);
762
763 let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
764 let h2 = c2.hash();
765 state.rotate(c2);
766 state.mark_gossiped(h2);
767
768 // At this moment: current = c2, last-2-gossiped = {h2, h1}. Both the
769 // current AND the previously-gossiped c1 must be answerable — the "two,
770 // not one" race window. c1 is the commitment "published just before the
771 // newest one" and an auditor may still pin it.
772 assert!(
773 state.lookup_by_hash(&h1).is_some(),
774 "the commitment published just before the newest one must stay answerable"
775 );
776 assert!(
777 state.lookup_by_hash(&h2).is_some(),
778 "current must be answerable"
779 );
780 assert_ne!(h1, h2, "the two retained commitments must be distinct");
781
782 // Now gossip a third distinct commitment c3. Window becomes {h3, h2}.
783 // c3 (current) + c2 + c1: c1 must now be dropped (3rd-oldest gossiped),
784 // while c2 and c3 remain. This proves depth is exactly 2 beyond... no:
785 // depth is 2 gossiped TOTAL including current's hash once gossiped.
786 let c3 = BuiltCommitment::build(vec![(key(3), bh(3))], &[0; 32], &sk, &pk_bytes).unwrap();
787 let h3 = c3.hash();
788 state.rotate(c3);
789 state.mark_gossiped(h3);
790
791 assert_ne!(h2, h3);
792 assert_ne!(h1, h3);
793 assert!(
794 state.lookup_by_hash(&h3).is_some(),
795 "current (c3) answerable"
796 );
797 assert!(
798 state.lookup_by_hash(&h2).is_some(),
799 "c2 (published just before newest) answerable — the race-absorbing slot"
800 );
801 assert!(
802 state.lookup_by_hash(&h1).is_none(),
803 "c1 is the 3rd-oldest gossiped root and MUST be dropped — depth is exactly 2"
804 );
805 }
806
807 #[test]
808 fn is_held_tracks_keys_across_the_retention_window_and_ages_them_out() {
809 // The pruner's deletion veto relies on `is_held`: a key committed under
810 // ANY retained slot (current + last-2-gossiped) must read held, and must
811 // stop reading held once its commitment ages out of the window — that is
812 // the bounded reprieve, not a permanent pin. This mirrors the
813 // round-2 responder's `built.proof_for(key).is_some()` check folded over
814 // the slots, so "pruner won't delete" == "responder owes an answer".
815 let (pk, sk) = keypair();
816 let pk_bytes = pk.to_bytes();
817 let state = ResponderCommitmentState::new();
818
819 // c1 commits to key(1). Gossip it -> key(1) is held (current slot).
820 let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
821 let h1 = c1.hash();
822 state.rotate(c1);
823 state.mark_gossiped(h1);
824 assert!(
825 state.is_held(&key(1)),
826 "freshly committed+gossiped key is held"
827 );
828 assert!(!state.is_held(&key(99)), "never-committed key is not held");
829
830 // c2 commits to key(2) only (key(1) dropped from the new commitment,
831 // e.g. it went out of range). key(1) must STILL be held via the retained
832 // previous gossiped slot (the race-absorbing window), and key(2) too.
833 let c2 = BuiltCommitment::build(vec![(key(2), bh(2))], &[0; 32], &sk, &pk_bytes).unwrap();
834 let h2 = c2.hash();
835 state.rotate(c2);
836 state.mark_gossiped(h2);
837 assert!(
838 state.is_held(&key(1)),
839 "key dropped from the newest commitment is still held via the previous gossiped slot"
840 );
841 assert!(state.is_held(&key(2)), "newly committed key is held");
842
843 // c3 commits to key(3). Window becomes {h3, h2}; h1 ages out, so key(1)
844 // is no longer held anywhere -> the pruner may now reclaim it.
845 let c3 = BuiltCommitment::build(vec![(key(3), bh(3))], &[0; 32], &sk, &pk_bytes).unwrap();
846 let h3 = c3.hash();
847 state.rotate(c3);
848 state.mark_gossiped(h3);
849 assert!(
850 !state.is_held(&key(1)),
851 "key whose commitments all aged out of the retention window is no longer held"
852 );
853 assert!(
854 state.is_held(&key(2)),
855 "key(2) still held via the previous gossiped slot"
856 );
857 assert!(state.is_held(&key(3)), "current key held");
858 }
859
860 /// Build a `BuiltCommitment` over the given keys for use in raw `prune_slots`
861 /// tests (each key's `bytes_hash` is `bh(k[0])`).
862 fn built(keys: &[u8]) -> BuiltCommitment {
863 let (pk, sk) = keypair();
864 let entries: Vec<_> = keys.iter().map(|&b| (key(b), bh(b))).collect();
865 BuiltCommitment::build(entries, &[0; 32], &sk, &pk.to_bytes()).unwrap()
866 }
867
868 #[test]
869 fn stale_gossip_record_expires_by_ttl_even_without_new_distinct_gossip() {
870 // Frozen-retention-window regression: the no-op-rotation guard can freeze
871 // `recently_gossiped` (no new distinct hash is ever gossiped once the
872 // responsible key set stabilizes). The retention window must still age a
873 // stale gossiped commitment out by the WALL CLOCK, so its key stops
874 // being `is_held` and the pruner can reclaim it. Driven directly through
875 // `prune_slots(now)` with a synthetic clock so it is deterministic.
876 let c_current = Arc::new(built(&[1])); // root over key(1) — current
877 let c_stale = Arc::new(built(&[2])); // root over key(2) — out-of-range, only retained via gossip
878 let h_current = c_current.hash();
879 let h_stale = c_stale.hash();
880
881 // Synthetic clock: stamps anchor at `base` and the prune evaluates at a
882 // FUTURE `now` (adding to an `Instant` never underflows, unlike
883 // subtracting a TTL from a fresh Windows monotonic clock). The stale
884 // record was last gossiped just over the TTL before `now`; the current
885 // record was gossiped at `now`. This is exactly the frozen-window state:
886 // current keeps being re-gossiped (refreshing its stamp) while the stale
887 // root is never gossiped again.
888 let base = Instant::now();
889 let now = base + GOSSIP_ANSWERABILITY_TTL + Duration::from_secs(1);
890 let mut inner = Inner {
891 slots: vec![Arc::clone(&c_current), Arc::clone(&c_stale)],
892 has_current: true,
893 recently_gossiped: vec![
894 GossipedAt {
895 hash: h_current,
896 last_gossiped_at: now,
897 },
898 GossipedAt {
899 hash: h_stale,
900 last_gossiped_at: base,
901 },
902 ],
903 };
904
905 prune_slots(&mut inner, now);
906
907 // The stale record (and its slot) must be gone; the current one stays.
908 assert!(
909 inner.recently_gossiped.iter().all(|g| g.hash != h_stale),
910 "stale gossip record past its TTL must expire"
911 );
912 assert_eq!(inner.slots.len(), 1, "the stale slot must be dropped");
913 assert_eq!(inner.slots[0].hash(), h_current, "current slot retained");
914 // key(2) — committed only under the now-expired stale slot — is no
915 // longer held, so the pruner may reclaim it. key(1) stays held.
916 assert!(
917 inner.slots.iter().all(|c| c.proof_for(&key(2)).is_none()),
918 "stale key is no longer held once its commitment ages out"
919 );
920 assert!(
921 inner.slots.iter().any(|c| c.proof_for(&key(1)).is_some()),
922 "current key still held"
923 );
924 }
925
926 #[test]
927 fn recent_gossip_record_stays_answerable_within_ttl() {
928 // Early-drop regression: a commitment gossiped recently (within the TTL)
929 // must remain answerable even if it is no longer the current root — a
930 // peer may still have pinned it. `prune_slots` must NOT drop it early.
931 let c_current = Arc::new(built(&[1]));
932 let c_prev = Arc::new(built(&[2]));
933 let h_current = c_current.hash();
934 let h_prev = c_prev.hash();
935
936 // Synthetic clock (forward-only, see the stale-expiry test above).
937 let base = Instant::now();
938 let now = base + GOSSIP_ANSWERABILITY_TTL / 2;
939 let mut inner = Inner {
940 slots: vec![Arc::clone(&c_current), Arc::clone(&c_prev)],
941 has_current: true,
942 recently_gossiped: vec![
943 GossipedAt {
944 hash: h_current,
945 last_gossiped_at: now,
946 },
947 GossipedAt {
948 // Gossiped a while ago, but still comfortably within the TTL.
949 hash: h_prev,
950 last_gossiped_at: base,
951 },
952 ],
953 };
954
955 prune_slots(&mut inner, now);
956
957 assert_eq!(
958 inner.slots.len(),
959 2,
960 "a commitment gossiped within the TTL must stay answerable (the 'two, not one' race window)"
961 );
962 assert!(
963 inner.slots.iter().any(|c| c.hash() == h_prev),
964 "the recently-gossiped previous commitment must not be dropped early"
965 );
966 }
967
968 #[test]
969 fn retire_current_hides_current_but_keeps_recent_pin_answerable() {
970 // Retire-current regression: retiring the current commitment (no responsible
971 // keys) must STOP advertising it (current() -> None, so the gossip loop
972 // stops re-stamping it) while keeping it answerable for an in-flight pin.
973 let state = ResponderCommitmentState::new();
974 let c1 = built(&[1]);
975 let h1 = c1.hash();
976 state.rotate(c1);
977 state.mark_gossiped(h1);
978
979 assert!(state.current().is_some(), "fresh current is advertised");
980
981 state.retire_current();
982
983 assert!(
984 state.current().is_none(),
985 "retired current must not be advertised (stops the gossip loop re-stamping it)"
986 );
987 assert!(
988 state.lookup_by_hash(&h1).is_some(),
989 "retired current stays answerable for an in-flight pin within its TTL"
990 );
991 assert!(
992 state.is_held(&key(1)),
993 "its keys are still held while answerable, so the pruner still vetoes them"
994 );
995 }
996
997 #[test]
998 fn retired_current_ages_out_by_gossip_ttl() {
999 // The retired current must age out by its gossip TTL — the exact fix for
1000 // the stale-current permanent pin: its record is never refreshed (not
1001 // advertised), so once the TTL lapses prune_slots drops it.
1002 let c1 = Arc::new(built(&[1]));
1003 let h1 = c1.hash();
1004 // Synthetic clock (forward-only, see the stale-expiry test above).
1005 let base = Instant::now();
1006 let now = base + GOSSIP_ANSWERABILITY_TTL + Duration::from_secs(1);
1007 let mut inner = Inner {
1008 slots: vec![Arc::clone(&c1)],
1009 has_current: false, // already retired
1010 recently_gossiped: vec![GossipedAt {
1011 hash: h1,
1012 last_gossiped_at: base,
1013 }],
1014 };
1015
1016 prune_slots(&mut inner, now);
1017
1018 assert!(
1019 inner.slots.is_empty(),
1020 "retired current past its TTL is dropped"
1021 );
1022 assert!(!inner.has_current);
1023 assert!(
1024 inner.slots.iter().all(|c| c.proof_for(&key(1)).is_none()),
1025 "its key is no longer held -> pruner reclaims it"
1026 );
1027 }
1028
1029 #[test]
1030 fn retired_current_stays_answerable_within_ttl() {
1031 // A retired current within its TTL must remain answerable (not dropped).
1032 let c1 = Arc::new(built(&[1]));
1033 let h1 = c1.hash();
1034 // Synthetic clock (forward-only, see the stale-expiry test above).
1035 let base = Instant::now();
1036 let now = base + GOSSIP_ANSWERABILITY_TTL / 2;
1037 let mut inner = Inner {
1038 slots: vec![Arc::clone(&c1)],
1039 has_current: false, // retired
1040 recently_gossiped: vec![GossipedAt {
1041 hash: h1,
1042 last_gossiped_at: base,
1043 }],
1044 };
1045
1046 prune_slots(&mut inner, now);
1047
1048 assert_eq!(
1049 inner.slots.len(),
1050 1,
1051 "retired-but-recent current stays answerable"
1052 );
1053 assert_eq!(inner.slots[0].hash(), h1);
1054 }
1055
1056 #[test]
1057 fn re_acquire_after_retire_advertises_fresh_current_without_resurrecting_stale() {
1058 // Re-acquire path: a node retires its current (went out of range), then
1059 // becomes responsible again and rotates a fresh commitment. The fresh
1060 // one must become the advertised current; the retired one must only
1061 // linger as a retained (answerable) slot if still gossiped+unexpired,
1062 // never resurrect as current.
1063 let state = ResponderCommitmentState::new();
1064 let c1 = built(&[1]);
1065 let h1 = c1.hash();
1066 state.rotate(c1);
1067 state.mark_gossiped(h1); // gossiped, so it stays answerable after retire
1068 state.retire_current();
1069 assert!(state.current().is_none());
1070
1071 // Become responsible again: rotate a fresh commitment.
1072 let c2 = built(&[2]);
1073 let h2 = c2.hash();
1074 state.rotate(c2);
1075 state.mark_gossiped(h2);
1076
1077 let cur = state
1078 .current()
1079 .expect("fresh current advertised after re-acquire");
1080 assert_eq!(
1081 cur.hash(),
1082 h2,
1083 "the FRESH commitment is current, not the retired one"
1084 );
1085 assert!(
1086 state.lookup_by_hash(&h1).is_some(),
1087 "the retired-but-recently-gossiped commitment is still answerable as a retained slot"
1088 );
1089 assert!(
1090 state.is_held(&key(1)),
1091 "retired key still held within its TTL"
1092 );
1093 assert!(state.is_held(&key(2)), "fresh current key held");
1094 }
1095
1096 #[test]
1097 fn retire_current_drops_ungossiped_current() {
1098 // A current that was never gossiped has nothing to stay answerable for,
1099 // so retiring it drops it outright (no lookup, no current).
1100 let state = ResponderCommitmentState::new();
1101 let c1 = built(&[1]);
1102 let h1 = c1.hash();
1103 state.rotate(c1); // built but NOT gossiped
1104
1105 state.retire_current();
1106
1107 assert!(state.current().is_none(), "no current after retire");
1108 assert!(
1109 state.lookup_by_hash(&h1).is_none(),
1110 "an ungossiped retired current is not answerable (nothing to retain)"
1111 );
1112 assert!(!state.is_held(&key(1)));
1113 }
1114
1115 #[test]
1116 fn ungossiped_rebuild_does_not_evict_gossiped_commitment() {
1117 // The rebuild-faster-than-gossip case: a node rebuilds (rotates) several
1118 // times without gossiping. The last *gossiped* commitment must remain
1119 // answerable so the node is not wrongly failed for "unknown hash".
1120 let (pk, sk) = keypair();
1121 let pk_bytes = pk.to_bytes();
1122 let state = ResponderCommitmentState::new();
1123
1124 let c1 = BuiltCommitment::build(vec![(key(1), bh(1))], &[0; 32], &sk, &pk_bytes).unwrap();
1125 let h1 = c1.hash();
1126 state.rotate(c1);
1127 state.mark_gossiped(h1);
1128
1129 // Several ungossiped rebuilds.
1130 for i in 2..=6u8 {
1131 let c =
1132 BuiltCommitment::build(vec![(key(i), bh(i))], &[0; 32], &sk, &pk_bytes).unwrap();
1133 state.rotate(c);
1134 }
1135 // h1 was gossiped and is still within the last-2-gossiped window
1136 // (nothing else was gossiped), so it must still be answerable.
1137 assert!(
1138 state.lookup_by_hash(&h1).is_some(),
1139 "gossiped commitment must survive ungossiped rebuilds"
1140 );
1141 }
1142}