Skip to main content

ant_node/replication/
types.rs

1//! Core types for the replication subsystem.
2//!
3//! These types represent the state machine states, queue entries, and domain
4//! concepts from the Kademlia-style replication design (see
5//! `docs/REPLICATION_DESIGN.md`).
6
7use std::cmp::Ordering;
8use std::collections::{HashMap, HashSet};
9use std::time::{Duration, Instant};
10
11use serde::{Deserialize, Serialize};
12
13use crate::ant_protocol::XorName;
14use saorsa_core::identity::PeerId;
15
16// ---------------------------------------------------------------------------
17// Verification state machine (Section 8 of REPLICATION_DESIGN.md)
18// ---------------------------------------------------------------------------
19
20/// Verification state machine.
21///
22/// Each unknown key transitions through these states exactly once per offer
23/// lifecycle.  See Section 8 of `REPLICATION_DESIGN.md` for the full
24/// state-transition diagram.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum VerificationState {
27    /// Offer received, not yet processed.
28    OfferReceived,
29    /// Passed admission filter, awaiting quorum / paid-list verification.
30    PendingVerify,
31    /// Presence quorum passed (>= `QuorumNeeded` positives from
32    /// `QuorumTargets`).
33    QuorumVerified,
34    /// Paid-list authorisation succeeded (>= `ConfirmNeeded` confirmations or
35    /// derived from replica majority).
36    PaidListVerified,
37    /// Queued for record fetch.
38    QueuedForFetch,
39    /// Actively fetching from a verified source.
40    Fetching,
41    /// Successfully stored locally.
42    Stored,
43    /// Fetch failed but retryable (alternate sources remain).
44    FetchRetryable,
45    /// Fetch permanently abandoned (terminal failure or no alternate sources).
46    FetchAbandoned,
47    /// Quorum failed definitively (both paid-list and presence impossible this
48    /// round).
49    QuorumFailed,
50    /// Quorum inconclusive (timeout with neither success nor fail-fast).
51    QuorumInconclusive,
52    /// Terminal: quorum abandoned, key forgotten.
53    QuorumAbandoned,
54    /// Terminal: key returned to idle (forgotten, requires new offer to
55    /// re-enter).
56    Idle,
57}
58
59// ---------------------------------------------------------------------------
60// Hint pipeline classification
61// ---------------------------------------------------------------------------
62
63/// Whether a key was admitted via replica hints or paid hints only.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65pub enum HintPipeline {
66    /// Key is in the admitted replica-hint pipeline (fetch-eligible).
67    Replica,
68    /// Key is in the paid-hint-only pipeline (`PaidForList` update only, no
69    /// fetch).
70    PaidOnly,
71}
72
73// ---------------------------------------------------------------------------
74// Pending-verification table entry
75// ---------------------------------------------------------------------------
76
77/// Entry in the pending-verification table.
78///
79/// Tracks a single key through the verification FSM, recording which peers
80/// responded and which have been tried for fetch.
81#[derive(Debug, Clone)]
82pub struct VerificationEntry {
83    /// Current state in the verification FSM.
84    pub state: VerificationState,
85    /// Which pipeline admitted this key.
86    pub pipeline: HintPipeline,
87    /// Peers that responded `Present` during verification (verified fetch
88    /// sources).
89    pub verified_sources: Vec<PeerId>,
90    /// Peers already tried for fetch (to avoid retrying the same source).
91    pub tried_sources: HashSet<PeerId>,
92    /// When this entry was created.
93    pub created_at: Instant,
94    /// The peer that originally hinted this key (for source tracking).
95    pub hint_sender: PeerId,
96}
97
98// ---------------------------------------------------------------------------
99// Fetch queue candidate
100// ---------------------------------------------------------------------------
101
102/// A candidate queued for fetch, ordered by relevance (nearest-first).
103///
104/// Implements [`Ord`] with *reversed* distance comparison so that a
105/// [`BinaryHeap`](std::collections::BinaryHeap) (max-heap) dequeues the
106/// nearest key first.
107#[derive(Debug, Clone)]
108pub struct FetchCandidate {
109    /// The key to fetch.
110    pub key: XorName,
111    /// XOR distance from self to key (for priority ordering).
112    pub distance: XorName,
113    /// Verified source peers that responded `Present`.
114    pub sources: Vec<PeerId>,
115}
116
117impl Eq for FetchCandidate {}
118
119impl PartialEq for FetchCandidate {
120    fn eq(&self, other: &Self) -> bool {
121        self.distance == other.distance && self.key == other.key
122    }
123}
124
125impl Ord for FetchCandidate {
126    fn cmp(&self, other: &Self) -> Ordering {
127        // Reverse ordering: smaller distance = higher priority (BinaryHeap is
128        // max-heap).  Tie-break on key for consistency with PartialEq.
129        other
130            .distance
131            .cmp(&self.distance)
132            .then_with(|| self.key.cmp(&other.key))
133    }
134}
135
136impl PartialOrd for FetchCandidate {
137    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
138        Some(self.cmp(other))
139    }
140}
141
142// ---------------------------------------------------------------------------
143// Verification evidence types
144// ---------------------------------------------------------------------------
145
146/// Per-key presence evidence from a verification round.
147#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
148pub enum PresenceEvidence {
149    /// Peer holds the record.
150    Present,
151    /// Peer does not hold the record.
152    Absent,
153    /// Peer did not respond in time (neutral, not negative).
154    Unresolved,
155}
156
157/// Per-key paid-list evidence from a verification round.
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159pub enum PaidListEvidence {
160    /// Peer confirms key is in its `PaidForList`.
161    Confirmed,
162    /// Peer says key is NOT in its `PaidForList`.
163    NotFound,
164    /// Peer did not respond in time (neutral).
165    Unresolved,
166}
167
168/// Aggregated verification evidence for a single key from one verification
169/// round.
170#[derive(Debug, Clone)]
171pub struct KeyVerificationEvidence {
172    /// Presence evidence per peer (from `QuorumTargets`).
173    pub presence: HashMap<PeerId, PresenceEvidence>,
174    /// Paid-list evidence per peer (from `PaidTargets`).
175    pub paid_list: HashMap<PeerId, PaidListEvidence>,
176}
177
178// ---------------------------------------------------------------------------
179// Failure evidence (Section 14 — TrustEngine integration)
180// ---------------------------------------------------------------------------
181
182/// Counts that describe a confirmed audit failure without logging per-key
183/// detail at `ERROR` level.
184#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
185pub struct AuditFailureSummary {
186    /// Number of keys in the original audit challenge.
187    pub challenged_keys: usize,
188    /// Number of keys that still failed after responsibility confirmation.
189    pub failed_keys: usize,
190    /// Confirmed failures where the responder returned the absent-key sentinel.
191    pub absent_keys: usize,
192    /// Confirmed failures where the responder returned a digest that did not
193    /// match the challenger's local bytes.
194    pub digest_mismatch_keys: usize,
195}
196
197/// Failure evidence types emitted to `TrustEngine` (Section 14).
198#[derive(Debug, Clone)]
199pub enum FailureEvidence {
200    /// Failed fetch attempt from a source peer.
201    ReplicationFailure {
202        /// The peer that failed to serve the record.
203        peer: PeerId,
204        /// The key that could not be fetched.
205        key: XorName,
206    },
207    /// Audit failure with confirmed responsible keys.
208    AuditFailure {
209        /// Unique identifier for the audit challenge.
210        challenge_id: u64,
211        /// The peer that was challenged.
212        challenged_peer: PeerId,
213        /// Keys confirmed as failed.
214        confirmed_failed_keys: Vec<XorName>,
215        /// Aggregated reason counts for the confirmed failures.
216        summary: AuditFailureSummary,
217        /// Why the audit failed.
218        reason: AuditFailureReason,
219    },
220    /// Peer claiming bootstrap past grace period.
221    BootstrapClaimAbuse {
222        /// The offending peer.
223        peer: PeerId,
224        /// When this peer was first seen.
225        first_seen: Instant,
226    },
227}
228
229/// Reason for audit failure.
230#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
231pub enum AuditFailureReason {
232    /// Peer timed out (no response within deadline).
233    Timeout,
234    /// Response was malformed.
235    MalformedResponse,
236    /// One or more per-key digest mismatches.
237    DigestMismatch,
238    /// Key was absent (signalled by sentinel digest).
239    KeyAbsent,
240    /// Peer explicitly rejected the audit challenge.
241    Rejected,
242}
243
244// ---------------------------------------------------------------------------
245// Peer sync tracking
246// ---------------------------------------------------------------------------
247
248/// Record of sync history with a peer, for `RepairOpportunity` tracking.
249#[derive(Debug, Clone)]
250pub struct PeerSyncRecord {
251    /// Last time we successfully synced with this peer.
252    pub last_sync: Option<Instant>,
253    /// Number of full neighbor-sync cycles completed since last sync with this
254    /// peer.
255    pub cycles_since_sync: u32,
256}
257
258impl PeerSyncRecord {
259    /// Whether this peer has had a repair opportunity (synced at least once
260    /// and at least one subsequent cycle has completed).
261    #[must_use]
262    pub fn has_repair_opportunity(&self) -> bool {
263        self.last_sync.is_some() && self.cycles_since_sync >= 1
264    }
265}
266
267// ---------------------------------------------------------------------------
268// Repair proof tracking
269// ---------------------------------------------------------------------------
270
271/// Evidence that this node has sent a replica repair hint for a key to a peer.
272#[derive(Debug, Clone)]
273struct RepairProof {
274    /// Local neighbor-sync cycle epoch when the hint was sent.
275    hinted_at_epoch: u64,
276}
277
278/// Repair proofs for one key, tied to the close-group snapshot they were
279/// recorded against.
280#[derive(Debug, Clone)]
281struct RepairProofEntry {
282    /// Self-inclusive close group observed when these proofs were recorded.
283    close_peers: HashSet<PeerId>,
284    /// Per-peer proof metadata for peers in `close_peers`.
285    peer_proofs: HashMap<PeerId, RepairProof>,
286}
287
288impl RepairProofEntry {
289    fn new(close_peers: HashSet<PeerId>) -> Self {
290        Self {
291            close_peers,
292            peer_proofs: HashMap::new(),
293        }
294    }
295}
296
297/// Evidence that this node has sent replica repair hints for local keys.
298///
299/// The map is keyed by record key so each key retains only one close-group
300/// snapshot and at most that snapshot's peers. This bounds memory by local key
301/// count times the replication close-group size rather than by churn history.
302#[derive(Debug, Clone, Default)]
303pub struct RepairProofs {
304    /// Key-scoped repair proofs.
305    proofs_by_key: HashMap<XorName, RepairProofEntry>,
306}
307
308impl RepairProofs {
309    /// Create an empty repair-proof table.
310    #[must_use]
311    pub fn new() -> Self {
312        Self::default()
313    }
314
315    /// Record that `peer` was sent a replica repair hint for `key`.
316    ///
317    /// `current_close_peers` must be the current self-inclusive close group for
318    /// `key`. If that close group differs from the previous proof snapshot,
319    /// proofs for peers that left the close group are invalidated before
320    /// recording. Stable peers keep their proofs, while a peer that leaves and
321    /// later re-enters still needs a fresh hint.
322    pub fn record_replica_hint_sent(
323        &mut self,
324        peer: PeerId,
325        key: XorName,
326        current_close_peers: &HashSet<PeerId>,
327        hinted_at_epoch: u64,
328    ) -> bool {
329        self.reconcile_key_close_group(&key, current_close_peers);
330
331        if !current_close_peers.contains(&peer) {
332            return false;
333        }
334
335        let entry = self
336            .proofs_by_key
337            .entry(key)
338            .or_insert_with(|| RepairProofEntry::new(current_close_peers.clone()));
339
340        if entry.peer_proofs.contains_key(&peer) {
341            return false;
342        }
343
344        entry
345            .peer_proofs
346            .insert(peer, RepairProof { hinted_at_epoch });
347        true
348    }
349
350    /// Whether this node has mature repair-hint evidence for `(peer, key)`.
351    ///
352    /// The check invalidates proofs for peers that have left the current
353    /// self-inclusive close group. A proof is mature only after at least one
354    /// later local sync-cycle epoch.
355    pub fn has_mature_replica_hint(
356        &mut self,
357        peer: &PeerId,
358        key: &XorName,
359        current_close_peers: &HashSet<PeerId>,
360        current_epoch: u64,
361    ) -> bool {
362        self.reconcile_key_close_group(key, current_close_peers);
363
364        self.proofs_by_key
365            .get(key)
366            .and_then(|entry| entry.peer_proofs.get(peer))
367            .is_some_and(|proof| proof.hinted_at_epoch < current_epoch)
368    }
369
370    /// Remove all repair proofs for a key, e.g. after local deletion.
371    pub fn remove_key(&mut self, key: &XorName) {
372        self.proofs_by_key.remove(key);
373    }
374
375    /// Remove all repair proofs for a peer, e.g. after routing-table removal.
376    pub fn remove_peer(&mut self, peer: &PeerId) {
377        self.proofs_by_key.retain(|_, entry| {
378            entry.peer_proofs.remove(peer);
379            !entry.peer_proofs.is_empty()
380        });
381    }
382
383    fn reconcile_key_close_group(&mut self, key: &XorName, current_close_peers: &HashSet<PeerId>) {
384        let should_remove = if let Some(entry) = self.proofs_by_key.get_mut(key) {
385            if entry.close_peers == *current_close_peers {
386                return;
387            }
388
389            entry.close_peers.clone_from(current_close_peers);
390            entry
391                .peer_proofs
392                .retain(|peer, _| current_close_peers.contains(peer));
393            entry.peer_proofs.is_empty()
394        } else {
395            false
396        };
397
398        if should_remove {
399            self.proofs_by_key.remove(key);
400        }
401    }
402}
403
404// ---------------------------------------------------------------------------
405// Neighbor sync cycle state
406// ---------------------------------------------------------------------------
407
408/// Result of observing a peer's bootstrap claim.
409#[derive(Debug, Clone, Copy, PartialEq, Eq)]
410pub enum BootstrapClaimObservation {
411    /// The peer is inside its first and only bootstrap-claim grace window.
412    WithinGrace {
413        /// First time this peer claimed bootstrap status.
414        first_seen: Instant,
415    },
416    /// The peer has continuously claimed bootstrap status past the grace period.
417    PastGrace {
418        /// First time this peer claimed bootstrap status.
419        first_seen: Instant,
420    },
421    /// The peer previously stopped claiming bootstrap and then claimed it again.
422    Repeated {
423        /// First time this peer ever claimed bootstrap status.
424        first_seen: Instant,
425    },
426}
427
428/// Neighbor sync cycle state.
429///
430/// Tracks a deterministic walk through the current close-group snapshot,
431/// per-peer cooldown times, active bootstrap claims, and peers that have already
432/// used their one bootstrap-claim window.
433#[derive(Debug)]
434pub struct NeighborSyncState {
435    /// Deterministic ordering of peers for the current cycle (snapshot).
436    pub order: Vec<PeerId>,
437    /// Current cursor position into `order`.
438    pub cursor: usize,
439    /// Per-peer last successful sync time (for cooldown).
440    pub last_sync_times: HashMap<PeerId, Instant>,
441    /// Active bootstrap claim first-seen timestamps per peer.
442    ///
443    /// Entries are removed when a peer stops claiming bootstrap. The peer
444    /// remains in `bootstrap_claim_history`, so a later claim is repeated-claim
445    /// abuse instead of a fresh grace period.
446    pub bootstrap_claims: HashMap<PeerId, Instant>,
447    /// First-ever bootstrap claim timestamp per peer.
448    ///
449    /// This is retained after active claims are cleared so each peer gets at
450    /// most one bootstrap-claim grace window. Under Sybil attack with many
451    /// distinct peer IDs claiming bootstrap, this map grows unboundedly. In
452    /// practice the trust engine limits Sybil impact before this becomes a
453    /// memory issue.
454    pub bootstrap_claim_history: HashMap<PeerId, Instant>,
455    /// Cursor used by post-cycle pruning to rotate through stored records when
456    /// the per-pass prune-confirmation budget is exhausted.
457    pub prune_cursor: usize,
458}
459
460impl NeighborSyncState {
461    /// Create a new cycle from the given close neighbors.
462    #[must_use]
463    pub fn new_cycle(close_neighbors: Vec<PeerId>) -> Self {
464        Self {
465            order: close_neighbors,
466            cursor: 0,
467            last_sync_times: HashMap::new(),
468            bootstrap_claims: HashMap::new(),
469            bootstrap_claim_history: HashMap::new(),
470            prune_cursor: 0,
471        }
472    }
473
474    /// Observe a peer claiming bootstrap status.
475    ///
476    /// A peer receives one grace window from its first observed bootstrap claim.
477    /// If it later stops claiming bootstrap, callers should clear only the
478    /// active claim with [`Self::clear_active_bootstrap_claim`]. A subsequent
479    /// claim is then reported as [`BootstrapClaimObservation::Repeated`].
480    #[must_use]
481    pub fn observe_bootstrap_claim(
482        &mut self,
483        peer: PeerId,
484        now: Instant,
485        grace_period: Duration,
486    ) -> BootstrapClaimObservation {
487        if let Some(first_seen) = self.bootstrap_claims.get(&peer).copied() {
488            if now.duration_since(first_seen) > grace_period {
489                BootstrapClaimObservation::PastGrace { first_seen }
490            } else {
491                BootstrapClaimObservation::WithinGrace { first_seen }
492            }
493        } else if let Some(first_seen) = self.bootstrap_claim_history.get(&peer).copied() {
494            BootstrapClaimObservation::Repeated { first_seen }
495        } else {
496            self.bootstrap_claims.insert(peer, now);
497            self.bootstrap_claim_history.insert(peer, now);
498            BootstrapClaimObservation::WithinGrace { first_seen: now }
499        }
500    }
501
502    /// Clear the active bootstrap claim for a peer, retaining claim history.
503    pub fn clear_active_bootstrap_claim(&mut self, peer: &PeerId) -> bool {
504        self.bootstrap_claims.remove(peer).is_some()
505    }
506
507    /// Whether the current cycle is complete.
508    #[must_use]
509    pub fn is_cycle_complete(&self) -> bool {
510        self.cursor >= self.order.len()
511    }
512}
513
514// ---------------------------------------------------------------------------
515// Bootstrap drain state (Section 16)
516// ---------------------------------------------------------------------------
517
518/// Bootstrap drain state tracking (Section 16).
519#[derive(Debug)]
520pub struct BootstrapState {
521    /// Whether bootstrap is complete (all peer requests done, queues empty).
522    pub drained: bool,
523    /// Number of bootstrap peer requests still pending.
524    pub pending_peer_requests: usize,
525    /// Keys discovered during bootstrap that are still in the verification /
526    /// fetch pipeline.
527    pub pending_keys: HashSet<XorName>,
528    /// Peers whose last bootstrap admission cycle had one or more hints
529    /// silently dropped at the `pending_verify` capacity bounds. Each entry
530    /// represents "this source still owes us at least one re-hinted key
531    /// after the queues drain". `check_bootstrap_drained` refuses to claim
532    /// the node fully drained while this set is non-empty: a source's
533    /// presence is cleared by its next admission cycle that completes with
534    /// zero capacity rejections (i.e. the source successfully re-delivered
535    /// everything that previously overflowed). Tracking per-source instead
536    /// of a global counter prevents one peer's rejection from being
537    /// "cleared" by an unrelated peer's clean cycle.
538    pub capacity_rejected_sources: HashSet<PeerId>,
539}
540
541impl BootstrapState {
542    /// Create initial bootstrap state.
543    #[must_use]
544    pub fn new() -> Self {
545        Self {
546            drained: false,
547            pending_peer_requests: 0,
548            pending_keys: HashSet::new(),
549            capacity_rejected_sources: HashSet::new(),
550        }
551    }
552
553    /// Check if bootstrap is drained.
554    ///
555    /// Only returns `true` after [`super::bootstrap::check_bootstrap_drained`] or
556    /// [`super::bootstrap::mark_bootstrap_drained`] has explicitly set the flag. A fresh
557    /// `BootstrapState` is NOT drained — the audit loop must wait until
558    /// bootstrap work has actually completed (Invariant 19).
559    #[must_use]
560    pub fn is_drained(&self) -> bool {
561        self.drained
562    }
563
564    /// Remove a key from the bootstrap pending set.
565    ///
566    /// Called when a key terminally leaves the verification/fetch pipeline
567    /// (stored, abandoned, quorum failed, etc.) so the drain check set
568    /// shrinks incrementally rather than being re-scanned in full.
569    pub fn remove_key(&mut self, key: &XorName) {
570        self.pending_keys.remove(key);
571    }
572}
573
574impl Default for BootstrapState {
575    fn default() -> Self {
576        Self::new()
577    }
578}
579
580// ---------------------------------------------------------------------------
581// Tests
582// ---------------------------------------------------------------------------
583
584#[cfg(test)]
585mod tests {
586    use std::collections::BinaryHeap;
587
588    use super::*;
589
590    /// Helper: build a `PeerId` from a single byte (zero-padded to 32 bytes).
591    fn peer_id_from_byte(b: u8) -> PeerId {
592        let mut bytes = [0u8; 32];
593        bytes[0] = b;
594        PeerId::from_bytes(bytes)
595    }
596
597    // -- FetchCandidate ordering -------------------------------------------
598
599    #[test]
600    fn fetch_candidate_nearest_key_has_highest_priority() {
601        let near = FetchCandidate {
602            key: [1u8; 32],
603            distance: [
604                0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
605                0, 0, 0, 0,
606            ],
607            sources: vec![peer_id_from_byte(1)],
608        };
609
610        let far = FetchCandidate {
611            key: [2u8; 32],
612            distance: [
613                0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
614                0, 0, 0, 0, 0,
615            ],
616            sources: vec![peer_id_from_byte(2)],
617        };
618
619        // In a max-heap the "greatest" element pops first.
620        // Our reversed Ord makes smaller-distance candidates greater.
621        assert!(near > far, "nearer candidate should compare greater");
622
623        let mut heap = BinaryHeap::new();
624        heap.push(far.clone());
625        heap.push(near.clone());
626
627        assert_eq!(heap.len(), 2, "heap should contain both candidates");
628
629        let first = heap.pop();
630        assert!(first.is_some(), "first pop should succeed");
631        assert_eq!(
632            first.map(|c| c.key),
633            Some(near.key),
634            "nearest key should pop first"
635        );
636
637        let second = heap.pop();
638        assert!(second.is_some(), "second pop should succeed");
639        assert_eq!(
640            second.map(|c| c.key),
641            Some(far.key),
642            "farthest key should pop second"
643        );
644    }
645
646    #[test]
647    fn fetch_candidate_same_distance_and_key_is_equal() {
648        let a = FetchCandidate {
649            key: [1u8; 32],
650            distance: [5u8; 32],
651            sources: vec![],
652        };
653
654        let b = FetchCandidate {
655            key: [1u8; 32],
656            distance: [5u8; 32],
657            sources: vec![],
658        };
659
660        assert_eq!(
661            a.cmp(&b),
662            Ordering::Equal,
663            "same distance + same key should yield Equal"
664        );
665        assert_eq!(a, b, "PartialEq must agree with Ord");
666    }
667
668    #[test]
669    fn fetch_candidate_same_distance_different_key_is_deterministic() {
670        let a = FetchCandidate {
671            key: [1u8; 32],
672            distance: [5u8; 32],
673            sources: vec![],
674        };
675
676        let b = FetchCandidate {
677            key: [2u8; 32],
678            distance: [5u8; 32],
679            sources: vec![],
680        };
681
682        assert_ne!(
683            a.cmp(&b),
684            Ordering::Equal,
685            "same distance + different key must not be Equal"
686        );
687        assert_ne!(a, b, "PartialEq must agree with Ord");
688    }
689
690    // -- PeerSyncRecord ----------------------------------------------------
691
692    #[test]
693    fn peer_sync_record_no_sync_yet() {
694        let record = PeerSyncRecord {
695            last_sync: None,
696            cycles_since_sync: 0,
697        };
698        assert!(
699            !record.has_repair_opportunity(),
700            "never-synced peer has no repair opportunity"
701        );
702    }
703
704    #[test]
705    fn peer_sync_record_synced_but_no_cycle() {
706        let record = PeerSyncRecord {
707            last_sync: Some(Instant::now()),
708            cycles_since_sync: 0,
709        };
710        assert!(
711            !record.has_repair_opportunity(),
712            "synced peer with zero subsequent cycles has no repair opportunity"
713        );
714    }
715
716    #[test]
717    fn peer_sync_record_synced_with_cycle() {
718        let record = PeerSyncRecord {
719            last_sync: Some(Instant::now()),
720            cycles_since_sync: 1,
721        };
722        assert!(
723            record.has_repair_opportunity(),
724            "synced peer with >= 1 cycle should have repair opportunity"
725        );
726    }
727
728    #[test]
729    fn peer_sync_record_no_sync_many_cycles() {
730        let record = PeerSyncRecord {
731            last_sync: None,
732            cycles_since_sync: 10,
733        };
734        assert!(
735            !record.has_repair_opportunity(),
736            "never-synced peer has no repair opportunity regardless of cycle count"
737        );
738    }
739
740    // -- RepairProofs --------------------------------------------------------
741
742    #[test]
743    fn repair_proofs_record_sent_hint_for_close_peer() {
744        const HINT_EPOCH: u64 = 7;
745        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
746
747        let key = [0xA1; 32];
748        let peer = peer_id_from_byte(1);
749        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
750        let mut proofs = RepairProofs::new();
751
752        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
753
754        assert!(
755            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
756            "sent hint should make key auditable for that peer"
757        );
758    }
759
760    #[test]
761    fn repair_proofs_reject_peer_outside_current_close_group() {
762        const HINT_EPOCH: u64 = 7;
763        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
764
765        let key = [0xA2; 32];
766        let peer = peer_id_from_byte(1);
767        let close_peers = HashSet::from([peer_id_from_byte(2), peer_id_from_byte(3)]);
768        let mut proofs = RepairProofs::new();
769
770        assert!(!proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
771
772        assert!(
773            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
774            "peers outside current close group must not get repair proof"
775        );
776    }
777
778    #[test]
779    fn repair_proofs_require_later_epoch() {
780        const HINT_EPOCH: u64 = 7;
781        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
782
783        let key = [0xA3; 32];
784        let peer = peer_id_from_byte(1);
785        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
786        let mut proofs = RepairProofs::new();
787
788        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
789
790        assert!(
791            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, HINT_EPOCH),
792            "same-cycle proof should not be audit-eligible"
793        );
794        assert!(
795            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
796            "proof should mature after a later local sync-cycle epoch"
797        );
798    }
799
800    #[test]
801    fn repair_proofs_repeated_hint_does_not_reset_maturity() {
802        const HINT_EPOCH: u64 = 7;
803        const REPEATED_HINT_EPOCH: u64 = HINT_EPOCH + 1;
804
805        let key = [0xA5; 32];
806        let peer = peer_id_from_byte(1);
807        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
808        let mut proofs = RepairProofs::new();
809
810        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
811        assert!(
812            !proofs.record_replica_hint_sent(peer, key, &close_peers, REPEATED_HINT_EPOCH),
813            "duplicate hint in the same close group should keep existing proof"
814        );
815        assert!(
816            proofs.has_mature_replica_hint(&peer, &key, &close_peers, REPEATED_HINT_EPOCH),
817            "duplicate hint must not reset an already mature proof"
818        );
819    }
820
821    #[test]
822    fn repair_proofs_retain_stable_peers_on_close_group_change() {
823        const HINT_EPOCH: u64 = 7;
824        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
825
826        let key = [0xA7; 32];
827        let stable_peer = peer_id_from_byte(1);
828        let departing_peer = peer_id_from_byte(2);
829        let retained_peer = peer_id_from_byte(3);
830        let new_peer = peer_id_from_byte(4);
831        let old_group = HashSet::from([stable_peer, departing_peer, retained_peer]);
832        let changed_group = HashSet::from([stable_peer, retained_peer, new_peer]);
833        let mut proofs = RepairProofs::new();
834
835        assert!(proofs.record_replica_hint_sent(stable_peer, key, &old_group, HINT_EPOCH));
836        assert!(proofs.record_replica_hint_sent(departing_peer, key, &old_group, HINT_EPOCH));
837
838        assert!(
839            proofs.has_mature_replica_hint(&stable_peer, &key, &changed_group, CURRENT_EPOCH),
840            "stable peers should keep mature repair proofs across unrelated close-group churn"
841        );
842        assert!(
843            !proofs.has_mature_replica_hint(&departing_peer, &key, &changed_group, CURRENT_EPOCH),
844            "peers that left the close group should lose repair proofs"
845        );
846        assert!(
847            !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, CURRENT_EPOCH),
848            "new close-group peers need their own repair hint before auditing"
849        );
850    }
851
852    #[test]
853    fn repair_proofs_evicted_peer_reentry_requires_fresh_hint() {
854        const FIRST_HINT_EPOCH: u64 = 7;
855        const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
856        const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
857
858        let key = [0xA3; 32];
859        let returning_peer = peer_id_from_byte(1);
860        let new_peer = peer_id_from_byte(4);
861        let old_group = HashSet::from([returning_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
862        let changed_group = HashSet::from([new_peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
863        let mut proofs = RepairProofs::new();
864
865        assert!(proofs.record_replica_hint_sent(returning_peer, key, &old_group, FIRST_HINT_EPOCH,));
866
867        assert!(
868            !proofs.has_mature_replica_hint(&new_peer, &key, &changed_group, SECOND_HINT_EPOCH),
869            "new close-group peer should not inherit another peer's repair proof"
870        );
871        assert!(
872            !proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH),
873            "a peer that re-enters must receive a fresh repair hint"
874        );
875
876        assert!(proofs.record_replica_hint_sent(
877            returning_peer,
878            key,
879            &old_group,
880            SECOND_HINT_EPOCH,
881        ));
882        assert!(
883            proofs.has_mature_replica_hint(&returning_peer, &key, &old_group, CURRENT_EPOCH),
884            "fresh repair hint after re-entry should be eligible once mature"
885        );
886    }
887
888    #[test]
889    fn repair_proofs_remove_peer_requires_fresh_hint_after_reentry() {
890        const FIRST_HINT_EPOCH: u64 = 7;
891        const SECOND_HINT_EPOCH: u64 = FIRST_HINT_EPOCH + 1;
892        const CURRENT_EPOCH: u64 = SECOND_HINT_EPOCH + 1;
893
894        let key = [0xA6; 32];
895        let peer = peer_id_from_byte(1);
896        let close_peers = HashSet::from([peer, peer_id_from_byte(2), peer_id_from_byte(3)]);
897        let mut proofs = RepairProofs::new();
898
899        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, FIRST_HINT_EPOCH));
900        proofs.remove_peer(&peer);
901
902        assert!(
903            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
904            "routing-table removal should clear proof even if peer re-enters same close group"
905        );
906
907        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, SECOND_HINT_EPOCH));
908        assert!(
909            proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
910            "fresh hint after re-entry should become eligible after a later epoch"
911        );
912    }
913
914    #[test]
915    fn repair_proofs_remove_key_clears_all_peer_entries() {
916        const HINT_EPOCH: u64 = 7;
917        const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
918
919        let key = [0xA4; 32];
920        let peer = peer_id_from_byte(1);
921        let close_peers = HashSet::from([peer]);
922        let mut proofs = RepairProofs::new();
923
924        assert!(proofs.record_replica_hint_sent(peer, key, &close_peers, HINT_EPOCH));
925        proofs.remove_key(&key);
926
927        assert!(
928            !proofs.has_mature_replica_hint(&peer, &key, &close_peers, CURRENT_EPOCH),
929            "deleted local key should not retain repair proof entries"
930        );
931    }
932
933    // -- NeighborSyncState -------------------------------------------------
934
935    #[test]
936    fn neighbor_sync_empty_cycle_is_immediately_complete() {
937        let state = NeighborSyncState::new_cycle(vec![]);
938        assert!(
939            state.is_cycle_complete(),
940            "empty neighbor list means cycle is complete"
941        );
942    }
943
944    #[test]
945    fn neighbor_sync_new_cycle_not_complete() {
946        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
947        let state = NeighborSyncState::new_cycle(peers);
948        assert!(
949            !state.is_cycle_complete(),
950            "fresh cycle with peers should not be complete"
951        );
952    }
953
954    #[test]
955    fn neighbor_sync_cycle_completes_when_cursor_reaches_end() {
956        let peers = vec![
957            peer_id_from_byte(1),
958            peer_id_from_byte(2),
959            peer_id_from_byte(3),
960        ];
961        let mut state = NeighborSyncState::new_cycle(peers);
962
963        // Simulate stepping through the cycle.
964        state.cursor = 2;
965        assert!(
966            !state.is_cycle_complete(),
967            "cursor at len-1 should not be complete"
968        );
969
970        state.cursor = 3;
971        assert!(
972            state.is_cycle_complete(),
973            "cursor at len should be complete"
974        );
975    }
976
977    #[test]
978    fn neighbor_sync_cursor_past_end_is_still_complete() {
979        let peers = vec![peer_id_from_byte(1)];
980        let mut state = NeighborSyncState::new_cycle(peers);
981        state.cursor = 5;
982        assert!(
983            state.is_cycle_complete(),
984            "cursor past end should still report complete"
985        );
986    }
987
988    #[test]
989    fn bootstrap_claim_history_prevents_second_grace_window() {
990        let peer = peer_id_from_byte(9);
991        let mut state = NeighborSyncState::new_cycle(vec![peer]);
992        let first_seen = Instant::now();
993        let grace = Duration::from_secs(60);
994
995        assert_eq!(
996            state.observe_bootstrap_claim(peer, first_seen, grace),
997            BootstrapClaimObservation::WithinGrace { first_seen }
998        );
999        assert!(state.clear_active_bootstrap_claim(&peer));
1000        assert!(!state.bootstrap_claims.contains_key(&peer));
1001        assert!(state.bootstrap_claim_history.contains_key(&peer));
1002
1003        assert_eq!(
1004            state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(1), grace),
1005            BootstrapClaimObservation::Repeated { first_seen }
1006        );
1007        assert!(
1008            !state.bootstrap_claims.contains_key(&peer),
1009            "repeated claims must not recreate an active grace window"
1010        );
1011        assert_eq!(
1012            state.observe_bootstrap_claim(peer, first_seen + Duration::from_secs(2), grace),
1013            BootstrapClaimObservation::Repeated { first_seen }
1014        );
1015    }
1016
1017    #[test]
1018    fn bootstrap_claim_active_window_reports_past_grace() {
1019        let peer = peer_id_from_byte(10);
1020        let mut state = NeighborSyncState::new_cycle(vec![peer]);
1021        let first_seen = Instant::now();
1022        let grace = Duration::from_secs(60);
1023
1024        let _ = state.observe_bootstrap_claim(peer, first_seen, grace);
1025
1026        assert_eq!(
1027            state.observe_bootstrap_claim(peer, first_seen + grace + Duration::from_secs(1), grace),
1028            BootstrapClaimObservation::PastGrace { first_seen }
1029        );
1030    }
1031
1032    // -- BootstrapState ----------------------------------------------------
1033
1034    #[test]
1035    fn bootstrap_state_initial_not_drained() {
1036        // A freshly created state must NOT report drained — the bootstrap
1037        // sync task has not started yet (Invariant 19 race prevention).
1038        let state = BootstrapState::new();
1039        assert!(
1040            !state.is_drained(),
1041            "initial state must not be drained before bootstrap begins"
1042        );
1043    }
1044
1045    #[test]
1046    fn bootstrap_state_pending_requests_block_drain() {
1047        let mut state = BootstrapState::new();
1048        state.pending_peer_requests = 3;
1049        assert!(
1050            !state.is_drained(),
1051            "pending peer requests should block drain"
1052        );
1053    }
1054
1055    #[test]
1056    fn bootstrap_state_pending_keys_block_drain() {
1057        let mut state = BootstrapState::new();
1058        state.pending_keys.insert([42u8; 32]);
1059        assert!(!state.is_drained(), "pending keys should block drain");
1060    }
1061
1062    #[test]
1063    fn bootstrap_state_explicit_drained_overrides() {
1064        let mut state = BootstrapState::new();
1065        state.pending_peer_requests = 5;
1066        state.pending_keys.insert([99u8; 32]);
1067        state.drained = true;
1068        assert!(
1069            state.is_drained(),
1070            "explicit drained flag should override pending counts"
1071        );
1072    }
1073
1074    #[test]
1075    fn bootstrap_state_requires_explicit_drain() {
1076        let mut state = BootstrapState::new();
1077        state.pending_peer_requests = 2;
1078        state.pending_keys.insert([1u8; 32]);
1079
1080        // Simulate completing work — but without explicit drain flag.
1081        state.pending_peer_requests = 0;
1082        state.pending_keys.clear();
1083
1084        assert!(
1085            !state.is_drained(),
1086            "clearing counters alone must not drain — requires check_bootstrap_drained"
1087        );
1088
1089        // Explicit drain (set by check_bootstrap_drained or mark_bootstrap_drained).
1090        state.drained = true;
1091        assert!(state.is_drained(), "explicit flag should drain");
1092    }
1093
1094    #[test]
1095    fn bootstrap_state_default_matches_new() {
1096        let from_new = BootstrapState::new();
1097        let from_default = BootstrapState::default();
1098
1099        assert_eq!(from_new.drained, from_default.drained);
1100        assert_eq!(
1101            from_new.pending_peer_requests,
1102            from_default.pending_peer_requests
1103        );
1104        assert_eq!(from_new.pending_keys, from_default.pending_keys);
1105    }
1106
1107    // -- Scenario tests -------------------------------------------------------
1108
1109    /// #13: Bootstrap not drained while `pending_keys` overlap with the
1110    /// pipeline. Keys must be removed from `pending_keys` for drain to occur.
1111    #[test]
1112    fn bootstrap_drain_requires_empty_pending_keys() {
1113        let key_a: XorName = [0xA0; 32];
1114        let key_b: XorName = [0xB0; 32];
1115        let key_c: XorName = [0xC0; 32];
1116
1117        let mut state = BootstrapState::new();
1118        state.pending_peer_requests = 0; // requests already done
1119        state.pending_keys = std::iter::once(key_a)
1120            .chain(std::iter::once(key_b))
1121            .chain(std::iter::once(key_c))
1122            .collect();
1123
1124        assert!(
1125            !state.is_drained(),
1126            "should NOT be drained while pending_keys still has entries"
1127        );
1128
1129        // Simulate pipeline processing — remove one key at a time.
1130        state.pending_keys.remove(&key_a);
1131        assert!(!state.is_drained(), "still not drained with 2 pending keys");
1132
1133        state.pending_keys.remove(&key_b);
1134        assert!(!state.is_drained(), "still not drained with 1 pending key");
1135
1136        state.pending_keys.remove(&key_c);
1137        assert!(
1138            !state.is_drained(),
1139            "removing all keys is necessary but not sufficient — needs explicit drain"
1140        );
1141
1142        // Simulate check_bootstrap_drained setting the flag.
1143        state.drained = true;
1144        assert!(state.is_drained(), "explicit drain flag should finalize");
1145    }
1146
1147    /// Verify that the FSM terminal states are distinguishable and document
1148    /// which variants are logically terminal (no outgoing transitions).
1149    #[test]
1150    fn verification_state_terminal_variants() {
1151        let terminal_states = [
1152            VerificationState::QuorumAbandoned,
1153            VerificationState::FetchAbandoned,
1154            VerificationState::Stored,
1155            VerificationState::Idle,
1156        ];
1157
1158        // All terminal states must be distinct from each other.
1159        for (i, a) in terminal_states.iter().enumerate() {
1160            for (j, b) in terminal_states.iter().enumerate() {
1161                if i != j {
1162                    assert_ne!(
1163                        a, b,
1164                        "terminal states at indices {i} and {j} must be distinct"
1165                    );
1166                }
1167            }
1168        }
1169
1170        // Terminal states must be distinct from all non-terminal states.
1171        let non_terminal_states = [
1172            VerificationState::OfferReceived,
1173            VerificationState::PendingVerify,
1174            VerificationState::QuorumVerified,
1175            VerificationState::PaidListVerified,
1176            VerificationState::QueuedForFetch,
1177            VerificationState::Fetching,
1178            VerificationState::FetchRetryable,
1179            VerificationState::QuorumFailed,
1180            VerificationState::QuorumInconclusive,
1181        ];
1182
1183        for terminal in &terminal_states {
1184            for non_terminal in &non_terminal_states {
1185                assert_ne!(
1186                    terminal, non_terminal,
1187                    "terminal state {terminal:?} must not equal non-terminal state {non_terminal:?}"
1188                );
1189            }
1190        }
1191    }
1192
1193    /// `has_repair_opportunity` requires BOTH a previous sync AND at least
1194    /// one subsequent cycle.
1195    #[test]
1196    fn repair_opportunity_requires_both_sync_and_cycle() {
1197        // last_sync = Some, cycles_since_sync = 0 → false (synced but no cycle yet)
1198        let synced_no_cycle = PeerSyncRecord {
1199            last_sync: Some(
1200                Instant::now()
1201                    .checked_sub(std::time::Duration::from_secs(2))
1202                    .unwrap_or_else(Instant::now),
1203            ),
1204            cycles_since_sync: 0,
1205        };
1206        assert!(
1207            !synced_no_cycle.has_repair_opportunity(),
1208            "synced with zero subsequent cycles should NOT have repair opportunity"
1209        );
1210
1211        // last_sync = None, cycles_since_sync = 5 → false (never synced)
1212        let never_synced = PeerSyncRecord {
1213            last_sync: None,
1214            cycles_since_sync: 5,
1215        };
1216        assert!(
1217            !never_synced.has_repair_opportunity(),
1218            "never-synced peer should NOT have repair opportunity regardless of cycles"
1219        );
1220
1221        // last_sync = Some, cycles_since_sync = 1 → true
1222        let ready = PeerSyncRecord {
1223            last_sync: Some(
1224                Instant::now()
1225                    .checked_sub(std::time::Duration::from_secs(5))
1226                    .unwrap_or_else(Instant::now),
1227            ),
1228            cycles_since_sync: 1,
1229        };
1230        assert!(
1231            ready.has_repair_opportunity(),
1232            "synced peer with >= 1 cycle SHOULD have repair opportunity"
1233        );
1234    }
1235}