Skip to main content

ant_node/replication/
types.rs

1//! Core types for the replication subsystem.
2//!
3//! These types represent the state machine states, queue entries, and domain
4//! concepts from the Kademlia-style replication design (see
5//! `docs/REPLICATION_DESIGN.md`).
6
7use std::cmp::Ordering;
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11use serde::{Deserialize, Serialize};
12
13use crate::ant_protocol::XorName;
14use saorsa_core::identity::PeerId;
15
16// ---------------------------------------------------------------------------
17// Verification state machine (Section 8 of REPLICATION_DESIGN.md)
18// ---------------------------------------------------------------------------
19
20/// Verification state machine.
21///
22/// Each unknown key transitions through these states exactly once per offer
23/// lifecycle.  See Section 8 of `REPLICATION_DESIGN.md` for the full
24/// state-transition diagram.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum VerificationState {
27    /// Offer received, not yet processed.
28    OfferReceived,
29    /// Passed admission filter, awaiting quorum / paid-list verification.
30    PendingVerify,
31    /// Presence quorum passed (>= `QuorumNeeded` positives from
32    /// `QuorumTargets`).
33    QuorumVerified,
34    /// Paid-list authorisation succeeded (>= `ConfirmNeeded` confirmations or
35    /// derived from replica majority).
36    PaidListVerified,
37    /// Queued for record fetch.
38    QueuedForFetch,
39    /// Actively fetching from a verified source.
40    Fetching,
41    /// Successfully stored locally.
42    Stored,
43    /// Fetch failed but retryable (alternate sources remain).
44    FetchRetryable,
45    /// Fetch permanently abandoned (terminal failure or no alternate sources).
46    FetchAbandoned,
47    /// Quorum failed definitively (both paid-list and presence impossible this
48    /// round).
49    QuorumFailed,
50    /// Quorum inconclusive (timeout with neither success nor fail-fast).
51    QuorumInconclusive,
52    /// Terminal: quorum abandoned, key forgotten.
53    QuorumAbandoned,
54    /// Terminal: key returned to idle (forgotten, requires new offer to
55    /// re-enter).
56    Idle,
57}
58
59// ---------------------------------------------------------------------------
60// Hint pipeline classification
61// ---------------------------------------------------------------------------
62
63/// Whether a key was admitted via replica hints or paid hints only.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65pub enum HintPipeline {
66    /// Key is in the admitted replica-hint pipeline (fetch-eligible).
67    Replica,
68    /// Key is in the paid-hint-only pipeline (`PaidForList` update only, no
69    /// fetch).
70    PaidOnly,
71}
72
73// ---------------------------------------------------------------------------
74// Pending-verification table entry
75// ---------------------------------------------------------------------------
76
77/// Entry in the pending-verification table.
78///
79/// Tracks a single key through the verification FSM, recording which peers
80/// responded and which have been tried for fetch.
81#[derive(Debug, Clone)]
82pub struct VerificationEntry {
83    /// Current state in the verification FSM.
84    pub state: VerificationState,
85    /// Which pipeline admitted this key.
86    pub pipeline: HintPipeline,
87    /// Peers that responded `Present` during verification (verified fetch
88    /// sources).
89    pub verified_sources: Vec<PeerId>,
90    /// Peers already tried for fetch (to avoid retrying the same source).
91    pub tried_sources: HashSet<PeerId>,
92    /// When this entry was created.
93    pub created_at: Instant,
94    /// The peer that originally hinted this key (for source tracking).
95    pub hint_sender: PeerId,
96}
97
98// ---------------------------------------------------------------------------
99// Fetch queue candidate
100// ---------------------------------------------------------------------------
101
102/// A candidate queued for fetch, ordered by relevance (nearest-first).
103///
104/// Implements [`Ord`] with *reversed* distance comparison so that a
105/// [`BinaryHeap`](std::collections::BinaryHeap) (max-heap) dequeues the
106/// nearest key first.
107#[derive(Debug, Clone)]
108pub struct FetchCandidate {
109    /// The key to fetch.
110    pub key: XorName,
111    /// XOR distance from self to key (for priority ordering).
112    pub distance: XorName,
113    /// Verified source peers that responded `Present`.
114    pub sources: Vec<PeerId>,
115}
116
117impl Eq for FetchCandidate {}
118
119impl PartialEq for FetchCandidate {
120    fn eq(&self, other: &Self) -> bool {
121        self.distance == other.distance && self.key == other.key
122    }
123}
124
125impl Ord for FetchCandidate {
126    fn cmp(&self, other: &Self) -> Ordering {
127        // Reverse ordering: smaller distance = higher priority (BinaryHeap is
128        // max-heap).  Tie-break on key for consistency with PartialEq.
129        other
130            .distance
131            .cmp(&self.distance)
132            .then_with(|| self.key.cmp(&other.key))
133    }
134}
135
136impl PartialOrd for FetchCandidate {
137    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
138        Some(self.cmp(other))
139    }
140}
141
142// ---------------------------------------------------------------------------
143// Verification evidence types
144// ---------------------------------------------------------------------------
145
146/// Per-key presence evidence from a verification round.
147#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
148pub enum PresenceEvidence {
149    /// Peer holds the record.
150    Present,
151    /// Peer does not hold the record.
152    Absent,
153    /// Peer did not respond in time (neutral, not negative).
154    Unresolved,
155}
156
157/// Per-key paid-list evidence from a verification round.
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159pub enum PaidListEvidence {
160    /// Peer confirms key is in its `PaidForList`.
161    Confirmed,
162    /// Peer says key is NOT in its `PaidForList`.
163    NotFound,
164    /// Peer did not respond in time (neutral).
165    Unresolved,
166}
167
168/// Aggregated verification evidence for a single key from one verification
169/// round.
170#[derive(Debug, Clone)]
171pub struct KeyVerificationEvidence {
172    /// Presence evidence per peer (from `QuorumTargets`).
173    pub presence: HashMap<PeerId, PresenceEvidence>,
174    /// Paid-list evidence per peer (from `PaidTargets`).
175    pub paid_list: HashMap<PeerId, PaidListEvidence>,
176}
177
178// ---------------------------------------------------------------------------
179// Failure evidence (Section 14 — TrustEngine integration)
180// ---------------------------------------------------------------------------
181
182/// Failure evidence types emitted to `TrustEngine` (Section 14).
183#[derive(Debug, Clone)]
184pub enum FailureEvidence {
185    /// Failed fetch attempt from a source peer.
186    ReplicationFailure {
187        /// The peer that failed to serve the record.
188        peer: PeerId,
189        /// The key that could not be fetched.
190        key: XorName,
191    },
192    /// Audit failure with confirmed responsible keys.
193    AuditFailure {
194        /// Unique identifier for the audit challenge.
195        challenge_id: u64,
196        /// The peer that was challenged.
197        challenged_peer: PeerId,
198        /// Keys confirmed as failed.
199        confirmed_failed_keys: Vec<XorName>,
200        /// Why the audit failed.
201        reason: AuditFailureReason,
202    },
203    /// Peer claiming bootstrap past grace period.
204    BootstrapClaimAbuse {
205        /// The offending peer.
206        peer: PeerId,
207        /// When this peer was first seen.
208        first_seen: Instant,
209    },
210}
211
212/// Reason for audit failure.
213#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
214pub enum AuditFailureReason {
215    /// Peer timed out (no response within deadline).
216    Timeout,
217    /// Response was malformed.
218    MalformedResponse,
219    /// One or more per-key digest mismatches.
220    DigestMismatch,
221    /// Key was absent (signalled by sentinel digest).
222    KeyAbsent,
223    /// Peer explicitly rejected the audit challenge.
224    Rejected,
225}
226
227// ---------------------------------------------------------------------------
228// Peer sync tracking
229// ---------------------------------------------------------------------------
230
231/// Record of sync history with a peer, for `RepairOpportunity` tracking.
232#[derive(Debug, Clone)]
233pub struct PeerSyncRecord {
234    /// Last time we successfully synced with this peer.
235    pub last_sync: Option<Instant>,
236    /// Number of full neighbor-sync cycles completed since last sync with this
237    /// peer.
238    pub cycles_since_sync: u32,
239}
240
241impl PeerSyncRecord {
242    /// Whether this peer has had a repair opportunity (synced at least once
243    /// and at least one subsequent cycle has completed).
244    #[must_use]
245    pub fn has_repair_opportunity(&self) -> bool {
246        self.last_sync.is_some() && self.cycles_since_sync >= 1
247    }
248}
249
250// ---------------------------------------------------------------------------
251// Repair proof tracking
252// ---------------------------------------------------------------------------
253
254/// Evidence that this node has sent a replica repair hint for a key to a peer.
255#[derive(Debug, Clone)]
256struct RepairProof {
257    /// Local neighbor-sync cycle epoch when the hint was sent.
258    hinted_at_epoch: u64,
259}
260
261/// Repair proofs for one key, tied to the close-group snapshot they were
262/// recorded against.
263#[derive(Debug, Clone)]
264struct RepairProofEntry {
265    /// Self-inclusive close group observed when these proofs were recorded.
266    close_peers: HashSet<PeerId>,
267    /// Per-peer proof metadata for peers in `close_peers`.
268    peer_proofs: HashMap<PeerId, RepairProof>,
269}
270
271impl RepairProofEntry {
272    fn new(close_peers: HashSet<PeerId>) -> Self {
273        Self {
274            close_peers,
275            peer_proofs: HashMap::new(),
276        }
277    }
278}
279
280/// Evidence that this node has sent replica repair hints for local keys.
281///
282/// The map is keyed by record key so each key retains only one close-group
283/// snapshot and at most that snapshot's peers. This bounds memory by local key
284/// count times the replication close-group size rather than by churn history.
285#[derive(Debug, Clone, Default)]
286pub struct RepairProofs {
287    /// Key-scoped repair proofs.
288    proofs_by_key: HashMap<XorName, RepairProofEntry>,
289}
290
291impl RepairProofs {
292    /// Create an empty repair-proof table.
293    #[must_use]
294    pub fn new() -> Self {
295        Self::default()
296    }
297
298    /// Record that `peer` was sent a replica repair hint for `key`.
299    ///
300    /// `current_close_peers` must be the current self-inclusive close group for
301    /// `key`. If that close group differs from the previous proof snapshot,
302    /// proofs for peers that left the close group are invalidated before
303    /// recording. Stable peers keep their proofs, while a peer that leaves and
304    /// later re-enters still needs a fresh hint.
305    pub fn record_replica_hint_sent(
306        &mut self,
307        peer: PeerId,
308        key: XorName,
309        current_close_peers: &HashSet<PeerId>,
310        hinted_at_epoch: u64,
311    ) -> bool {
312        self.reconcile_key_close_group(&key, current_close_peers);
313
314        if !current_close_peers.contains(&peer) {
315            return false;
316        }
317
318        let entry = self
319            .proofs_by_key
320            .entry(key)
321            .or_insert_with(|| RepairProofEntry::new(current_close_peers.clone()));
322
323        if entry.peer_proofs.contains_key(&peer) {
324            return false;
325        }
326
327        entry
328            .peer_proofs
329            .insert(peer, RepairProof { hinted_at_epoch });
330        true
331    }
332
333    /// Whether this node has mature repair-hint evidence for `(peer, key)`.
334    ///
335    /// The check invalidates proofs for peers that have left the current
336    /// self-inclusive close group. A proof is mature only after at least one
337    /// later local sync-cycle epoch.
338    pub fn has_mature_replica_hint(
339        &mut self,
340        peer: &PeerId,
341        key: &XorName,
342        current_close_peers: &HashSet<PeerId>,
343        current_epoch: u64,
344    ) -> bool {
345        self.reconcile_key_close_group(key, current_close_peers);
346
347        self.proofs_by_key
348            .get(key)
349            .and_then(|entry| entry.peer_proofs.get(peer))
350            .is_some_and(|proof| proof.hinted_at_epoch < current_epoch)
351    }
352
353    /// Remove all repair proofs for a key, e.g. after local deletion.
354    pub fn remove_key(&mut self, key: &XorName) {
355        self.proofs_by_key.remove(key);
356    }
357
358    /// Remove all repair proofs for a peer, e.g. after routing-table removal.
359    pub fn remove_peer(&mut self, peer: &PeerId) {
360        self.proofs_by_key.retain(|_, entry| {
361            entry.peer_proofs.remove(peer);
362            !entry.peer_proofs.is_empty()
363        });
364    }
365
366    fn reconcile_key_close_group(&mut self, key: &XorName, current_close_peers: &HashSet<PeerId>) {
367        let should_remove = if let Some(entry) = self.proofs_by_key.get_mut(key) {
368            if entry.close_peers == *current_close_peers {
369                return;
370            }
371
372            entry.close_peers.clone_from(current_close_peers);
373            entry
374                .peer_proofs
375                .retain(|peer, _| current_close_peers.contains(peer));
376            entry.peer_proofs.is_empty()
377        } else {
378            false
379        };
380
381        if should_remove {
382            self.proofs_by_key.remove(key);
383        }
384    }
385}
386
387// ---------------------------------------------------------------------------
388// Neighbor sync cycle state
389// ---------------------------------------------------------------------------
390
391/// Result of observing a peer's bootstrap claim.
392#[derive(Debug, Clone, Copy, PartialEq, Eq)]
393pub enum BootstrapClaimObservation {
394    /// The peer is inside its first and only bootstrap-claim grace window.
395    WithinGrace {
396        /// First time this peer claimed bootstrap status.
397        first_seen: Instant,
398    },
399    /// The peer has continuously claimed bootstrap status past the grace period.
400    PastGrace {
401        /// First time this peer claimed bootstrap status.
402        first_seen: Instant,
403    },
404    /// The peer previously stopped claiming bootstrap and then claimed it again.
405    Repeated {
406        /// First time this peer ever claimed bootstrap status.
407        first_seen: Instant,
408    },
409}
410
411/// Neighbor sync cycle state.
412///
413/// Tracks a deterministic walk through the current close-group snapshot,
414/// per-peer cooldown times, active bootstrap claims, and peers that have already
415/// used their one bootstrap-claim window.
416#[derive(Debug)]
417pub struct NeighborSyncState {
418    /// Deterministic ordering of peers for the current cycle (snapshot).
419    pub order: Vec<PeerId>,
420    /// Current cursor position into `order`.
421    pub cursor: usize,
422    /// Per-peer last successful sync time (for cooldown).
423    pub last_sync_times: HashMap<PeerId, Instant>,
424    /// Active bootstrap claim first-seen timestamps per peer.
425    ///
426    /// Entries are removed when a peer stops claiming bootstrap. The peer
427    /// remains in `bootstrap_claim_history`, so a later claim is repeated-claim
428    /// abuse instead of a fresh grace period.
429    pub bootstrap_claims: HashMap<PeerId, Instant>,
430    /// First-ever bootstrap claim timestamp per peer.
431    ///
432    /// This is retained after active claims are cleared so each peer gets at
433    /// most one bootstrap-claim grace window. Under Sybil attack with many
434    /// distinct peer IDs claiming bootstrap, this map grows unboundedly. In
435    /// practice the trust engine limits Sybil impact before this becomes a
436    /// memory issue.
437    pub bootstrap_claim_history: HashMap<PeerId, Instant>,
438    /// Cursor used by post-cycle pruning to rotate through stored records when
439    /// the per-pass prune-confirmation budget is exhausted.
440    pub prune_cursor: usize,
441}
442
443impl NeighborSyncState {
444    /// Create a new cycle from the given close neighbors.
445    #[must_use]
446    pub fn new_cycle(close_neighbors: Vec<PeerId>) -> Self {
447        Self {
448            order: close_neighbors,
449            cursor: 0,
450            last_sync_times: HashMap::new(),
451            bootstrap_claims: HashMap::new(),
452            bootstrap_claim_history: HashMap::new(),
453            prune_cursor: 0,
454        }
455    }
456
457    /// Observe a peer claiming bootstrap status.
458    ///
459    /// A peer receives one grace window from its first observed bootstrap claim.
460    /// If it later stops claiming bootstrap, callers should clear only the
461    /// active claim with [`Self::clear_active_bootstrap_claim`]. A subsequent
462    /// claim is then reported as [`BootstrapClaimObservation::Repeated`].
463    #[must_use]
464    pub fn observe_bootstrap_claim(
465        &mut self,
466        peer: PeerId,
467        now: Instant,
468        grace_period: Duration,
469    ) -> BootstrapClaimObservation {
470        if let Some(first_seen) = self.bootstrap_claims.get(&peer).copied() {
471            if now.duration_since(first_seen) > grace_period {
472                BootstrapClaimObservation::PastGrace { first_seen }
473            } else {
474                BootstrapClaimObservation::WithinGrace { first_seen }
475            }
476        } else if let Some(first_seen) = self.bootstrap_claim_history.get(&peer).copied() {
477            BootstrapClaimObservation::Repeated { first_seen }
478        } else {
479            self.bootstrap_claims.insert(peer, now);
480            self.bootstrap_claim_history.insert(peer, now);
481            BootstrapClaimObservation::WithinGrace { first_seen: now }
482        }
483    }
484
485    /// Clear the active bootstrap claim for a peer, retaining claim history.
486    pub fn clear_active_bootstrap_claim(&mut self, peer: &PeerId) -> bool {
487        self.bootstrap_claims.remove(peer).is_some()
488    }
489
490    /// Whether the current cycle is complete.
491    #[must_use]
492    pub fn is_cycle_complete(&self) -> bool {
493        self.cursor >= self.order.len()
494    }
495}
496
497// ---------------------------------------------------------------------------
498// Bootstrap drain state (Section 16)
499// ---------------------------------------------------------------------------
500
501/// Bootstrap drain state tracking (Section 16).
502#[derive(Debug)]
503pub struct BootstrapState {
504    /// Whether bootstrap is complete (all peer requests done, queues empty).
505    pub drained: bool,
506    /// Number of bootstrap peer requests still pending.
507    pub pending_peer_requests: usize,
508    /// Keys discovered during bootstrap that are still in the verification /
509    /// fetch pipeline.
510    pub pending_keys: HashSet<XorName>,
511    /// Peers whose last bootstrap admission cycle had one or more hints
512    /// silently dropped at the `pending_verify` capacity bounds. Each entry
513    /// represents "this source still owes us at least one re-hinted key
514    /// after the queues drain". `check_bootstrap_drained` refuses to claim
515    /// the node fully drained while this set is non-empty: a source's
516    /// presence is cleared by its next admission cycle that completes with
517    /// zero capacity rejections (i.e. the source successfully re-delivered
518    /// everything that previously overflowed). Tracking per-source instead
519    /// of a global counter prevents one peer's rejection from being
520    /// "cleared" by an unrelated peer's clean cycle.
521    pub capacity_rejected_sources: HashSet<PeerId>,
522}
523
524impl BootstrapState {
525    /// Create initial bootstrap state.
526    #[must_use]
527    pub fn new() -> Self {
528        Self {
529            drained: false,
530            pending_peer_requests: 0,
531            pending_keys: HashSet::new(),
532            capacity_rejected_sources: HashSet::new(),
533        }
534    }
535
536    /// Check if bootstrap is drained.
537    ///
538    /// Only returns `true` after [`super::bootstrap::check_bootstrap_drained`] or
539    /// [`super::bootstrap::mark_bootstrap_drained`] has explicitly set the flag. A fresh
540    /// `BootstrapState` is NOT drained — the audit loop must wait until
541    /// bootstrap work has actually completed (Invariant 19).
542    #[must_use]
543    pub fn is_drained(&self) -> bool {
544        self.drained
545    }
546
547    /// Remove a key from the bootstrap pending set.
548    ///
549    /// Called when a key terminally leaves the verification/fetch pipeline
550    /// (stored, abandoned, quorum failed, etc.) so the drain check set
551    /// shrinks incrementally rather than being re-scanned in full.
552    pub fn remove_key(&mut self, key: &XorName) {
553        self.pending_keys.remove(key);
554    }
555}
556
557impl Default for BootstrapState {
558    fn default() -> Self {
559        Self::new()
560    }
561}
562
563// ---------------------------------------------------------------------------
564// Tests
565// ---------------------------------------------------------------------------
566
567#[cfg(test)]
568mod tests {
569    use std::collections::BinaryHeap;
570
571    use super::*;
572
573    /// Helper: build a `PeerId` from a single byte (zero-padded to 32 bytes).
574    fn peer_id_from_byte(b: u8) -> PeerId {
575        let mut bytes = [0u8; 32];
576        bytes[0] = b;
577        PeerId::from_bytes(bytes)
578    }
579
580    // -- FetchCandidate ordering -------------------------------------------
581
582    #[test]
583    fn fetch_candidate_nearest_key_has_highest_priority() {
584        let near = FetchCandidate {
585            key: [1u8; 32],
586            distance: [
587                0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
588                0, 0, 0, 0,
589            ],
590            sources: vec![peer_id_from_byte(1)],
591        };
592
593        let far = FetchCandidate {
594            key: [2u8; 32],
595            distance: [
596                0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
597                0, 0, 0, 0, 0,
598            ],
599            sources: vec![peer_id_from_byte(2)],
600        };
601
602        // In a max-heap the "greatest" element pops first.
603        // Our reversed Ord makes smaller-distance candidates greater.
604        assert!(near > far, "nearer candidate should compare greater");
605
606        let mut heap = BinaryHeap::new();
607        heap.push(far.clone());
608        heap.push(near.clone());
609
610        assert_eq!(heap.len(), 2, "heap should contain both candidates");
611
612        let first = heap.pop();
613        assert!(first.is_some(), "first pop should succeed");
614        assert_eq!(
615            first.map(|c| c.key),
616            Some(near.key),
617            "nearest key should pop first"
618        );
619
620        let second = heap.pop();
621        assert!(second.is_some(), "second pop should succeed");
622        assert_eq!(
623            second.map(|c| c.key),
624            Some(far.key),
625            "farthest key should pop second"
626        );
627    }
628
629    #[test]
630    fn fetch_candidate_same_distance_and_key_is_equal() {
631        let a = FetchCandidate {
632            key: [1u8; 32],
633            distance: [5u8; 32],
634            sources: vec![],
635        };
636
637        let b = FetchCandidate {
638            key: [1u8; 32],
639            distance: [5u8; 32],
640            sources: vec![],
641        };
642
643        assert_eq!(
644            a.cmp(&b),
645            Ordering::Equal,
646            "same distance + same key should yield Equal"
647        );
648        assert_eq!(a, b, "PartialEq must agree with Ord");
649    }
650
651    #[test]
652    fn fetch_candidate_same_distance_different_key_is_deterministic() {
653        let a = FetchCandidate {
654            key: [1u8; 32],
655            distance: [5u8; 32],
656            sources: vec![],
657        };
658
659        let b = FetchCandidate {
660            key: [2u8; 32],
661            distance: [5u8; 32],
662            sources: vec![],
663        };
664
665        assert_ne!(
666            a.cmp(&b),
667            Ordering::Equal,
668            "same distance + different key must not be Equal"
669        );
670        assert_ne!(a, b, "PartialEq must agree with Ord");
671    }
672
673    // -- PeerSyncRecord ----------------------------------------------------
674
675    #[test]
676    fn peer_sync_record_no_sync_yet() {
677        let record = PeerSyncRecord {
678            last_sync: None,
679            cycles_since_sync: 0,
680        };
681        assert!(
682            !record.has_repair_opportunity(),
683            "never-synced peer has no repair opportunity"
684        );
685    }
686
687    #[test]
688    fn peer_sync_record_synced_but_no_cycle() {
689        let record = PeerSyncRecord {
690            last_sync: Some(Instant::now()),
691            cycles_since_sync: 0,
692        };
693        assert!(
694            !record.has_repair_opportunity(),
695            "synced peer with zero subsequent cycles has no repair opportunity"
696        );
697    }
698
699    #[test]
700    fn peer_sync_record_synced_with_cycle() {
701        let record = PeerSyncRecord {
702            last_sync: Some(Instant::now()),
703            cycles_since_sync: 1,
704        };
705        assert!(
706            record.has_repair_opportunity(),
707            "synced peer with >= 1 cycle should have repair opportunity"
708        );
709    }
710
711    #[test]
712    fn peer_sync_record_no_sync_many_cycles() {
713        let record = PeerSyncRecord {
714            last_sync: None,
715            cycles_since_sync: 10,
716        };
717        assert!(
718            !record.has_repair_opportunity(),
719            "never-synced peer has no repair opportunity regardless of cycle count"
720        );
721    }
722
723    // -- RepairProofs --------------------------------------------------------
724
725    #[test]
726    fn repair_proofs_record_sent_hint_for_close_peer() {
727        const HINT_EPOCH: u64 = 7;
728        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
729
730        let key = [0xA1; 32];
731        let peer = peer_id_from_byte(1);
732        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
733        let mut proofs = RepairProofs::new();
734
735        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
736
737        assert!(
738            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
739            "sent hint should make key auditable for that peer"
740        );
741    }
742
743    #[test]
744    fn repair_proofs_reject_peer_outside_current_close_group() {
745        const HINT_EPOCH: u64 = 7;
746        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
747
748        let key = [0xA2; 32];
749        let peer = peer_id_from_byte(1);
750        let close_peers = HashSet::from([peer_id_from_byte(2), peer_id_from_byte(3)]);
751        let mut proofs = RepairProofs::new();
752
753        assert!(!proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
754
755        assert!(
756            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
757            "peers outside current close group must not get repair proof"
758        );
759    }
760
761    #[test]
762    fn repair_proofs_require_later_epoch() {
763        const HINT_EPOCH: u64 = 7;
764        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
765
766        let key = [0xA3; 32];
767        let peer = peer_id_from_byte(1);
768        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
769        let mut proofs = RepairProofs::new();
770
771        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
772
773        assert!(
774            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, HINT_EPOCH),
775            "same-cycle proof should not be audit-eligible"
776        );
777        assert!(
778            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
779            "proof should mature after a later local sync-cycle epoch"
780        );
781    }
782
783    #[test]
784    fn repair_proofs_repeated_hint_does_not_reset_maturity() {
785        const HINT_EPOCH: u64 = 7;
786        const REPEATED_HINT_EPOCH: u64 = HINT_EPOCH + 1;
787
788        let key = [0xA5; 32];
789        let peer = peer_id_from_byte(1);
790        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
791        let mut proofs = RepairProofs::new();
792
793        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
794        assert!(
795            !proofs.record_replica_hint_sent(peer, key, &close_peers, REPEATED_HINT_EPOCH),
796            "duplicate hint in the same close group should keep existing proof"
797        );
798        assert!(
799            proofs.has_mature_replica_hint(&peer, &key, &close_peers, REPEATED_HINT_EPOCH),
800            "duplicate hint must not reset an already mature proof"
801        );
802    }
803
804    #[test]
805    fn repair_proofs_retain_stable_peers_on_close_group_change() {
806        const HINT_EPOCH: u64 = 7;
807        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
808
809        let key = [0xA7; 32];
810        let stable_peer = peer_id_from_byte(1);
811        let departing_peer = peer_id_from_byte(2);
812        let retained_peer = peer_id_from_byte(3);
813        let new_peer = peer_id_from_byte(4);
814        let old_group = HashSet::from([stable_peer, departing_peer, retained_peer]);
815        let changed_group = HashSet::from([stable_peer, retained_peer, new_peer]);
816        let mut proofs = RepairProofs::new();
817
818        assert!(proofs.record_replica_hint_sent(stable_peer, key, &old_group, HINT_EPOCH));
819        assert!(proofs.record_replica_hint_sent(departing_peer, key, &old_group, HINT_EPOCH));
820
821        assert!(
822            proofs.has_mature_replica_hint(&stable_peer, &key, &changed_group, CURRENT_EPOCH),
823            "stable peers should keep mature repair proofs across unrelated close-group churn"
824        );
825        assert!(
826            !proofs.has_mature_replica_hint(&departing_peer, &key, &changed_group, CURRENT_EPOCH),
827            "peers that left the close group should lose repair proofs"
828        );
829        assert!(
830            !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, CURRENT_EPOCH),
831            "new close-group peers need their own repair hint before auditing"
832        );
833    }
834
835    #[test]
836    fn repair_proofs_evicted_peer_reentry_requires_fresh_hint() {
837        const FIRST_HINT_EPOCH: u64 = 7;
838        const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
839        const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
840
841        let key = [0xA3; 32];
842        let returning_peer = peer_id_from_byte(1);
843        let new_peer = peer_id_from_byte(4);
844        let old_group = HashSet::from([returning_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
845        let changed_group = HashSet::from([new_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
846        let mut proofs = RepairProofs::new();
847
848        assert!(proofs.record_replica_hint_sent(returning_peer, key, &old_group, FIRST_HINT_EPOCH,));
849
850        assert!(
851            !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, SECOND_HINT_EPOCH),
852            "new close-group peer should not inherit another peer's repair proof"
853        );
854        assert!(
855            !proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH),
856            "a peer that re-enters must receive a fresh repair hint"
857        );
858
859        assert!(proofs.record_replica_hint_sent(
860            returning_peer,
861            key,
862            &old_group,
863            SECOND_HINT_EPOCH,
864        ));
865        assert!(
866            proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH),
867            "fresh repair hint after re-entry should be eligible once mature"
868        );
869    }
870
871    #[test]
872    fn repair_proofs_remove_peer_requires_fresh_hint_after_reentry() {
873        const FIRST_HINT_EPOCH: u64 = 7;
874        const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
875        const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
876
877        let key = [0xA6; 32];
878        let peer = peer_id_from_byte(1);
879        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
880        let mut proofs = RepairProofs::new();
881
882        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, FIRST_HINT_EPOCH));
883        proofs.remove_peer(&peer);
884
885        assert!(
886            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
887            "routing-table removal should clear proof even if peer re-enters same close group"
888        );
889
890        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, SECOND_HINT_EPOCH));
891        assert!(
892            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
893            "fresh hint after re-entry should become eligible after a later epoch"
894        );
895    }
896
897    #[test]
898    fn repair_proofs_remove_key_clears_all_peer_entries() {
899        const HINT_EPOCH: u64 = 7;
900        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
901
902        let key = [0xA4; 32];
903        let peer = peer_id_from_byte(1);
904        let close_peers = HashSet::from([peer]);
905        let mut proofs = RepairProofs::new();
906
907        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
908        proofs.remove_key(&key);
909
910        assert!(
911            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
912            "deleted local key should not retain repair proof entries"
913        );
914    }
915
916    // -- NeighborSyncState -------------------------------------------------
917
918    #[test]
919    fn neighbor_sync_empty_cycle_is_immediately_complete() {
920        let state = NeighborSyncState::new_cycle(vec![]);
921        assert!(
922            state.is_cycle_complete(),
923            "empty neighbor list means cycle is complete"
924        );
925    }
926
927    #[test]
928    fn neighbor_sync_new_cycle_not_complete() {
929        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
930        let state = NeighborSyncState::new_cycle(peers);
931        assert!(
932            !state.is_cycle_complete(),
933            "fresh cycle with peers should not be complete"
934        );
935    }
936
937    #[test]
938    fn neighbor_sync_cycle_completes_when_cursor_reaches_end() {
939        let peers = vec![
940            peer_id_from_byte(1),
941            peer_id_from_byte(2),
942            peer_id_from_byte(3),
943        ];
944        let mut state = NeighborSyncState::new_cycle(peers);
945
946        // Simulate stepping through the cycle.
947        state.cursor = 2;
948        assert!(
949            !state.is_cycle_complete(),
950            "cursor at len-1 should not be complete"
951        );
952
953        state.cursor = 3;
954        assert!(
955            state.is_cycle_complete(),
956            "cursor at len should be complete"
957        );
958    }
959
960    #[test]
961    fn neighbor_sync_cursor_past_end_is_still_complete() {
962        let peers = vec![peer_id_from_byte(1)];
963        let mut state = NeighborSyncState::new_cycle(peers);
964        state.cursor = 5;
965        assert!(
966            state.is_cycle_complete(),
967            "cursor past end should still report complete"
968        );
969    }
970
971    #[test]
972    fn bootstrap_claim_history_prevents_second_grace_window() {
973        let peer = peer_id_from_byte(9);
974        let mut state = NeighborSyncState::new_cycle(vec![peer]);
975        let first_seen = Instant::now();
976        let grace = Duration::from_secs(60);
977
978        assert_eq!(
979            state.observe_bootstrap_claim(peer, first_seen, grace),
980            BootstrapClaimObservation::WithinGrace { first_seen }
981        );
982        assert!(state.clear_active_bootstrap_claim(&peer));
983        assert!(!state.bootstrap_claims.contains_key(&peer));
984        assert!(state.bootstrap_claim_history.contains_key(&peer));
985
986        assert_eq!(
987            state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(1), grace),
988            BootstrapClaimObservation::Repeated { first_seen }
989        );
990        assert!(
991            !state.bootstrap_claims.contains_key(&peer),
992            "repeated claims must not recreate an active grace window"
993        );
994        assert_eq!(
995            state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(2), grace),
996            BootstrapClaimObservation::Repeated { first_seen }
997        );
998    }
999
1000    #[test]
1001    fn bootstrap_claim_active_window_reports_past_grace() {
1002        let peer = peer_id_from_byte(10);
1003        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1004        let first_seen = Instant::now();
1005        let grace = Duration::from_secs(60);
1006
1007        let _ = state.observe_bootstrap_claim(peer, first_seen, grace);
1008
1009        assert_eq!(
1010            state.observe_bootstrap_claim(peer, first_seen + grace + Duration::from_secs(1), grace),
1011            BootstrapClaimObservation::PastGrace { first_seen }
1012        );
1013    }
1014
1015    // -- BootstrapState ----------------------------------------------------
1016
1017    #[test]
1018    fn bootstrap_state_initial_not_drained() {
1019        // A freshly created state must NOT report drained — the bootstrap
1020        // sync task has not started yet (Invariant 19 race prevention).
1021        let state = BootstrapState::new();
1022        assert!(
1023            !state.is_drained(),
1024            "initial state must not be drained before bootstrap begins"
1025        );
1026    }
1027
1028    #[test]
1029    fn bootstrap_state_pending_requests_block_drain() {
1030        let mut state = BootstrapState::new();
1031        state.pending_peer_requests = 3;
1032        assert!(
1033            !state.is_drained(),
1034            "pending peer requests should block drain"
1035        );
1036    }
1037
1038    #[test]
1039    fn bootstrap_state_pending_keys_block_drain() {
1040        let mut state = BootstrapState::new();
1041        state.pending_keys.insert([42u8; 32]);
1042        assert!(!state.is_drained(), "pending keys should block drain");
1043    }
1044
1045    #[test]
1046    fn bootstrap_state_explicit_drained_overrides() {
1047        let mut state = BootstrapState::new();
1048        state.pending_peer_requests = 5;
1049        state.pending_keys.insert([99u8; 32]);
1050        state.drained = true;
1051        assert!(
1052            state.is_drained(),
1053            "explicit drained flag should override pending counts"
1054        );
1055    }
1056
1057    #[test]
1058    fn bootstrap_state_requires_explicit_drain() {
1059        let mut state = BootstrapState::new();
1060        state.pending_peer_requests = 2;
1061        state.pending_keys.insert([1u8; 32]);
1062
1063        // Simulate completing work — but without explicit drain flag.
1064        state.pending_peer_requests = 0;
1065        state.pending_keys.clear();
1066
1067        assert!(
1068            !state.is_drained(),
1069            "clearing counters alone must not drain — requires check_bootstrap_drained"
1070        );
1071
1072        // Explicit drain (set by check_bootstrap_drained or mark_bootstrap_drained).
1073        state.drained = true;
1074        assert!(state.is_drained(), "explicit flag should drain");
1075    }
1076
1077    #[test]
1078    fn bootstrap_state_default_matches_new() {
1079        let from_new = BootstrapState::new();
1080        let from_default = BootstrapState::default();
1081
1082        assert_eq!(from_new.drained, from_default.drained);
1083        assert_eq!(
1084            from_new.pending_peer_requests,
1085            from_default.pending_peer_requests
1086        );
1087        assert_eq!(from_new.pending_keys, from_default.pending_keys);
1088    }
1089
1090    // -- Scenario tests -------------------------------------------------------
1091
1092    /// #13: Bootstrap not drained while `pending_keys` overlap with the
1093    /// pipeline. Keys must be removed from `pending_keys` for drain to occur.
1094    #[test]
1095    fn bootstrap_drain_requires_empty_pending_keys() {
1096        let key_a: XorName = [0xA0; 32];
1097        let key_b: XorName = [0xB0; 32];
1098        let key_c: XorName = [0xC0; 32];
1099
1100        let mut state = BootstrapState::new();
1101        state.pending_peer_requests = 0; // requests already done
1102        state.pending_keys = std::iter::once(key_a)
1103            .chain(std::iter::once(key_b))
1104            .chain(std::iter::once(key_c))
1105            .collect();
1106
1107        assert!(
1108            !state.is_drained(),
1109            "should NOT be drained while pending_keys still has entries"
1110        );
1111
1112        // Simulate pipeline processing — remove one key at a time.
1113        state.pending_keys.remove(&key_a);
1114        assert!(!state.is_drained(), "still not drained with 2 pending keys");
1115
1116        state.pending_keys.remove(&key_b);
1117        assert!(!state.is_drained(), "still not drained with 1 pending key");
1118
1119        state.pending_keys.remove(&key_c);
1120        assert!(
1121            !state.is_drained(),
1122            "removing all keys is necessary but not sufficient — needs explicit drain"
1123        );
1124
1125        // Simulate check_bootstrap_drained setting the flag.
1126        state.drained = true;
1127        assert!(state.is_drained(), "explicit drain flag should finalize");
1128    }
1129
1130    /// Verify that the FSM terminal states are distinguishable and document
1131    /// which variants are logically terminal (no outgoing transitions).
1132    #[test]
1133    fn verification_state_terminal_variants() {
1134        let terminal_states = [
1135            VerificationState::QuorumAbandoned,
1136            VerificationState::FetchAbandoned,
1137            VerificationState::Stored,
1138            VerificationState::Idle,
1139        ];
1140
1141        // All terminal states must be distinct from each other.
1142        for (i, a) in terminal_states.iter().enumerate() {
1143            for (j, b) in terminal_states.iter().enumerate() {
1144                if i != j {
1145                    assert_ne!(
1146                        a, b,
1147                        "terminal states at indices {i} and {j} must be distinct"
1148                    );
1149                }
1150            }
1151        }
1152
1153        // Terminal states must be distinct from all non-terminal states.
1154        let non_terminal_states = [
1155            VerificationState::OfferReceived,
1156            VerificationState::PendingVerify,
1157            VerificationState::QuorumVerified,
1158            VerificationState::PaidListVerified,
1159            VerificationState::QueuedForFetch,
1160            VerificationState::Fetching,
1161            VerificationState::FetchRetryable,
1162            VerificationState::QuorumFailed,
1163            VerificationState::QuorumInconclusive,
1164        ];
1165
1166        for terminal in &terminal_states {
1167            for non_terminal in &non_terminal_states {
1168                assert_ne!(
1169                    terminal, non_terminal,
1170                    "terminal state {terminal:?} must not equal non-terminal state {non_terminal:?}"
1171                );
1172            }
1173        }
1174    }
1175
1176    /// `has_repair_opportunity` requires BOTH a previous sync AND at least
1177    /// one subsequent cycle.
1178    #[test]
1179    fn repair_opportunity_requires_both_sync_and_cycle() {
1180        // last_sync = Some, cycles_since_sync = 0 → false (synced but no cycle yet)
1181        let synced_no_cycle = PeerSyncRecord {
1182            last_sync: Some(
1183                Instant::now()
1184                    .checked_sub(std::time::Duration::from_secs(2))
1185                    .unwrap_or_else(Instant::now),
1186            ),
1187            cycles_since_sync: 0,
1188        };
1189        assert!(
1190            !synced_no_cycle.has_repair_opportunity(),
1191            "synced with zero subsequent cycles should NOT have repair opportunity"
1192        );
1193
1194        // last_sync = None, cycles_since_sync = 5 → false (never synced)
1195        let never_synced = PeerSyncRecord {
1196            last_sync: None,
1197            cycles_since_sync: 5,
1198        };
1199        assert!(
1200            !never_synced.has_repair_opportunity(),
1201            "never-synced peer should NOT have repair opportunity regardless of cycles"
1202        );
1203
1204        // last_sync = Some, cycles_since_sync = 1 → true
1205        let ready = PeerSyncRecord {
1206            last_sync: Some(
1207                Instant::now()
1208                    .checked_sub(std::time::Duration::from_secs(5))
1209                    .unwrap_or_else(Instant::now),
1210            ),
1211            cycles_since_sync: 1,
1212        };
1213        assert!(
1214            ready.has_repair_opportunity(),
1215            "synced peer with >= 1 cycle SHOULD have repair opportunity"
1216        );
1217    }
1218}