Skip to main content

ant_node/replication/
types.rs

1//! Core types for the replication subsystem.
2//!
3//! These types represent the state machine states, queue entries, and domain
4//! concepts from the Kademlia-style replication design (see
5//! `docs/REPLICATION_DESIGN.md`).
6
7use std::cmp::Ordering;
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::time::{Duration, Instant};
10
11use serde::{Deserialize, Serialize};
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::REPAIR_HINT_MIN_AGE;
15use saorsa_core::identity::PeerId;
16
17// ---------------------------------------------------------------------------
18// Verification state machine (Section 8 of REPLICATION_DESIGN.md)
19// ---------------------------------------------------------------------------
20
/// States of the per-key verification state machine.
///
/// An unknown key moves through these states exactly once per offer
/// lifecycle. The full transition diagram lives in Section 8 of
/// `REPLICATION_DESIGN.md`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VerificationState {
    /// An offer arrived but has not been processed yet.
    OfferReceived,
    /// The admission filter passed; quorum / paid-list verification is
    /// still outstanding.
    PendingVerify,
    /// The presence quorum passed (>= `QuorumNeeded` positives from
    /// `QuorumTargets`).
    QuorumVerified,
    /// Paid-list authorisation succeeded (>= `ConfirmNeeded` confirmations,
    /// or derived from replica majority).
    PaidListVerified,
    /// The key is waiting in the queue for a record fetch.
    QueuedForFetch,
    /// A fetch from a verified source is in progress.
    Fetching,
    /// The record was stored locally.
    Stored,
    /// The fetch failed but can be retried (alternate sources remain).
    FetchRetryable,
    /// The fetch was permanently abandoned (terminal failure or no
    /// alternate sources).
    FetchAbandoned,
    /// Quorum failed definitively (both paid-list and presence are
    /// impossible this round).
    QuorumFailed,
    /// Quorum was inconclusive (timeout with neither success nor
    /// fail-fast).
    QuorumInconclusive,
    /// Terminal: quorum abandoned, key forgotten.
    QuorumAbandoned,
    /// Terminal: the key returned to idle (forgotten; a new offer is
    /// required to re-enter).
    Idle,
}
59
60// ---------------------------------------------------------------------------
61// Hint pipeline classification
62// ---------------------------------------------------------------------------
63
64/// Whether a key was admitted via replica hints or paid hints only.
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
66pub enum HintPipeline {
67    /// Key is in the admitted replica-hint pipeline (fetch-eligible).
68    Replica,
69    /// Key is in the paid-hint-only pipeline (`PaidForList` update only, no
70    /// fetch).
71    PaidOnly,
72}
73
74// ---------------------------------------------------------------------------
75// Pending-verification table entry
76// ---------------------------------------------------------------------------
77
78/// Entry in the pending-verification table.
79///
80/// Tracks a single key through the verification FSM, recording which peers
81/// responded and which have been tried for fetch.
82#[derive(Debug, Clone)]
83pub struct VerificationEntry {
84    /// Current state in the verification FSM.
85    pub state: VerificationState,
86    /// Which pipeline admitted this key.
87    pub pipeline: HintPipeline,
88    /// Peers that responded `Present` during verification (verified fetch
89    /// sources).
90    pub verified_sources: Vec<PeerId>,
91    /// Peers already tried for fetch (to avoid retrying the same source).
92    pub tried_sources: HashSet<PeerId>,
93    /// When this entry was created.
94    pub created_at: Instant,
95    /// The peer that originally hinted this key (for source tracking).
96    pub hint_sender: PeerId,
97}
98
99// ---------------------------------------------------------------------------
100// Fetch queue candidate
101// ---------------------------------------------------------------------------
102
103/// A candidate queued for fetch, ordered by relevance (nearest-first).
104///
105/// Implements [`Ord`] with *reversed* distance comparison so that a
106/// [`BinaryHeap`](std::collections::BinaryHeap) (max-heap) dequeues the
107/// nearest key first.
108#[derive(Debug, Clone)]
109pub struct FetchCandidate {
110    /// The key to fetch.
111    pub key: XorName,
112    /// XOR distance from self to key (for priority ordering).
113    pub distance: XorName,
114    /// Verified source peers that responded `Present`.
115    pub sources: Vec<PeerId>,
116}
117
118impl Eq for FetchCandidate {}
119
120impl PartialEq for FetchCandidate {
121    fn eq(&self, other: &Self) -> bool {
122        self.distance == other.distance && self.key == other.key
123    }
124}
125
126impl Ord for FetchCandidate {
127    fn cmp(&self, other: &Self) -> Ordering {
128        // Reverse ordering: smaller distance = higher priority (BinaryHeap is
129        // max-heap).  Tie-break on key for consistency with PartialEq.
130        other
131            .distance
132            .cmp(&self.distance)
133            .then_with(|| self.key.cmp(&other.key))
134    }
135}
136
137impl PartialOrd for FetchCandidate {
138    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139        Some(self.cmp(other))
140    }
141}
142
143// ---------------------------------------------------------------------------
144// Verification evidence types
145// ---------------------------------------------------------------------------
146
147/// Per-key presence evidence from a verification round.
148#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
149pub enum PresenceEvidence {
150    /// Peer holds the record.
151    Present,
152    /// Peer does not hold the record.
153    Absent,
154    /// Peer did not respond in time (neutral, not negative).
155    Unresolved,
156}
157
158/// Per-key paid-list evidence from a verification round.
159#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
160pub enum PaidListEvidence {
161    /// Peer confirms key is in its `PaidForList`.
162    Confirmed,
163    /// Peer says key is NOT in its `PaidForList`.
164    NotFound,
165    /// Peer did not respond in time (neutral).
166    Unresolved,
167}
168
169/// Aggregated verification evidence for a single key from one verification
170/// round.
171#[derive(Debug, Clone)]
172pub struct KeyVerificationEvidence {
173    /// Presence evidence per peer (from `QuorumTargets`).
174    pub presence: HashMap<PeerId, PresenceEvidence>,
175    /// Paid-list evidence per peer (from `PaidTargets`).
176    pub paid_list: HashMap<PeerId, PaidListEvidence>,
177}
178
179// ---------------------------------------------------------------------------
180// Failure evidence (Section 14 — TrustEngine integration)
181// ---------------------------------------------------------------------------
182
/// Aggregate counts describing a confirmed audit failure, so per-key detail
/// does not have to be logged at `ERROR` level.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AuditFailureSummary {
    /// How many keys the original audit challenge contained.
    pub challenged_keys: usize,
    /// How many keys still failed after responsibility confirmation.
    pub failed_keys: usize,
    /// Confirmed failures for which the responder returned the absent-key
    /// sentinel.
    pub absent_keys: usize,
    /// Confirmed failures for which the responder's digest did not match
    /// the challenger's local bytes.
    pub digest_mismatch_keys: usize,
}
197
198/// Failure evidence types emitted to `TrustEngine` (Section 14).
199#[derive(Debug, Clone)]
200pub enum FailureEvidence {
201    /// Failed fetch attempt from a source peer.
202    ReplicationFailure {
203        /// The peer that failed to serve the record.
204        peer: PeerId,
205        /// The key that could not be fetched.
206        key: XorName,
207    },
208    /// Audit failure with confirmed responsible keys.
209    AuditFailure {
210        /// Unique identifier for the audit challenge.
211        challenge_id: u64,
212        /// The peer that was challenged.
213        challenged_peer: PeerId,
214        /// Keys confirmed as failed.
215        confirmed_failed_keys: Vec<XorName>,
216        /// Aggregated reason counts for the confirmed failures.
217        summary: AuditFailureSummary,
218        /// Why the audit failed.
219        reason: AuditFailureReason,
220    },
221    /// Peer claiming bootstrap past grace period.
222    BootstrapClaimAbuse {
223        /// The offending peer.
224        peer: PeerId,
225        /// When this peer was first seen.
226        first_seen: Instant,
227    },
228}
229
230/// Reason for audit failure.
231#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
232pub enum AuditFailureReason {
233    /// Peer timed out (no response within deadline).
234    Timeout,
235    /// Response was malformed.
236    MalformedResponse,
237    /// One or more per-key digest mismatches.
238    DigestMismatch,
239    /// Key was absent (signalled by sentinel digest).
240    KeyAbsent,
241    /// Peer explicitly rejected the audit challenge.
242    Rejected,
243}
244
245// ---------------------------------------------------------------------------
246// Peer sync tracking
247// ---------------------------------------------------------------------------
248
/// Sync history kept per peer, used for `RepairOpportunity` tracking.
#[derive(Debug, Clone)]
pub struct PeerSyncRecord {
    /// Time of the last successful sync with this peer, if any.
    pub last_sync: Option<Instant>,
    /// How many full neighbor-sync cycles have completed since the last
    /// sync with this peer.
    pub cycles_since_sync: u32,
}

