Skip to main content

ant_node/replication/
neighbor_sync.rs

1//! Neighbor replication sync (Section 6.2).
2//!
3//! Round-robin cycle management: snapshot close neighbors, iterate through
4//! them in batches of `NEIGHBOR_SYNC_PEER_COUNT`, exchanging hint sets.
5
6use std::collections::HashSet;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14
15use crate::ant_protocol::XorName;
16use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
17use crate::replication::paid_list::PaidList;
18use crate::replication::protocol::{
19    NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
20};
21use crate::replication::types::NeighborSyncState;
22use crate::storage::LmdbStorage;
23
24/// Replica hint sent to a peer, with the close-group snapshot used to decide
25/// that hint.
26#[derive(Debug)]
27pub(crate) struct SentReplicaHint {
28    /// Key included in the replica hint set.
29    pub(crate) key: XorName,
30    /// Self-inclusive close group observed during hint construction.
31    pub(crate) close_peers: HashSet<PeerId>,
32}
33
34/// Result of an outbound neighbor-sync exchange.
35#[derive(Debug)]
36pub(crate) struct NeighborSyncOutcome {
37    /// The peer's sync response.
38    pub(crate) response: NeighborSyncResponse,
39    /// Replica hints sent to the peer in our request.
40    pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
41}
42
43/// Build replica hints for a specific peer.
44///
45/// Returns keys that we believe the peer should hold (peer is among the
46/// `CLOSE_GROUP_SIZE` nearest to `K` in our `SelfInclusiveRT`).
47///
48/// This intentionally scans all locally stored records, not only records for
49/// which this node is still responsible. Out-of-range records retained by
50/// prune hysteresis therefore continue to repair their new close group before
51/// this node is allowed to delete them.
52pub async fn build_replica_hints_for_peer(
53    peer: &PeerId,
54    storage: &Arc<LmdbStorage>,
55    p2p_node: &Arc<P2PNode>,
56    close_group_size: usize,
57) -> Vec<XorName> {
58    build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
59        .await
60        .into_iter()
61        .map(|hint| hint.key)
62        .collect()
63}
64
65pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
66    peer: &PeerId,
67    storage: &Arc<LmdbStorage>,
68    p2p_node: &Arc<P2PNode>,
69    close_group_size: usize,
70) -> Vec<SentReplicaHint> {
71    let all_keys = match storage.all_keys().await {
72        Ok(keys) => keys,
73        Err(e) => {
74            warn!("Failed to read stored keys for hint construction: {e}");
75            return Vec::new();
76        }
77    };
78
79    let dht = p2p_node.dht_manager();
80    let mut hints = Vec::new();
81    for key in all_keys {
82        let closest = dht
83            .find_closest_nodes_local_with_self(&key, close_group_size)
84            .await;
85        let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
86        if close_peers.contains(peer) {
87            hints.push(SentReplicaHint { key, close_peers });
88        }
89    }
90    hints
91}
92
93/// Build paid hints for a specific peer.
94///
95/// Returns keys from our `PaidForList` that we believe the peer should
96/// track (peer is among `PAID_LIST_CLOSE_GROUP_SIZE` nearest to `K`).
97pub async fn build_paid_hints_for_peer(
98    peer: &PeerId,
99    paid_list: &Arc<PaidList>,
100    p2p_node: &Arc<P2PNode>,
101    paid_list_close_group_size: usize,
102) -> Vec<XorName> {
103    let all_paid_keys = match paid_list.all_keys() {
104        Ok(keys) => keys,
105        Err(e) => {
106            warn!("Failed to read PaidForList for hint construction: {e}");
107            return Vec::new();
108        }
109    };
110
111    let dht = p2p_node.dht_manager();
112    let mut hints = Vec::new();
113    for key in all_paid_keys {
114        let closest = dht
115            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
116            .await;
117        if closest.iter().any(|n| n.peer_id == *peer) {
118            hints.push(key);
119        }
120    }
121    hints
122}
123
124/// Take a fresh snapshot of close neighbors for a new round-robin cycle.
125///
126/// Rule 1: Compute `CloseNeighbors(self)` as `NEIGHBOR_SYNC_SCOPE` nearest
127/// peers.
128pub async fn snapshot_close_neighbors(
129    p2p_node: &Arc<P2PNode>,
130    self_id: &PeerId,
131    scope: usize,
132) -> Vec<PeerId> {
133    let self_xor: XorName = *self_id.as_bytes();
134    let closest = p2p_node
135        .dht_manager()
136        .find_closest_nodes_local(&self_xor, scope)
137        .await;
138    closest.iter().map(|n| n.peer_id).collect()
139}
140
141/// Select the next batch of peers for sync from the current cycle.
142///
143/// Rules 2-3: Scan forward from cursor, skip peers still under cooldown,
144/// fill up to `peer_count` slots.
145pub fn select_sync_batch(
146    state: &mut NeighborSyncState,
147    peer_count: usize,
148    cooldown: Duration,
149) -> Vec<PeerId> {
150    let mut batch = Vec::new();
151    let now = Instant::now();
152
153    while batch.len() < peer_count && state.cursor < state.order.len() {
154        let peer = state.order[state.cursor];
155
156        // Check cooldown (Rule 2a): if the peer was synced recently, remove
157        // from the snapshot and continue without advancing the cursor (the
158        // next element slides into the current cursor position).
159        if let Some(last_sync) = state.last_sync_times.get(&peer) {
160            if now.duration_since(*last_sync) < cooldown {
161                state.order.remove(state.cursor);
162                continue;
163            }
164        }
165
166        batch.push(peer);
167        state.cursor += 1;
168    }
169
170    batch
171}
172
173/// Execute a sync session with a single peer.
174///
175/// Returns the response hints if sync succeeded, or `None` if the peer
176/// was unreachable or the response could not be decoded.
177pub async fn sync_with_peer(
178    peer: &PeerId,
179    p2p_node: &Arc<P2PNode>,
180    storage: &Arc<LmdbStorage>,
181    paid_list: &Arc<PaidList>,
182    config: &ReplicationConfig,
183    is_bootstrapping: bool,
184) -> Option<NeighborSyncResponse> {
185    sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping)
186        .await
187        .map(|outcome| outcome.response)
188}
189
190pub(crate) async fn sync_with_peer_with_outcome(
191    peer: &PeerId,
192    p2p_node: &Arc<P2PNode>,
193    storage: &Arc<LmdbStorage>,
194    paid_list: &Arc<PaidList>,
195    config: &ReplicationConfig,
196    is_bootstrapping: bool,
197) -> Option<NeighborSyncOutcome> {
198    // Build peer-targeted hint sets (Rule 7).
199    let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
200        peer,
201        storage,
202        p2p_node,
203        config.close_group_size,
204    )
205    .await;
206    let replica_hints = sent_replica_hints
207        .iter()
208        .map(|hint| hint.key)
209        .collect::<Vec<_>>();
210    let paid_hints =
211        build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
212            .await;
213
214    let request = NeighborSyncRequest {
215        replica_hints,
216        paid_hints,
217        bootstrapping: is_bootstrapping,
218    };
219    let request_id = rand::thread_rng().gen::<u64>();
220    let msg = ReplicationMessage {
221        request_id,
222        body: ReplicationMessageBody::NeighborSyncRequest(request),
223    };
224
225    let encoded = match msg.encode() {
226        Ok(data) => data,
227        Err(e) => {
228            warn!("Failed to encode sync request for {peer}: {e}");
229            return None;
230        }
231    };
232
233    let response = match p2p_node
234        .send_request(
235            peer,
236            REPLICATION_PROTOCOL_ID,
237            encoded,
238            config.verification_request_timeout,
239        )
240        .await
241    {
242        Ok(resp) => resp,
243        Err(e) => {
244            debug!("Sync with {peer} failed: {e}");
245            return None;
246        }
247    };
248
249    match ReplicationMessage::decode(&response.data) {
250        Ok(decoded) => {
251            if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
252                Some(NeighborSyncOutcome {
253                    response: resp,
254                    sent_replica_hints,
255                })
256            } else {
257                warn!("Unexpected response type from {peer} during sync");
258                None
259            }
260        }
261        Err(e) => {
262            warn!("Failed to decode sync response from {peer}: {e}");
263            None
264        }
265    }
266}
267
268/// Handle a failed sync attempt: remove peer from snapshot and try to fill
269/// the vacated slot.
270///
271/// Rule 3: Remove unreachable peer from `NeighborSyncOrder`, attempt to fill
272/// by resuming scan from where rule 2 left off. Applies the same cooldown
273/// filtering as [`select_sync_batch`] to avoid selecting a peer that was
274/// recently synced.
275pub fn handle_sync_failure(
276    state: &mut NeighborSyncState,
277    failed_peer: &PeerId,
278    cooldown: Duration,
279) -> Option<PeerId> {
280    // Find and remove the failed peer from the ordering.
281    if let Some(pos) = state.order.iter().position(|p| p == failed_peer) {
282        state.order.remove(pos);
283        // Adjust cursor if removal was before the current cursor position.
284        if pos < state.cursor {
285            state.cursor = state.cursor.saturating_sub(1);
286        }
287    }
288
289    // Try to fill the vacated slot, applying cooldown filtering (same as
290    // select_sync_batch Rule 2a).
291    let now = Instant::now();
292    while state.cursor < state.order.len() {
293        let candidate = state.order[state.cursor];
294
295        if let Some(last_sync) = state.last_sync_times.get(&candidate) {
296            if now.duration_since(*last_sync) < cooldown {
297                state.order.remove(state.cursor);
298                continue;
299            }
300        }
301
302        state.cursor += 1;
303        return Some(candidate);
304    }
305
306    None
307}
308
309/// Record a successful sync with a peer.
310pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
311    state.last_sync_times.insert(*peer, Instant::now());
312}
313
314/// Handle incoming sync request from a peer.
315///
316/// Rules 4-6: Validate peer is in `LocalRT`. If yes, bidirectional sync.
317/// If not, outbound-only (send hints but don't accept inbound).
318///
319/// Returns `(response, sender_in_routing_table)` where the second element
320/// indicates whether the caller should process the sender's inbound hints.
321pub async fn handle_sync_request(
322    sender: &PeerId,
323    request: &NeighborSyncRequest,
324    p2p_node: &Arc<P2PNode>,
325    storage: &Arc<LmdbStorage>,
326    paid_list: &Arc<PaidList>,
327    config: &ReplicationConfig,
328    is_bootstrapping: bool,
329) -> (NeighborSyncResponse, bool) {
330    let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
331        sender,
332        request,
333        p2p_node,
334        storage,
335        paid_list,
336        config,
337        is_bootstrapping,
338    )
339    .await;
340    (response, sender_in_rt)
341}
342
343pub(crate) async fn handle_sync_request_with_proofs(
344    sender: &PeerId,
345    _request: &NeighborSyncRequest,
346    p2p_node: &Arc<P2PNode>,
347    storage: &Arc<LmdbStorage>,
348    paid_list: &Arc<PaidList>,
349    config: &ReplicationConfig,
350    is_bootstrapping: bool,
351) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
352    let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
353
354    // Build outbound hints (always sent, even to non-RT peers).
355    let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
356        sender,
357        storage,
358        p2p_node,
359        config.close_group_size,
360    )
361    .await;
362    let replica_hints = sent_replica_hints
363        .iter()
364        .map(|hint| hint.key)
365        .collect::<Vec<_>>();
366    let paid_hints = build_paid_hints_for_peer(
367        sender,
368        paid_list,
369        p2p_node,
370        config.paid_list_close_group_size,
371    )
372    .await;
373
374    let response = NeighborSyncResponse {
375        replica_hints,
376        paid_hints,
377        bootstrapping: is_bootstrapping,
378        rejected_keys: Vec::new(),
379    };
380
381    // Rule 4-6: accept inbound hints only if sender is in LocalRT.
382    (response, sent_replica_hints, sender_in_rt)
383}
384
385// ---------------------------------------------------------------------------
386// Tests
387// ---------------------------------------------------------------------------
388
389#[cfg(test)]
390#[allow(clippy::unwrap_used, clippy::expect_used)]
391mod tests {
392    use super::*;
393    use crate::replication::types::PeerSyncRecord;
394    use std::collections::HashMap;
395
396    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
397    fn peer_id_from_byte(b: u8) -> PeerId {
398        let mut bytes = [0u8; 32];
399        bytes[0] = b;
400        PeerId::from_bytes(bytes)
401    }
402
403    // -- select_sync_batch ---------------------------------------------------
404
405    #[test]
406    fn select_sync_batch_returns_up_to_peer_count() {
407        let peers = vec![
408            peer_id_from_byte(1),
409            peer_id_from_byte(2),
410            peer_id_from_byte(3),
411            peer_id_from_byte(4),
412            peer_id_from_byte(5),
413        ];
414        let mut state = NeighborSyncState::new_cycle(peers);
415        let batch_size = 3;
416
417        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
418
419        assert_eq!(batch.len(), batch_size);
420        assert_eq!(batch[0], peer_id_from_byte(1));
421        assert_eq!(batch[1], peer_id_from_byte(2));
422        assert_eq!(batch[2], peer_id_from_byte(3));
423        assert_eq!(state.cursor, 3);
424    }
425
426    #[test]
427    fn select_sync_batch_skips_cooldown_peers() {
428        let peers = vec![
429            peer_id_from_byte(1),
430            peer_id_from_byte(2),
431            peer_id_from_byte(3),
432            peer_id_from_byte(4),
433        ];
434        let mut state = NeighborSyncState::new_cycle(peers);
435
436        // Mark peer 1 and peer 3 as recently synced.
437        state
438            .last_sync_times
439            .insert(peer_id_from_byte(1), Instant::now());
440        state
441            .last_sync_times
442            .insert(peer_id_from_byte(3), Instant::now());
443
444        let cooldown = Duration::from_secs(3600); // 1 hour
445        let batch = select_sync_batch(&mut state, 2, cooldown);
446
447        // Peer 1 and peer 3 should be skipped (removed from order).
448        assert_eq!(batch.len(), 2);
449        assert_eq!(batch[0], peer_id_from_byte(2));
450        assert_eq!(batch[1], peer_id_from_byte(4));
451
452        // Cooldown peers should have been removed from the order.
453        assert!(!state.order.contains(&peer_id_from_byte(1)));
454        assert!(!state.order.contains(&peer_id_from_byte(3)));
455    }
456
457    #[test]
458    fn select_sync_batch_expired_cooldown_not_skipped() {
459        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
460        let mut state = NeighborSyncState::new_cycle(peers);
461
462        // Mark peer 1 as synced a long time ago (simulate expired cooldown).
463        // Use a small subtraction (2s) and a smaller cooldown (1s) to avoid
464        // `checked_sub` returning `None` on freshly-booted CI runners where
465        // `Instant::now()` (system uptime) may be very small.
466        state.last_sync_times.insert(
467            peer_id_from_byte(1),
468            Instant::now()
469                .checked_sub(Duration::from_secs(2))
470                .unwrap_or_else(Instant::now),
471        );
472
473        let cooldown = Duration::from_secs(1);
474        let batch = select_sync_batch(&mut state, 2, cooldown);
475
476        // Peer 1's cooldown expired so it should be included.
477        assert_eq!(batch.len(), 2);
478        assert_eq!(batch[0], peer_id_from_byte(1));
479        assert_eq!(batch[1], peer_id_from_byte(2));
480    }
481
482    #[test]
483    fn select_sync_batch_empty_order() {
484        let mut state = NeighborSyncState::new_cycle(vec![]);
485
486        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
487
488        assert!(batch.is_empty());
489        assert_eq!(state.cursor, 0);
490    }
491
492    #[test]
493    fn select_sync_batch_all_on_cooldown() {
494        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
495        let mut state = NeighborSyncState::new_cycle(peers);
496
497        state
498            .last_sync_times
499            .insert(peer_id_from_byte(1), Instant::now());
500        state
501            .last_sync_times
502            .insert(peer_id_from_byte(2), Instant::now());
503
504        let cooldown = Duration::from_secs(3600);
505        let batch = select_sync_batch(&mut state, 4, cooldown);
506
507        assert!(batch.is_empty());
508        assert!(state.order.is_empty());
509    }
510
511    // -- handle_sync_failure -------------------------------------------------
512
513    #[test]
514    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
515        let peers = vec![
516            peer_id_from_byte(1),
517            peer_id_from_byte(2),
518            peer_id_from_byte(3),
519            peer_id_from_byte(4),
520        ];
521        let mut state = NeighborSyncState::new_cycle(peers);
522        // Simulate having already processed peers at indices 0 and 1.
523        state.cursor = 2;
524
525        // Peer 2 (index 1, before cursor) fails.
526        let replacement =
527            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
528
529        // Cursor should be adjusted down by 1 (was 2, now 1).
530        assert_eq!(state.cursor, 2); // was 2, removed at pos 1, adjusted to 1, then replacement advances to 2
531        assert!(!state.order.contains(&peer_id_from_byte(2)));
532
533        // Should get peer 4 as replacement (index 1 after removal = peer 3,
534        // but cursor was adjusted to 1 so peer 3 is at index 1; it returns
535        // the peer at the new cursor and advances).
536        assert!(replacement.is_some());
537    }
538
539    #[test]
540    fn handle_sync_failure_removes_peer_after_cursor() {
541        let peers = vec![
542            peer_id_from_byte(1),
543            peer_id_from_byte(2),
544            peer_id_from_byte(3),
545            peer_id_from_byte(4),
546        ];
547        let mut state = NeighborSyncState::new_cycle(peers);
548        state.cursor = 1;
549
550        // Peer 3 (index 2, after cursor) fails.
551        let replacement =
552            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
553
554        // Cursor should stay at 1 (removal was after cursor).
555        assert_eq!(state.cursor, 2); // cursor was 1, replacement advances to 2
556        assert!(!state.order.contains(&peer_id_from_byte(3)));
557
558        // Replacement should be peer 2 (now at cursor position 1).
559        assert_eq!(replacement, Some(peer_id_from_byte(2)));
560    }
561
562    #[test]
563    fn handle_sync_failure_no_replacement_when_exhausted() {
564        let peers = vec![peer_id_from_byte(1)];
565        let mut state = NeighborSyncState::new_cycle(peers);
566        state.cursor = 1; // Already past the only peer.
567
568        let replacement =
569            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
570
571        assert!(state.order.is_empty());
572        assert!(replacement.is_none());
573    }
574
575    #[test]
576    fn handle_sync_failure_unknown_peer_is_noop() {
577        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
578        let mut state = NeighborSyncState::new_cycle(peers);
579        state.cursor = 1;
580
581        let replacement =
582            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
583
584        // Order should be unchanged.
585        assert_eq!(state.order.len(), 2);
586        // Still tries to fill from cursor.
587        assert_eq!(replacement, Some(peer_id_from_byte(2)));
588        assert_eq!(state.cursor, 2);
589    }
590
591    // -- record_successful_sync ----------------------------------------------
592
593    #[test]
594    fn record_successful_sync_updates_last_sync_time() {
595        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
596        let mut state = NeighborSyncState::new_cycle(peers);
597        let peer = peer_id_from_byte(1);
598
599        assert!(!state.last_sync_times.contains_key(&peer));
600
601        let before = Instant::now();
602        record_successful_sync(&mut state, &peer);
603        let after = Instant::now();
604
605        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
606        assert!(*ts >= before);
607        assert!(*ts <= after);
608    }
609
610    #[test]
611    fn record_successful_sync_overwrites_previous() {
612        let peers = vec![peer_id_from_byte(1)];
613        let mut state = NeighborSyncState::new_cycle(peers);
614        let peer = peer_id_from_byte(1);
615
616        // Record a sync at an old time. Use a small subtraction to avoid
617        // `checked_sub` returning `None` on freshly-booted CI runners.
618        let old_time = Instant::now()
619            .checked_sub(Duration::from_secs(2))
620            .unwrap_or_else(Instant::now);
621        state.last_sync_times.insert(peer, old_time);
622
623        record_successful_sync(&mut state, &peer);
624
625        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
626        assert!(*ts > old_time, "sync time should be updated");
627    }
628
629    // -- Section 18: Neighbor sync scenarios --------------------------------
630
631    #[test]
632    fn scenario_35_round_robin_with_cooldown_skip() {
633        // With >PEER_COUNT eligible peers, consecutive rounds scan forward
634        // from cursor, skip cooldown peers, sync next batch.
635        // Create 8 peers, mark peers 2,4 on cooldown.
636        // First batch of 4: peers 1,3,5,6 (2,4 skipped and removed).
637        // Second batch of 4: peers 7,8 (only 2 remain).
638        // Cycle should complete after second batch.
639        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
640        let mut state = NeighborSyncState::new_cycle(peers);
641        let batch_size = 4;
642        let cooldown = Duration::from_secs(3600);
643
644        // Mark peers 2 and 4 as recently synced (on cooldown).
645        state
646            .last_sync_times
647            .insert(peer_id_from_byte(2), Instant::now());
648        state
649            .last_sync_times
650            .insert(peer_id_from_byte(4), Instant::now());
651
652        // First batch: scan from cursor 0. Peers 2 and 4 are removed,
653        // leaving [1,3,5,6,7,8]. We pick the first 4: [1,3,5,6].
654        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
655        assert_eq!(batch1.len(), 4);
656        assert_eq!(batch1[0], peer_id_from_byte(1));
657        assert_eq!(batch1[1], peer_id_from_byte(3));
658        assert_eq!(batch1[2], peer_id_from_byte(5));
659        assert_eq!(batch1[3], peer_id_from_byte(6));
660
661        // Cooldown peers should have been removed from the order.
662        assert!(!state.order.contains(&peer_id_from_byte(2)));
663        assert!(!state.order.contains(&peer_id_from_byte(4)));
664
665        // Second batch: only peers 7,8 remain after cursor.
666        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
667        assert_eq!(batch2.len(), 2);
668        assert_eq!(batch2[0], peer_id_from_byte(7));
669        assert_eq!(batch2[1], peer_id_from_byte(8));
670
671        // Cycle should be complete after second batch.
672        assert!(state.is_cycle_complete());
673    }
674
675    #[test]
676    fn cycle_complete_when_cursor_past_order() {
677        // is_cycle_complete() returns true when cursor >= order.len().
678        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
679        let mut state = NeighborSyncState::new_cycle(peers);
680
681        // Not complete at the start.
682        assert!(!state.is_cycle_complete());
683
684        // Advance cursor to exactly order.len().
685        state.cursor = 3;
686        assert!(state.is_cycle_complete());
687
688        // Also complete when cursor exceeds order.len().
689        state.cursor = 10;
690        assert!(state.is_cycle_complete());
691
692        // Edge case: order is emptied (peers removed) with cursor at 0.
693        state.order.clear();
694        state.cursor = 0;
695        assert!(state.is_cycle_complete());
696    }
697
698    /// Scenario 36: Post-cycle responsibility pruning with time-based
699    /// hysteresis.
700    ///
701    /// When a full round-robin cycle completes, node runs one prune pass
702    /// over BOTH stored records and `PaidForList` entries using current
703    /// `SelfInclusiveRT`. Out-of-range items have timestamps recorded but
704    /// are deleted only after `PRUNE_HYSTERESIS_DURATION`. In-range items
705    /// have their timestamps cleared.
706    ///
707    /// Full `run_prune_pass` requires a live `P2PNode`. This test verifies
708    /// the deterministic trigger condition (cycle completion) and the
709    /// combined record + paid-list pruning contract:
710    ///   (1) Cycle completes -> prune pass should run.
711    ///   (2) Both `RecordOutOfRangeFirstSeen` and `PaidOutOfRangeFirstSeen`
712    ///       are tracked independently in the same pass.
713    ///   (3) Keys within hysteresis window are retained.
714    #[test]
715    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
716        let config = ReplicationConfig::default();
717
718        // Step 1: Run a full cycle to completion.
719        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
720        let mut state = NeighborSyncState::new_cycle(peers);
721        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
722        assert!(
723            state.is_cycle_complete(),
724            "cycle must be complete before prune pass triggers"
725        );
726
727        // Step 2: Verify prune hysteresis parameters are configured.
728        assert!(
729            !config.prune_hysteresis_duration.is_zero(),
730            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
731        );
732
733        // Step 3: Simulate the prune-pass timestamp tracking for BOTH
734        // record and paid-list entries (the two independent timestamp
735        // families that Section 11 requires in a single pass).
736        //
737        // Record timestamps and paid timestamps are independent — clearing
738        // one must not affect the other (tested in scenario_52). Here we
739        // verify the combined trigger: cycle completion -> both kinds of
740        // timestamps are eligible for evaluation.
741        let record_key: [u8; 32] = [0x36; 32];
742        let paid_key: [u8; 32] = [0x37; 32];
743
744        // Simulate: record_key goes out of range, paid_key goes out of range.
745        let record_first_seen = Instant::now();
746        let paid_first_seen = Instant::now();
747
748        // Both timestamps are recent — well within hysteresis window.
749        let record_elapsed = record_first_seen.elapsed();
750        let paid_elapsed = paid_first_seen.elapsed();
751        assert!(
752            record_elapsed < config.prune_hysteresis_duration,
753            "record key should be retained within hysteresis window"
754        );
755        assert!(
756            paid_elapsed < config.prune_hysteresis_duration,
757            "paid key should be retained within hysteresis window"
758        );
759
760        // The prune pass evaluates both independently. Verify they don't
761        // interfere by using separate keys.
762        assert_ne!(
763            record_key, paid_key,
764            "record and paid pruning keys must be independent"
765        );
766
767        // Step 4: After the cycle, a new snapshot is taken and cursor resets.
768        let new_state = NeighborSyncState::new_cycle(vec![
769            peer_id_from_byte(1),
770            peer_id_from_byte(2),
771            peer_id_from_byte(3),
772        ]);
773        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
774        assert!(
775            !new_state.is_cycle_complete(),
776            "new cycle should not be immediately complete"
777        );
778    }
779
780    #[test]
781    fn scenario_38_mid_cycle_peer_join_excluded() {
782        // Peer D joins CloseNeighbors mid-cycle.
783        // D should NOT appear in the current NeighborSyncOrder snapshot.
784        // After cycle completes and a new snapshot is taken, D can be included.
785        let peers = vec![
786            peer_id_from_byte(0xA),
787            peer_id_from_byte(0xB),
788            peer_id_from_byte(0xC),
789        ];
790        let mut state = NeighborSyncState::new_cycle(peers);
791
792        // Advance cursor to simulate mid-cycle.
793        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
794        assert_eq!(state.cursor, 1);
795
796        // Peer D "joins" the network. It should NOT be in the current snapshot.
797        let peer_d = peer_id_from_byte(0xD);
798        assert!(
799            !state.order.contains(&peer_d),
800            "mid-cycle joiner must not appear in the current snapshot"
801        );
802
803        // Complete the current cycle.
804        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
805        assert!(state.is_cycle_complete());
806
807        // New cycle: now D can be included in the fresh snapshot.
808        let new_peers = vec![
809            peer_id_from_byte(0xA),
810            peer_id_from_byte(0xB),
811            peer_id_from_byte(0xC),
812            peer_d,
813        ];
814        let new_state = NeighborSyncState::new_cycle(new_peers);
815        assert!(
816            new_state.order.contains(&peer_d),
817            "after new snapshot, joiner D should be present"
818        );
819    }
820
821    #[test]
822    fn scenario_39_unreachable_peer_removed_slot_filled() {
823        // Peer P is in snapshot. Sync fails. P removed from order.
824        // Node resumes scanning and picks next peer Q to fill the slot.
825        let peers = vec![
826            peer_id_from_byte(1),
827            peer_id_from_byte(2),
828            peer_id_from_byte(3),
829            peer_id_from_byte(4),
830            peer_id_from_byte(5),
831        ];
832        let mut state = NeighborSyncState::new_cycle(peers);
833
834        // First batch selects peers 1,2.
835        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
836        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
837
838        // Peer 2 becomes unreachable. Remove it and fill the slot.
839        let replacement =
840            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
841        assert!(!state.order.contains(&peer_id_from_byte(2)));
842
843        // Slot should be filled by the next available peer (peer 3).
844        assert_eq!(
845            replacement,
846            Some(peer_id_from_byte(3)),
847            "vacated slot should be filled by next peer in order"
848        );
849
850        // Continue: next batch should resume from after the replacement.
851        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
852        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
853        assert!(state.is_cycle_complete());
854    }
855
856    #[test]
857    fn scenario_40_cooldown_peer_removed_from_snapshot() {
858        // Peer synced within cooldown period. When batch selection reaches it,
859        // peer is REMOVED from order (not just skipped). Scanning continues to
860        // next peer.
861        let peers = vec![
862            peer_id_from_byte(1),
863            peer_id_from_byte(2),
864            peer_id_from_byte(3),
865        ];
866        let mut state = NeighborSyncState::new_cycle(peers);
867        let cooldown = Duration::from_secs(3600);
868
869        // Mark peer 2 as recently synced.
870        state
871            .last_sync_times
872            .insert(peer_id_from_byte(2), Instant::now());
873
874        let batch = select_sync_batch(&mut state, 3, cooldown);
875
876        // Peer 2 should have been REMOVED from order, not just skipped.
877        assert!(!state.order.contains(&peer_id_from_byte(2)));
878        assert_eq!(state.order.len(), 2, "order should shrink by 1");
879
880        // Batch contains the non-cooldown peers.
881        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
882
883        // Cycle is complete since all remaining peers were selected.
884        assert!(state.is_cycle_complete());
885    }
886
887    #[test]
888    fn scenario_41_cycle_always_terminates() {
889        // Under arbitrary cooldowns and removals, cycle always terminates.
890        // Create 10 peers. Mark ALL on cooldown. select_sync_batch
891        // should remove all and return empty. Cycle complete.
892        let peer_count: u8 = 10;
893        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
894        let mut state = NeighborSyncState::new_cycle(peers);
895        let cooldown = Duration::from_secs(3600);
896
897        // Mark all peers as recently synced.
898        for i in 1..=peer_count {
899            state
900                .last_sync_times
901                .insert(peer_id_from_byte(i), Instant::now());
902        }
903
904        let batch = select_sync_batch(&mut state, 4, cooldown);
905
906        assert!(
907            batch.is_empty(),
908            "all peers on cooldown — batch must be empty"
909        );
910        assert!(state.order.is_empty(), "all peers should have been removed");
911        assert!(
912            state.is_cycle_complete(),
913            "cycle must terminate when all peers are removed"
914        );
915    }
916
917    #[test]
918    fn consecutive_rounds_advance_through_full_cycle() {
919        // 6 peers, batch_size=2, no cooldowns.
920        // Round 1: peers 0,1. Round 2: peers 2,3. Round 3: peers 4,5.
921        // After round 3: cycle complete.
922        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
923        let mut state = NeighborSyncState::new_cycle(peers);
924        let batch_size = 2;
925        let no_cooldown = Duration::from_secs(0);
926
927        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
928        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
929        assert_eq!(state.cursor, 2);
930        assert!(!state.is_cycle_complete());
931
932        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
933        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
934        assert_eq!(state.cursor, 4);
935        assert!(!state.is_cycle_complete());
936
937        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
938        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
939        assert_eq!(state.cursor, 6);
940        assert!(state.is_cycle_complete());
941
942        // Extra call after cycle complete returns empty.
943        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
944        assert!(round4.is_empty());
945    }
946
947    /// Scenario 37: Non-`LocalRT` inbound sync behavior.
948    ///
949    /// When a peer not in `LocalRT(self)` opens a sync session:
950    /// - Receiver STILL builds and sends outbound hints (response always
951    ///   constructed via `handle_sync_request`).
952    /// - Receiver drops ALL inbound replica/paid hints from that peer
953    ///   (caller returns early in `mod.rs:handle_neighbor_sync_request`
954    ///   when `sender_in_rt` is false).
955    /// - Sync history is NOT updated for non-RT peers, so no
956    ///   `RepairOpportunity` is created.
957    ///
958    /// Full integration requires a live `P2PNode` (`handle_sync_request`
959    /// calls `is_in_routing_table`). This test verifies the deterministic
960    /// contract:
961    ///   (1) `NeighborSyncResponse` is always constructed regardless of
962    ///       sender RT membership (outbound hints sent).
963    ///   (2) When `sender_in_rt` is false, no admission runs and sync
964    ///       history is not updated.
965    ///   (3) When `sender_in_rt` is true, sync history IS updated and
966    ///       inbound hints enter the admission pipeline.
967    #[test]
968    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
969        let sender = peer_id_from_byte(0x37);
970
971        // Simulate what handle_sync_request always builds: outbound hints
972        // in the response, regardless of whether sender is in LocalRT.
973        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
974        let outbound_paid_hints = vec![[0x03; 32]];
975        let response = NeighborSyncResponse {
976            replica_hints: outbound_replica_hints.clone(),
977            paid_hints: outbound_paid_hints.clone(),
978            bootstrapping: false,
979            rejected_keys: Vec::new(),
980        };
981
982        // Inbound hints from the sender (would be in the request).
983        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
984        let inbound_paid_hints = vec![[0xB0; 32]];
985
986        // --- Case 1: sender NOT in LocalRT (sender_in_rt = false) ---
987        let sender_in_rt = false;
988        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
989
990        // Response is still built — outbound hints are sent.
991        assert_eq!(
992            response.replica_hints, outbound_replica_hints,
993            "outbound replica hints must be sent even when sender is not in LocalRT"
994        );
995        assert_eq!(
996            response.paid_hints, outbound_paid_hints,
997            "outbound paid hints must be sent even when sender is not in LocalRT"
998        );
999
1000        // Caller checks sender_in_rt and returns early. No admission runs.
1001        if !sender_in_rt {
1002            // This is the early-return path in mod.rs:964-966.
1003            // Inbound hints are never processed.
1004            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
1005            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
1006
1007            for key in &inbound_replica_hints {
1008                assert!(
1009                    !admitted_replica_keys.contains(key),
1010                    "inbound replica hints must NOT be admitted from non-RT sender"
1011                );
1012            }
1013            for key in &inbound_paid_hints {
1014                assert!(
1015                    !admitted_paid_keys.contains(key),
1016                    "inbound paid hints must NOT be admitted from non-RT sender"
1017                );
1018            }
1019
1020            // Sync history is NOT updated for non-RT peers.
1021            assert!(
1022                !sync_history.contains_key(&sender),
1023                "sync history must NOT be updated for non-LocalRT sender"
1024            );
1025        }
1026
1027        // --- Case 2: sender IS in LocalRT (sender_in_rt = true) ---
1028        let sender_in_rt = true;
1029        assert!(
1030            sender_in_rt,
1031            "when sender is in LocalRT, inbound hints are processed"
1032        );
1033
1034        // Sync history IS updated for RT peers.
1035        sync_history.insert(
1036            sender,
1037            PeerSyncRecord {
1038                last_sync: Some(Instant::now()),
1039                cycles_since_sync: 0,
1040            },
1041        );
1042        assert!(
1043            sync_history.contains_key(&sender),
1044            "sync history should be updated for LocalRT sender"
1045        );
1046        assert!(
1047            sync_history
1048                .get(&sender)
1049                .expect("sender in history")
1050                .last_sync
1051                .is_some(),
1052            "last_sync should be recorded for RT sender"
1053        );
1054    }
1055
1056    #[test]
1057    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
1058        // Verify that after cycle completes, starting a new cycle
1059        // preserves the last_sync_times from the old state.
1060        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1061        let mut state = NeighborSyncState::new_cycle(peers);
1062
1063        // Sync both peers and record their times.
1064        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1065        record_successful_sync(&mut state, &peer_id_from_byte(1));
1066        record_successful_sync(&mut state, &peer_id_from_byte(2));
1067        assert!(state.is_cycle_complete());
1068
1069        // Capture sync times before "resetting" for a new cycle.
1070        let old_sync_times = state.last_sync_times.clone();
1071        assert_eq!(old_sync_times.len(), 2);
1072
1073        // Simulate starting a new cycle: create fresh state but carry over
1074        // last_sync_times (as the real driver would).
1075        let new_peers = vec![
1076            peer_id_from_byte(1),
1077            peer_id_from_byte(2),
1078            peer_id_from_byte(3),
1079        ];
1080        let mut new_state = NeighborSyncState::new_cycle(new_peers);
1081        new_state.last_sync_times = old_sync_times;
1082
1083        // Cursor is reset.
1084        assert_eq!(new_state.cursor, 0);
1085        assert!(!new_state.is_cycle_complete());
1086
1087        // Sync times are preserved.
1088        assert_eq!(new_state.last_sync_times.len(), 2);
1089        assert!(new_state
1090            .last_sync_times
1091            .contains_key(&peer_id_from_byte(1)));
1092        assert!(new_state
1093            .last_sync_times
1094            .contains_key(&peer_id_from_byte(2)));
1095
1096        // The preserved cooldowns cause peers 1,2 to be removed, leaving
1097        // only peer 3 selected.
1098        let cooldown = Duration::from_secs(3600);
1099        let batch = select_sync_batch(&mut new_state, 3, cooldown);
1100        assert_eq!(
1101            batch,
1102            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1103            "only the new peer should be selected; old peers are on cooldown"
1104        );
1105    }
1106}