impl PeerSyncRecord {
    /// Reports whether the peer has had a repair opportunity.
    ///
    /// That requires both a recorded sync and at least one cycle completed
    /// afterwards; a never-synced peer returns `false` whatever its cycle
    /// count.
    #[must_use]
    pub fn has_repair_opportunity(&self) -> bool {
        match self.last_sync {
            Some(_) => self.cycles_since_sync > 0,
            None => false,
        }
    }
}
267
268// ---------------------------------------------------------------------------
269// Repair proof tracking
270// ---------------------------------------------------------------------------
271
/// Proof that this node sent a replica repair hint for a key to one peer.
#[derive(Debug, Clone)]
struct RepairProof {
    /// Local neighbor-sync cycle epoch at which the hint was sent.
    hinted_at_epoch: u64,
    /// Monotonic local time at which the hint was sent.
    hinted_at: Instant,
}
280
281/// Repair proofs for one key, tied to the close-group snapshot they were
282/// recorded against.
283#[derive(Debug, Clone)]
284struct RepairProofEntry {
285    /// Self-inclusive close group observed when these proofs were recorded.
286    close_peers: HashSet<PeerId>,
287    /// Per-peer proof metadata for peers in `close_peers`.
288    peer_proofs: HashMap<PeerId, RepairProof>,
289}
290
291impl RepairProofEntry {
292    fn new(close_peers: HashSet<PeerId>) -> Self {
293        Self {
294            close_peers,
295            peer_proofs: HashMap::new(),
296        }
297    }
298}
299
300/// Evidence that this node has sent replica repair hints for local keys.
301///
302/// The map is keyed by record key so each key retains only one close-group
303/// snapshot and at most that snapshot's peers. This bounds memory by local key
304/// count times the replication close-group size rather than by churn history.
305#[derive(Debug, Clone, Default)]
306pub struct RepairProofs {
307    /// Key-scoped repair proofs.
308    proofs_by_key: HashMap<XorName, RepairProofEntry>,
309}
310
311impl RepairProofs {
312    /// Create an empty repair-proof table.
313    #[must_use]
314    pub fn new() -> Self {
315        Self::default()
316    }
317
318    /// Record that `peer` was sent a replica repair hint for `key`.
319    ///
320    /// `current_close_peers` must be the current self-inclusive close group for
321    /// `key`. If that close group differs from the previous proof snapshot,
322    /// proofs for peers that left the close group are invalidated before
323    /// recording. Stable peers keep their proofs, while a peer that leaves and
324    /// later re-enters still needs a fresh hint.
325    pub fn record_replica_hint_sent(
326        &mut self,
327        peer: PeerId,
328        key: XorName,
329        current_close_peers: &HashSet<PeerId>,
330        hinted_at_epoch: u64,
331    ) -> bool {
332        self.insert_replica_hint_sent(
333            peer,
334            key,
335            current_close_peers,
336            hinted_at_epoch,
337            Instant::now(),
338        )
339    }
340
341    /// Record that `peer` was sent a replica repair hint at a caller-provided
342    /// time.
343    ///
344    /// This is exposed only for deterministic tests and test harnesses. Normal
345    /// production callers use [`Self::record_replica_hint_sent`] so the proof
346    /// timestamp is captured internally at send-recording time.
347    #[cfg(any(test, feature = "test-utils"))]
348    pub fn record_replica_hint_sent_at(
349        &mut self,
350        peer: PeerId,
351        key: XorName,
352        current_close_peers: &HashSet<PeerId>,
353        hinted_at_epoch: u64,
354        hinted_at: Instant,
355    ) -> bool {
356        self.insert_replica_hint_sent(peer, key, current_close_peers, hinted_at_epoch, hinted_at)
357    }
358
359    fn insert_replica_hint_sent(
360        &mut self,
361        peer: PeerId,
362        key: XorName,
363        current_close_peers: &HashSet<PeerId>,
364        hinted_at_epoch: u64,
365        hinted_at: Instant,
366    ) -> bool {
367        self.reconcile_key_close_group(&key, current_close_peers);
368
369        if !current_close_peers.contains(&peer) {
370            return false;
371        }
372
373        let entry = self
374            .proofs_by_key
375            .entry(key)
376            .or_insert_with(|| RepairProofEntry::new(current_close_peers.clone()));
377
378        if entry.peer_proofs.contains_key(&peer) {
379            return false;
380        }
381
382        entry.peer_proofs.insert(
383            peer,
384            RepairProof {
385                hinted_at_epoch,
386                hinted_at,
387            },
388        );
389        true
390    }
391
392    /// Whether this node has mature repair-hint evidence for `(peer, key)`.
393    ///
394    /// The check invalidates proofs for peers that have left the current
395    /// self-inclusive close group. A proof is mature only after at least one
396    /// later local sync-cycle epoch and the repair hint is at least
397    /// [`REPAIR_HINT_MIN_AGE`] old.
398    pub fn has_mature_replica_hint(
399        &mut self,
400        peer: &PeerId,
401        key: &XorName,
402        current_close_peers: &HashSet<PeerId>,
403        current_epoch: u64,
404        now: Instant,
405    ) -> bool {
406        self.reconcile_key_close_group(key, current_close_peers);
407
408        self.proofs_by_key
409            .get(key)
410            .and_then(|entry| entry.peer_proofs.get(peer))
411            .is_some_and(|proof| {
412                proof.hinted_at_epoch < current_epoch
413                    && now.saturating_duration_since(proof.hinted_at) >= REPAIR_HINT_MIN_AGE
414            })
415    }
416
417    /// Remove all repair proofs for a key, e.g. after local deletion.
418    pub fn remove_key(&mut self, key: &XorName) {
419        self.proofs_by_key.remove(key);
420    }
421
422    /// Remove all repair proofs for a peer, e.g. after routing-table removal.
423    pub fn remove_peer(&mut self, peer: &PeerId) {
424        self.proofs_by_key.retain(|_, entry| {
425            entry.peer_proofs.remove(peer);
426            !entry.peer_proofs.is_empty()
427        });
428    }
429
430    fn reconcile_key_close_group(&mut self, key: &XorName, current_close_peers: &HashSet<PeerId>) {
431        let should_remove = if let Some(entry) = self.proofs_by_key.get_mut(key) {
432            if entry.close_peers == *current_close_peers {
433                return;
434            }
435
436            entry.close_peers.clone_from(current_close_peers);
437            entry
438                .peer_proofs
439                .retain(|peer, _| current_close_peers.contains(peer));
440            entry.peer_proofs.is_empty()
441        } else {
442            false
443        };
444
445        if should_remove {
446            self.proofs_by_key.remove(key);
447        }
448    }
449}
450
451// ---------------------------------------------------------------------------
452// Neighbor sync cycle state
453// ---------------------------------------------------------------------------
454
/// Outcome of observing a peer that claims bootstrap status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BootstrapClaimObservation {
    /// The peer is still inside its first and only bootstrap-claim grace
    /// window.
    WithinGrace {
        /// When the peer first claimed bootstrap status.
        first_seen: Instant,
    },
    /// The peer has kept claiming bootstrap status continuously past the
    /// grace period.
    PastGrace {
        /// When the peer first claimed bootstrap status.
        first_seen: Instant,
    },
    /// The peer stopped claiming bootstrap earlier and is now claiming it
    /// again.
    Repeated {
        /// When the peer claimed bootstrap status for the very first time.
        first_seen: Instant,
    },
}
474
475/// Neighbor sync cycle state.
476///
477/// Tracks a deterministic walk through the current close-group snapshot,
478/// per-peer cooldown times, active bootstrap claims, and peers that have already
479/// used their one bootstrap-claim window.
480#[derive(Debug)]
481pub struct NeighborSyncState {
482    /// Newly-entered close peers to sync before the normal cycle cursor.
483    ///
484    /// This queue is populated from `KClosestPeersChanged` routing-table
485    /// events and is intentionally separate from `order`: priority syncs do
486    /// not restart the current round-robin cycle or discard already-scanned
487    /// peers.
488    pub priority_order: VecDeque<PeerId>,
489    /// Deterministic ordering of peers for the current cycle (snapshot).
490    pub order: Vec<PeerId>,
491    /// Current cursor position into `order`.
492    pub cursor: usize,
493    /// Per-peer last successful sync time (for cooldown).
494    pub last_sync_times: HashMap<PeerId, Instant>,
495    /// Active bootstrap claim first-seen timestamps per peer.
496    ///
497    /// Entries are removed when a peer stops claiming bootstrap. The peer
498    /// remains in `bootstrap_claim_history`, so a later claim is repeated-claim
499    /// abuse instead of a fresh grace period.
500    pub bootstrap_claims: HashMap<PeerId, Instant>,
501    /// First-ever bootstrap claim timestamp per peer.
502    ///
503    /// This is retained after active claims are cleared so each peer gets at
504    /// most one bootstrap-claim grace window. Under Sybil attack with many
505    /// distinct peer IDs claiming bootstrap, this map grows unboundedly. In
506    /// practice the trust engine limits Sybil impact before this becomes a
507    /// memory issue.
508    pub bootstrap_claim_history: HashMap<PeerId, Instant>,
509    /// Cursor used by post-cycle pruning to rotate through stored records when
510    /// the per-pass prune-confirmation budget is exhausted.
511    pub prune_cursor: usize,
512}
513
514impl NeighborSyncState {
515    /// Create a new cycle from the given close neighbors.
516    #[must_use]
517    pub fn new_cycle(close_neighbors: Vec<PeerId>) -> Self {
518        Self {
519            priority_order: VecDeque::new(),
520            order: close_neighbors,
521            cursor: 0,
522            last_sync_times: HashMap::new(),
523            bootstrap_claims: HashMap::new(),
524            bootstrap_claim_history: HashMap::new(),
525            prune_cursor: 0,
526        }
527    }
528
529    /// Observe a peer claiming bootstrap status.
530    ///
531    /// A peer receives one grace window from its first observed bootstrap claim.
532    /// If it later stops claiming bootstrap, callers should clear only the
533    /// active claim with [`Self::clear_active_bootstrap_claim`]. A subsequent
534    /// claim is then reported as [`BootstrapClaimObservation::Repeated`].
535    #[must_use]
536    pub fn observe_bootstrap_claim(
537        &mut self,
538        peer: PeerId,
539        now: Instant,
540        grace_period: Duration,
541    ) -> BootstrapClaimObservation {
542        if let Some(first_seen) = self.bootstrap_claims.get(&peer).copied() {
543            if now.duration_since(first_seen) > grace_period {
544                BootstrapClaimObservation::PastGrace { first_seen }
545            } else {
546                BootstrapClaimObservation::WithinGrace { first_seen }
547            }
548        } else if let Some(first_seen) = self.bootstrap_claim_history.get(&peer).copied() {
549            BootstrapClaimObservation::Repeated { first_seen }
550        } else {
551            self.bootstrap_claims.insert(peer, now);
552            self.bootstrap_claim_history.insert(peer, now);
553            BootstrapClaimObservation::WithinGrace { first_seen: now }
554        }
555    }
556
557    /// Clear the active bootstrap claim for a peer, retaining claim history.
558    pub fn clear_active_bootstrap_claim(&mut self, peer: &PeerId) -> bool {
559        self.bootstrap_claims.remove(peer).is_some()
560    }
561
562    /// Queue newly-entered close peers for priority sync.
563    ///
564    /// Existing queued peers are retained in their original position and
565    /// duplicates from the same routing-table churn burst are ignored.
566    pub fn queue_priority_peers<I>(&mut self, peers: I) -> usize
567    where
568        I: IntoIterator<Item = PeerId>,
569    {
570        let mut queued = 0;
571        for peer in peers {
572            if self.priority_order.contains(&peer) {
573                continue;
574            }
575            self.priority_order.push_back(peer);
576            queued += 1;
577        }
578        queued
579    }
580
581    /// Drop pending sync peers that are no longer in the close-peer set.
582    ///
583    /// Peers still in `close_peers` keep their relative position. The cursor is
584    /// adjusted so already-scanned retained peers are not selected again.
585    pub fn retain_sync_peers(&mut self, close_peers: &HashSet<PeerId>) -> usize {
586        let old_priority_len = self.priority_order.len();
587        self.priority_order
588            .retain(|peer| close_peers.contains(peer));
589
590        let old_order_len = self.order.len();
591        let old_cursor = self.cursor;
592        let mut retained_before_cursor = 0;
593        let mut retained_order = Vec::with_capacity(old_order_len);
594        for (idx, peer) in self.order.drain(..).enumerate() {
595            if close_peers.contains(&peer) {
596                if idx < old_cursor {
597                    retained_before_cursor += 1;
598                }
599                retained_order.push(peer);
600            }
601        }
602
603        self.order = retained_order;
604        self.cursor = retained_before_cursor;
605
606        (old_priority_len - self.priority_order.len()) + (old_order_len - self.order.len())
607    }
608
609    /// Remove a peer from any pending neighbor-sync state.
610    pub fn remove_peer(&mut self, peer: &PeerId) -> bool {
611        let old_priority_len = self.priority_order.len();
612        self.priority_order.retain(|queued| queued != peer);
613
614        let old_order_len = self.order.len();
615        if let Some(pos) = self.order.iter().position(|queued| queued == peer) {
616            self.order.remove(pos);
617            if pos < self.cursor {
618                self.cursor = self.cursor.saturating_sub(1);
619            }
620        }
621
622        old_priority_len != self.priority_order.len() || old_order_len != self.order.len()
623    }
624
625    /// Whether the current cycle is complete.
626    #[must_use]
627    pub fn is_cycle_complete(&self) -> bool {
628        self.priority_order.is_empty() && self.cursor >= self.order.len()
629    }
630}
631
632// ---------------------------------------------------------------------------
633// Bootstrap drain state (Section 16)
634// ---------------------------------------------------------------------------
635
636/// Bootstrap drain state tracking (Section 16).
637#[derive(Debug)]
638pub struct BootstrapState {
639    /// Whether bootstrap is complete (all peer requests done, queues empty).
640    pub drained: bool,
641    /// Number of bootstrap peer requests still pending.
642    pub pending_peer_requests: usize,
643    /// Keys discovered during bootstrap that are still in the verification /
644    /// fetch pipeline.
645    pub pending_keys: HashSet<XorName>,
646    /// Peers whose last bootstrap admission cycle had one or more hints
647    /// silently dropped at the `pending_verify` capacity bounds. Each entry
648    /// represents "this source still owes us at least one re-hinted key
649    /// after the queues drain". `check_bootstrap_drained` refuses to claim
650    /// the node fully drained while this set is non-empty: a source's
651    /// presence is cleared by its next admission cycle that completes with
652    /// zero capacity rejections (i.e. the source successfully re-delivered
653    /// everything that previously overflowed). Tracking per-source instead
654    /// of a global counter prevents one peer's rejection from being
655    /// "cleared" by an unrelated peer's clean cycle.
656    pub capacity_rejected_sources: HashSet<PeerId>,
657}
658
659impl BootstrapState {
660    /// Create initial bootstrap state.
661    #[must_use]
662    pub fn new() -> Self {
663        Self {
664            drained: false,
665            pending_peer_requests: 0,
666            pending_keys: HashSet::new(),
667            capacity_rejected_sources: HashSet::new(),
668        }
669    }
670
671    /// Check if bootstrap is drained.
672    ///
673    /// Only returns `true` after [`super::bootstrap::check_bootstrap_drained`] or
674    /// [`super::bootstrap::mark_bootstrap_drained`] has explicitly set the flag. A fresh
675    /// `BootstrapState` is NOT drained — the audit loop must wait until
676    /// bootstrap work has actually completed (Invariant 19).
677    #[must_use]
678    pub fn is_drained(&self) -> bool {
679        self.drained
680    }
681
682    /// Remove a key from the bootstrap pending set.
683    ///
684    /// Called when a key terminally leaves the verification/fetch pipeline
685    /// (stored, abandoned, quorum failed, etc.) so the drain check set
686    /// shrinks incrementally rather than being re-scanned in full.
687    pub fn remove_key(&mut self, key: &XorName) {
688        self.pending_keys.remove(key);
689    }
690}
691
692impl Default for BootstrapState {
693    fn default() -> Self {
694        Self::new()
695    }
696}
697
698// ---------------------------------------------------------------------------
699// Tests
700// ---------------------------------------------------------------------------
701
702#[cfg(test)]
703mod tests {
704    use std::collections::BinaryHeap;
705
706    use super::*;
707
708    /// Helper: build a `PeerId` from a single byte (zero-padded to 32 bytes).
709    fn peer_id_from_byte(b: u8) -> PeerId {
710        let mut bytes = [0u8; 32];
711        bytes[0] = b;
712        PeerId::from_bytes(bytes)
713    }
714
715    fn mature_hint_times() -> (Instant, Instant) {
716        let hinted_at = Instant::now();
717        let now = hinted_at
718            .checked_add(REPAIR_HINT_MIN_AGE)
719            .unwrap_or(hinted_at);
720        (hinted_at, now)
721    }
722
723    // -- FetchCandidate ordering -------------------------------------------
724
725    #[test]
726    fn fetch_candidate_nearest_key_has_highest_priority() {
727        let near = FetchCandidate {
728            key: [1u8; 32],
729            distance: [
730                0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
731                0, 0, 0, 0,
732            ],
733            sources: vec![peer_id_from_byte(1)],
734        };
735
736        let far = FetchCandidate {
737            key: [2u8; 32],
738            distance: [
739                0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
740                0, 0, 0, 0, 0,
741            ],
742            sources: vec![peer_id_from_byte(2)],
743        };
744
745        // In a max-heap the "greatest" element pops first.
746        // Our reversed Ord makes smaller-distance candidates greater.
747        assert!(near > far, "nearer candidate should compare greater");
748
749        let mut heap = BinaryHeap::new();
750        heap.push(far.clone());
751        heap.push(near.clone());
752
753        assert_eq!(heap.len(), 2, "heap should contain both candidates");
754
755        let first = heap.pop();
756        assert!(first.is_some(), "first pop should succeed");
757        assert_eq!(
758            first.map(|c| c.key),
759            Some(near.key),
760            "nearest key should pop first"
761        );
762
763        let second = heap.pop();
764        assert!(second.is_some(), "second pop should succeed");
765        assert_eq!(
766            second.map(|c| c.key),
767            Some(far.key),
768            "farthest key should pop second"
769        );
770    }
771
772    #[test]
773    fn fetch_candidate_same_distance_and_key_is_equal() {
774        let a = FetchCandidate {
775            key: [1u8; 32],
776            distance: [5u8; 32],
777            sources: vec![],
778        };
779
780        let b = FetchCandidate {
781            key: [1u8; 32],
782            distance: [5u8; 32],
783            sources: vec![],
784        };
785
786        assert_eq!(
787            a.cmp(&b),
788            Ordering::Equal,
789            "same distance + same key should yield Equal"
790        );
791        assert_eq!(a, b, "PartialEq must agree with Ord");
792    }
793
794    #[test]
795    fn fetch_candidate_same_distance_different_key_is_deterministic() {
796        let a = FetchCandidate {
797            key: [1u8; 32],
798            distance: [5u8; 32],
799            sources: vec![],
800        };
801
802        let b = FetchCandidate {
803            key: [2u8; 32],
804            distance: [5u8; 32],
805            sources: vec![],
806        };
807
808        assert_ne!(
809            a.cmp(&b),
810            Ordering::Equal,
811            "same distance + different key must not be Equal"
812        );
813        assert_ne!(a, b, "PartialEq must agree with Ord");
814    }
815
816    // -- PeerSyncRecord ----------------------------------------------------
817
818    #[test]
819    fn peer_sync_record_no_sync_yet() {
820        let record = PeerSyncRecord {
821            last_sync: None,
822            cycles_since_sync: 0,
823        };
824        assert!(
825            !record.has_repair_opportunity(),
826            "never-synced peer has no repair opportunity"
827        );
828    }
829
830    #[test]
831    fn peer_sync_record_synced_but_no_cycle() {
832        let record = PeerSyncRecord {
833            last_sync: Some(Instant::now()),
834            cycles_since_sync: 0,
835        };
836        assert!(
837            !record.has_repair_opportunity(),
838            "synced peer with zero subsequent cycles has no repair opportunity"
839        );
840    }
841
842    #[test]
843    fn peer_sync_record_synced_with_cycle() {
844        let record = PeerSyncRecord {
845            last_sync: Some(Instant::now()),
846            cycles_since_sync: 1,
847        };
848        assert!(
849            record.has_repair_opportunity(),
850            "synced peer with >= 1 cycle should have repair opportunity"
851        );
852    }
853
854    #[test]
855    fn peer_sync_record_no_sync_many_cycles() {
856        let record = PeerSyncRecord {
857            last_sync: None,
858            cycles_since_sync: 10,
859        };
860        assert!(
861            !record.has_repair_opportunity(),
862            "never-synced peer has no repair opportunity regardless of cycle count"
863        );
864    }
865
866    // -- RepairProofs --------------------------------------------------------
867
868    #[test]
869    fn repair_proofs_record_sent_hint_for_close_peer() {
870        const HINT_EPOCH: u64 = 7;
871        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
872
873        let key = [0xA1; 32];
874        let peer = peer_id_from_byte(1);
875        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
876        let mut proofs = RepairProofs::new();
877        let (hinted_at, now) = mature_hint_times();
878
879        assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
880
881        assert!(
882            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
883            "old sent hint should make key auditable for that peer"
884        );
885    }
886
887    #[test]
888    fn repair_proofs_reject_peer_outside_current_close_group() {
889        const HINT_EPOCH: u64 = 7;
890        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
891
892        let key = [0xA2; 32];
893        let peer = peer_id_from_byte(1);
894        let close_peers = HashSet::from([peer_id_from_byte(2), peer_id_from_byte(3)]);
895        let mut proofs = RepairProofs::new();
896        let (hinted_at, now) = mature_hint_times();
897
898        assert!(!proofs.record_replica_hint_sent_at(
899            peer,
900            key,
901            &close_peers,
902            HINT_EPOCH,
903            hinted_at,
904        ));
905
906        assert!(
907            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
908            "peers outside current close group must not get repair proof"
909        );
910    }
911
912    #[test]
913    fn repair_proofs_require_later_epoch() {
914        const HINT_EPOCH: u64 = 7;
915        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
916
917        let key = [0xA3; 32];
918        let peer = peer_id_from_byte(1);
919        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
920        let mut proofs = RepairProofs::new();
921        let (hinted_at, now) = mature_hint_times();
922
923        assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
924
925        assert!(
926            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, HINT_EPOCH, now),
927            "same-cycle proof should not be audit-eligible"
928        );
929        assert!(
930            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
931            "old proof should mature after a later local sync-cycle epoch"
932        );
933    }
934
935    #[test]
936    fn repair_proofs_require_min_hint_age() {
937        const HINT_EPOCH: u64 = 7;
938        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
939
940        let key = [0xA8; 32];
941        let peer = peer_id_from_byte(1);
942        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
943        let mut proofs = RepairProofs::new();
944        let hinted_at = Instant::now();
945
946        assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at));
947
948        assert!(
949            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, hinted_at),
950            "fresh repair hints should not be audit-eligible"
951        );
952        assert!(
953            proofs.has_mature_replica_hint(
954                &peer,
955                &key,
956                &close_peers,
957                CURRENT_EPOCH,
958                hinted_at
959                    .checked_add(REPAIR_HINT_MIN_AGE)
960                    .unwrap_or(hinted_at),
961            ),
962            "repair hints should mature once they are at least the minimum age"
963        );
964    }
965
966    #[test]
967    fn repair_proofs_repeated_hint_does_not_reset_maturity() {
968        const HINT_EPOCH: u64 = 7;
969        const REPEATED_HINT_EPOCH: u64 = HINT_EPOCH + 1;
970
971        let key = [0xA5; 32];
972        let peer = peer_id_from_byte(1);
973        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
974        let mut proofs = RepairProofs::new();
975        let (hinted_at, now) = mature_hint_times();
976
977        assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
978        assert!(
979            !proofs.record_replica_hint_sent_at(peer, key, &close_peers, REPEATED_HINT_EPOCH, now),
980            "duplicate hint in the same close group should keep existing proof"
981        );
982        assert!(
983            proofs.has_mature_replica_hint(&peer, &key, &close_peers, REPEATED_HINT_EPOCH, now),
984            "duplicate hint must not reset an already mature proof"
985        );
986    }
987
988    #[test]
989    fn repair_proofs_retain_stable_peers_on_close_group_change() {
990        const HINT_EPOCH: u64 = 7;
991        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
992
993        let key = [0xA7; 32];
994        let stable_peer = peer_id_from_byte(1);
995        let departing_peer = peer_id_from_byte(2);
996        let retained_peer = peer_id_from_byte(3);
997        let new_peer = peer_id_from_byte(4);
998        let old_group = HashSet::from([stable_peer, departing_peer, retained_peer]);
999        let changed_group = HashSet::from([stable_peer, retained_peer, new_peer]);
1000        let mut proofs = RepairProofs::new();
1001        let (hinted_at, now) = mature_hint_times();
1002
1003        assert!(proofs.record_replica_hint_sent_at(
1004            stable_peer,
1005            key,
1006            &old_group,
1007            HINT_EPOCH,
1008            hinted_at,
1009        ));
1010        assert!(proofs.record_replica_hint_sent_at(
1011            departing_peer,
1012            key,
1013            &old_group,
1014            HINT_EPOCH,
1015            hinted_at,
1016        ));
1017
1018        assert!(
1019            proofs.has_mature_replica_hint(&stable_peer, &key, &changed_group, CURRENT_EPOCH, now),
1020            "stable peers should keep mature repair proofs across unrelated close-group churn"
1021        );
1022        assert!(
1023            !proofs.has_mature_replica_hint(
1024                &departing_peer,
1025                &key,
1026                &changed_group,
1027                CURRENT_EPOCH,
1028                now,
1029            ),
1030            "peers that left the close group should lose repair proofs"
1031        );
1032        assert!(
1033            !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, CURRENT_EPOCH, now),
1034            "new close-group peers need their own repair hint before auditing"
1035        );
1036    }
1037
1038    #[test]
1039    fn repair_proofs_evicted_peer_reentry_requires_fresh_hint() {
1040        const FIRST_HINT_EPOCH: u64 = 7;
1041        const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
1042        const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
1043
1044        let key = [0xA3; 32];
1045        let returning_peer = peer_id_from_byte(1);
1046        let new_peer = peer_id_from_byte(4);
1047        let old_group = HashSet::from([returning_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
1048        let changed_group = HashSet::from([new_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
1049        let mut proofs = RepairProofs::new();
1050        let (hinted_at, now) = mature_hint_times();
1051
1052        assert!(proofs.record_replica_hint_sent_at(
1053            returning_peer,
1054            key,
1055            &old_group,
1056            FIRST_HINT_EPOCH,
1057            hinted_at,
1058        ));
1059
1060        assert!(
1061            !proofs.has_mature_replica_hint(
1062                &new_peer,
1063                &key,
1064                &changed_group,
1065                SECOND_HINT_EPOCH,
1066                now
1067            ),
1068            "new close-group peer should not inherit another peer's repair proof"
1069        );
1070        assert!(
1071            !proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH, now),
1072            "a peer that re-enters must receive a fresh repair hint"
1073        );
1074
1075        assert!(proofs.record_replica_hint_sent_at(
1076            returning_peer,
1077            key,
1078            &old_group,
1079            SECOND_HINT_EPOCH,
1080            hinted_at,
1081        ));
1082        assert!(
1083            proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH, now),
1084            "fresh repair hint after re-entry should be eligible once mature"
1085        );
1086    }
1087
1088    #[test]
1089    fn repair_proofs_remove_peer_requires_fresh_hint_after_reentry() {
1090        const FIRST_HINT_EPOCH: u64 = 7;
1091        const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
1092        const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
1093
1094        let key = [0xA6; 32];
1095        let peer = peer_id_from_byte(1);
1096        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
1097        let mut proofs = RepairProofs::new();
1098        let (hinted_at, now) = mature_hint_times();
1099
1100        assert!(proofs.record_replica_hint_sent_at(
1101            peer,
1102            key,
1103            &close_peers,
1104            FIRST_HINT_EPOCH,
1105            hinted_at,
1106        ));
1107        proofs.remove_peer(&peer);
1108
1109        assert!(
1110            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
1111            "routing-table removal should clear proof even if peer re-enters same close group"
1112        );
1113
1114        assert!(proofs.record_replica_hint_sent_at(
1115            peer,
1116            key,
1117            &close_peers,
1118            SECOND_HINT_EPOCH,
1119            hinted_at,
1120        ));
1121        assert!(
1122            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
1123            "fresh hint after re-entry should become eligible after a later epoch"
1124        );
1125    }
1126
1127    #[test]
1128    fn repair_proofs_remove_key_clears_all_peer_entries() {
1129        const HINT_EPOCH: u64 = 7;
1130        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
1131
1132        let key = [0xA4; 32];
1133        let peer = peer_id_from_byte(1);
1134        let close_peers = HashSet::from([peer]);
1135        let mut proofs = RepairProofs::new();
1136        let (hinted_at, now) = mature_hint_times();
1137
1138        assert!(proofs.record_replica_hint_sent_at(peer, key, &close_peers, HINT_EPOCH, hinted_at,));
1139        proofs.remove_key(&key);
1140
1141        assert!(
1142            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH, now),
1143            "deleted local key should not retain repair proof entries"
1144        );
1145    }
1146
1147    // -- NeighborSyncState -------------------------------------------------
1148
1149    #[test]
1150    fn neighbor_sync_empty_cycle_is_immediately_complete() {
1151        let state = NeighborSyncState::new_cycle(vec![]);
1152        assert!(
1153            state.is_cycle_complete(),
1154            "empty neighbor list means cycle is complete"
1155        );
1156    }
1157
1158    #[test]
1159    fn neighbor_sync_new_cycle_not_complete() {
1160        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1161        let state = NeighborSyncState::new_cycle(peers);
1162        assert!(
1163            !state.is_cycle_complete(),
1164            "fresh cycle with peers should not be complete"
1165        );
1166    }
1167
1168    #[test]
1169    fn neighbor_sync_cycle_completes_when_cursor_reaches_end() {
1170        let peers = vec![
1171            peer_id_from_byte(1),
1172            peer_id_from_byte(2),
1173            peer_id_from_byte(3),
1174        ];
1175        let mut state = NeighborSyncState::new_cycle(peers);
1176
1177        // Simulate stepping through the cycle.
1178        state.cursor = 2;
1179        assert!(
1180            !state.is_cycle_complete(),
1181            "cursor at len-1 should not be complete"
1182        );
1183
1184        state.cursor = 3;
1185        assert!(
1186            state.is_cycle_complete(),
1187            "cursor at len should be complete"
1188        );
1189    }
1190
1191    #[test]
1192    fn neighbor_sync_cursor_past_end_is_still_complete() {
1193        let peers = vec![peer_id_from_byte(1)];
1194        let mut state = NeighborSyncState::new_cycle(peers);
1195        state.cursor = 5;
1196        assert!(
1197            state.is_cycle_complete(),
1198            "cursor past end should still report complete"
1199        );
1200    }
1201
1202    #[test]
1203    fn neighbor_sync_priority_queue_blocks_cycle_completion() {
1204        let peer = peer_id_from_byte(2);
1205        let mut state = NeighborSyncState::new_cycle(Vec::new());
1206
1207        assert!(state.is_cycle_complete());
1208        assert_eq!(state.queue_priority_peers([peer]), 1);
1209        assert!(
1210            !state.is_cycle_complete(),
1211            "pending priority peers must sync before the cycle completes"
1212        );
1213    }
1214
1215    #[test]
1216    fn neighbor_sync_priority_queue_deduplicates_peers() {
1217        let peer = peer_id_from_byte(3);
1218        let mut state = NeighborSyncState::new_cycle(Vec::new());
1219
1220        assert_eq!(state.queue_priority_peers([peer, peer]), 1);
1221        assert_eq!(state.priority_order.len(), 1);
1222    }
1223
1224    #[test]
1225    fn neighbor_sync_remove_peer_clears_order_and_priority_queue() {
1226        let peer = peer_id_from_byte(4);
1227        let retained = peer_id_from_byte(5);
1228        let mut state = NeighborSyncState::new_cycle(vec![peer, retained]);
1229        assert_eq!(state.queue_priority_peers([peer]), 1);
1230        state.cursor = 1;
1231
1232        assert!(state.remove_peer(&peer));
1233
1234        assert!(!state.order.contains(&peer));
1235        assert!(!state.priority_order.contains(&peer));
1236        assert_eq!(state.order, vec![retained]);
1237        assert_eq!(state.cursor, 0);
1238    }
1239
1240    #[test]
1241    fn neighbor_sync_retain_sync_peers_prunes_only_departed_peers() {
1242        let already_scanned = peer_id_from_byte(1);
1243        let stable_scanned = peer_id_from_byte(2);
1244        let departed_priority = peer_id_from_byte(3);
1245        let stable_unscanned = peer_id_from_byte(4);
1246        let stable_priority = peer_id_from_byte(5);
1247        let mut state = NeighborSyncState::new_cycle(vec![
1248            already_scanned,
1249            stable_scanned,
1250            departed_priority,
1251            stable_unscanned,
1252        ]);
1253        assert_eq!(
1254            state.queue_priority_peers([departed_priority, stable_priority]),
1255            2
1256        );
1257        state.cursor = 2;
1258        let close_peers = HashSet::from([stable_scanned, stable_unscanned, stable_priority]);
1259
1260        let removed = state.retain_sync_peers(&close_peers);
1261
1262        assert_eq!(removed, 3);
1263        assert_eq!(state.order, vec![stable_scanned, stable_unscanned]);
1264        assert_eq!(
1265            state.priority_order.iter().copied().collect::<Vec<_>>(),
1266            vec![stable_priority]
1267        );
1268        assert_eq!(
1269            state.cursor, 1,
1270            "stable peer scanned before churn must not be selected again"
1271        );
1272    }
1273
1274    #[test]
1275    fn bootstrap_claim_history_prevents_second_grace_window() {
1276        let peer = peer_id_from_byte(9);
1277        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1278        let first_seen = Instant::now();
1279        let grace = Duration::from_secs(60);
1280
1281        assert_eq!(
1282            state.observe_bootstrap_claim(peer, first_seen, grace),
1283            BootstrapClaimObservation::WithinGrace { first_seen }
1284        );
1285        assert!(state.clear_active_bootstrap_claim(&peer));
1286        assert!(!state.bootstrap_claims.contains_key(&peer));
1287        assert!(state.bootstrap_claim_history.contains_key(&peer));
1288
1289        assert_eq!(
1290            state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(1), grace),
1291            BootstrapClaimObservation::Repeated { first_seen }
1292        );
1293        assert!(
1294            !state.bootstrap_claims.contains_key(&peer),
1295            "repeated claims must not recreate an active grace window"
1296        );
1297        assert_eq!(
1298            state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(2), grace),
1299            BootstrapClaimObservation::Repeated { first_seen }
1300        );
1301    }
1302
1303    #[test]
1304    fn bootstrap_claim_active_window_reports_past_grace() {
1305        let peer = peer_id_from_byte(10);
1306        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1307        let first_seen = Instant::now();
1308        let grace = Duration::from_secs(60);
1309
1310        let _ = state.observe_bootstrap_claim(peer, first_seen, grace);
1311
1312        assert_eq!(
1313            state.observe_bootstrap_claim(peer, first_seen + grace + Duration::from_secs(1), grace),
1314            BootstrapClaimObservation::PastGrace { first_seen }
1315        );
1316    }
1317
1318    // -- BootstrapState ----------------------------------------------------
1319
1320    #[test]
1321    fn bootstrap_state_initial_not_drained() {
1322        // A freshly created state must NOT report drained — the bootstrap
1323        // sync task has not started yet (Invariant 19 race prevention).
1324        let state = BootstrapState::new();
1325        assert!(
1326            !state.is_drained(),
1327            "initial state must not be drained before bootstrap begins"
1328        );
1329    }
1330
1331    #[test]
1332    fn bootstrap_state_pending_requests_block_drain() {
1333        let mut state = BootstrapState::new();
1334        state.pending_peer_requests = 3;
1335        assert!(
1336            !state.is_drained(),
1337            "pending peer requests should block drain"
1338        );
1339    }
1340
1341    #[test]
1342    fn bootstrap_state_pending_keys_block_drain() {
1343        let mut state = BootstrapState::new();
1344        state.pending_keys.insert([42u8; 32]);
1345        assert!(!state.is_drained(), "pending keys should block drain");
1346    }
1347
1348    #[test]
1349    fn bootstrap_state_explicit_drained_overrides() {
1350        let mut state = BootstrapState::new();
1351        state.pending_peer_requests = 5;
1352        state.pending_keys.insert([99u8; 32]);
1353        state.drained = true;
1354        assert!(
1355            state.is_drained(),
1356            "explicit drained flag should override pending counts"
1357        );
1358    }
1359
1360    #[test]
1361    fn bootstrap_state_requires_explicit_drain() {
1362        let mut state = BootstrapState::new();
1363        state.pending_peer_requests = 2;
1364        state.pending_keys.insert([1u8; 32]);
1365
1366        // Simulate completing work — but without explicit drain flag.
1367        state.pending_peer_requests = 0;
1368        state.pending_keys.clear();
1369
1370        assert!(
1371            !state.is_drained(),
1372            "clearing counters alone must not drain — requires check_bootstrap_drained"
1373        );
1374
1375        // Explicit drain (set by check_bootstrap_drained or mark_bootstrap_drained).
1376        state.drained = true;
1377        assert!(state.is_drained(), "explicit flag should drain");
1378    }
1379
1380    #[test]
1381    fn bootstrap_state_default_matches_new() {
1382        let from_new = BootstrapState::new();
1383        let from_default = BootstrapState::default();
1384
1385        assert_eq!(from_new.drained, from_default.drained);
1386        assert_eq!(
1387            from_new.pending_peer_requests,
1388            from_default.pending_peer_requests
1389        );
1390        assert_eq!(from_new.pending_keys, from_default.pending_keys);
1391    }
1392
1393    // -- Scenario tests -------------------------------------------------------
1394
1395    /// #13: Bootstrap not drained while `pending_keys` overlap with the
1396    /// pipeline. Keys must be removed from `pending_keys` for drain to occur.
1397    #[test]
1398    fn bootstrap_drain_requires_empty_pending_keys() {
1399        let key_a: XorName = [0xA0; 32];
1400        let key_b: XorName = [0xB0; 32];
1401        let key_c: XorName = [0xC0; 32];
1402
1403        let mut state = BootstrapState::new();
1404        state.pending_peer_requests = 0; // requests already done
1405        state.pending_keys = std::iter::once(key_a)
1406            .chain(std::iter::once(key_b))
1407            .chain(std::iter::once(key_c))
1408            .collect();
1409
1410        assert!(
1411            !state.is_drained(),
1412            "should NOT be drained while pending_keys still has entries"
1413        );
1414
1415        // Simulate pipeline processing — remove one key at a time.
1416        state.pending_keys.remove(&key_a);
1417        assert!(!state.is_drained(), "still not drained with 2 pending keys");
1418
1419        state.pending_keys.remove(&key_b);
1420        assert!(!state.is_drained(), "still not drained with 1 pending key");
1421
1422        state.pending_keys.remove(&key_c);
1423        assert!(
1424            !state.is_drained(),
1425            "removing all keys is necessary but not sufficient — needs explicit drain"
1426        );
1427
1428        // Simulate check_bootstrap_drained setting the flag.
1429        state.drained = true;
1430        assert!(state.is_drained(), "explicit drain flag should finalize");
1431    }
1432
1433    /// Verify that the FSM terminal states are distinguishable and document
1434    /// which variants are logically terminal (no outgoing transitions).
1435    #[test]
1436    fn verification_state_terminal_variants() {
1437        let terminal_states = [
1438            VerificationState::QuorumAbandoned,
1439            VerificationState::FetchAbandoned,
1440            VerificationState::Stored,
1441            VerificationState::Idle,
1442        ];
1443
1444        // All terminal states must be distinct from each other.
1445        for (i, a) in terminal_states.iter().enumerate() {
1446            for (j, b) in terminal_states.iter().enumerate() {
1447                if i != j {
1448                    assert_ne!(
1449                        a, b,
1450                        "terminal states at indices {i} and {j} must be distinct"
1451                    );
1452                }
1453            }
1454        }
1455
1456        // Terminal states must be distinct from all non-terminal states.
1457        let non_terminal_states = [
1458            VerificationState::OfferReceived,
1459            VerificationState::PendingVerify,
1460            VerificationState::QuorumVerified,
1461            VerificationState::PaidListVerified,
1462            VerificationState::QueuedForFetch,
1463            VerificationState::Fetching,
1464            VerificationState::FetchRetryable,
1465            VerificationState::QuorumFailed,
1466            VerificationState::QuorumInconclusive,
1467        ];
1468
1469        for terminal in &terminal_states {
1470            for non_terminal in &non_terminal_states {
1471                assert_ne!(
1472                    terminal, non_terminal,
1473                    "terminal state {terminal:?} must not equal non-terminal state {non_terminal:?}"
1474                );
1475            }
1476        }
1477    }
1478
1479    /// `has_repair_opportunity` requires BOTH a previous sync AND at least
1480    /// one subsequent cycle.
1481    #[test]
1482    fn repair_opportunity_requires_both_sync_and_cycle() {
1483        // last_sync = Some, cycles_since_sync = 0 → false (synced but no cycle yet)
1484        let synced_no_cycle = PeerSyncRecord {
1485            last_sync: Some(
1486                Instant::now()
1487                    .checked_sub(std::time::Duration::from_secs(2))
1488                    .unwrap_or_else(Instant::now),
1489            ),
1490            cycles_since_sync: 0,
1491        };
1492        assert!(
1493            !synced_no_cycle.has_repair_opportunity(),
1494            "synced with zero subsequent cycles should NOT have repair opportunity"
1495        );
1496
1497        // last_sync = None, cycles_since_sync = 5 → false (never synced)
1498        let never_synced = PeerSyncRecord {
1499            last_sync: None,
1500            cycles_since_sync: 5,
1501        };
1502        assert!(
1503            !never_synced.has_repair_opportunity(),
1504            "never-synced peer should NOT have repair opportunity regardless of cycles"
1505        );
1506
1507        // last_sync = Some, cycles_since_sync = 1 → true
1508        let ready = PeerSyncRecord {
1509            last_sync: Some(
1510                Instant::now()
1511                    .checked_sub(std::time::Duration::from_secs(5))
1512                    .unwrap_or_else(Instant::now),
1513            ),
1514            cycles_since_sync: 1,
1515        };
1516        assert!(
1517            ready.has_repair_opportunity(),
1518            "synced peer with >= 1 cycle SHOULD have repair opportunity"
1519        );
1520    }
1521}