Skip to main content

ant_node/replication/
neighbor_sync.rs

1//! Neighbor replication sync (Section 6.2).
2//!
3//! Round-robin cycle management: snapshot close neighbors, iterate through
4//! them in batches of `NEIGHBOR_SYNC_PEER_COUNT`, exchanging hint sets.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, info, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14
15use crate::ant_protocol::XorName;
16use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
17use crate::replication::paid_list::PaidList;
18use crate::replication::protocol::{
19    NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
20};
21use crate::replication::types::NeighborSyncState;
22use crate::storage::LmdbStorage;
23
/// Threshold, in milliseconds, at or above which a hint build is reported at
/// info level instead of debug level.
const HINT_BUILD_SLOW_LOG_MS: u128 = 250;
26
27/// Replica hint sent to a peer, with the close-group snapshot used to decide
28/// that hint.
29#[derive(Debug)]
30pub(crate) struct SentReplicaHint {
31    /// Key included in the replica hint set.
32    pub(crate) key: XorName,
33    /// Self-inclusive close group observed during hint construction.
34    pub(crate) close_peers: HashSet<PeerId>,
35}
36
37/// Result of an outbound neighbor-sync exchange.
38#[derive(Debug)]
39pub(crate) struct NeighborSyncOutcome {
40    /// The peer's sync response.
41    pub(crate) response: NeighborSyncResponse,
42    /// Replica hints sent to the peer in our request.
43    pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
44}
45
46/// Prebuilt hint sets for one outbound neighbor-sync exchange.
47#[derive(Debug, Default)]
48pub(crate) struct PeerSyncHints {
49    /// Replica hints, including the close-group snapshot needed for repair
50    /// proof recording after the request is successfully sent.
51    pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
52    /// Paid-list-only hints for this peer.
53    paid_hints: Vec<XorName>,
54}
55
56/// Build replica hints for a specific peer.
57///
58/// Returns keys that we believe the peer should hold (peer is among the
59/// `CLOSE_GROUP_SIZE` nearest to `K` in our `SelfInclusiveRT`).
60///
61/// This intentionally scans all locally stored records, not only records for
62/// which this node is still responsible. Out-of-range records retained by
63/// prune hysteresis therefore continue to repair their new close group before
64/// this node is allowed to delete them.
65pub async fn build_replica_hints_for_peer(
66    peer: &PeerId,
67    storage: &Arc<LmdbStorage>,
68    p2p_node: &Arc<P2PNode>,
69    close_group_size: usize,
70) -> Vec<XorName> {
71    build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
72        .await
73        .into_iter()
74        .map(|hint| hint.key)
75        .collect()
76}
77
78pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
79    peer: &PeerId,
80    storage: &Arc<LmdbStorage>,
81    p2p_node: &Arc<P2PNode>,
82    close_group_size: usize,
83) -> Vec<SentReplicaHint> {
84    let all_keys = match storage.all_keys().await {
85        Ok(keys) => keys,
86        Err(e) => {
87            warn!("Failed to read stored keys for hint construction: {e}");
88            return Vec::new();
89        }
90    };
91
92    let dht = p2p_node.dht_manager();
93    let mut hints = Vec::new();
94    for key in all_keys {
95        let closest = dht
96            .find_closest_nodes_local_with_self(&key, close_group_size)
97            .await;
98        let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
99        if close_peers.contains(peer) {
100            hints.push(SentReplicaHint { key, close_peers });
101        }
102    }
103    hints
104}
105
106/// Build outbound hint sets for a batch of peers with one scan over local
107/// storage and one scan over the paid list.
108pub(crate) async fn build_sync_hints_for_peers(
109    peers: &[PeerId],
110    storage: &Arc<LmdbStorage>,
111    paid_list: &Arc<PaidList>,
112    p2p_node: &Arc<P2PNode>,
113    close_group_size: usize,
114    paid_list_close_group_size: usize,
115) -> HashMap<PeerId, PeerSyncHints> {
116    let started = Instant::now();
117    let target_peers = peers.iter().copied().collect::<HashSet<_>>();
118    let mut hints_by_peer = peers
119        .iter()
120        .copied()
121        .map(|peer| (peer, PeerSyncHints::default()))
122        .collect::<HashMap<_, _>>();
123
124    if peers.is_empty() {
125        return hints_by_peer;
126    }
127
128    let all_keys = match storage.all_keys().await {
129        Ok(keys) => keys,
130        Err(e) => {
131            warn!("Failed to read stored keys for batch hint construction: {e}");
132            Vec::new()
133        }
134    };
135    let stored_key_count = all_keys.len();
136
137    let dht = p2p_node.dht_manager();
138    for key in all_keys {
139        let closest = dht
140            .find_closest_nodes_local_with_self(&key, close_group_size)
141            .await;
142        let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
143        for peer in close_peers.intersection(&target_peers) {
144            if let Some(peer_hints) = hints_by_peer.get_mut(peer) {
145                peer_hints.sent_replica_hints.push(SentReplicaHint {
146                    key,
147                    close_peers: close_peers.clone(),
148                });
149            }
150        }
151    }
152
153    let all_paid_keys = match paid_list.all_keys() {
154        Ok(keys) => keys,
155        Err(e) => {
156            warn!("Failed to read PaidForList for batch hint construction: {e}");
157            Vec::new()
158        }
159    };
160    let paid_key_count = all_paid_keys.len();
161
162    for key in all_paid_keys {
163        let closest = dht
164            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
165            .await;
166        for node in closest {
167            if target_peers.contains(&node.peer_id) {
168                if let Some(peer_hints) = hints_by_peer.get_mut(&node.peer_id) {
169                    peer_hints.paid_hints.push(key);
170                }
171            }
172        }
173    }
174
175    let replica_hint_count = hints_by_peer
176        .values()
177        .map(|hints| hints.sent_replica_hints.len())
178        .sum::<usize>();
179    let paid_hint_count = hints_by_peer
180        .values()
181        .map(|hints| hints.paid_hints.len())
182        .sum::<usize>();
183    let elapsed_ms = started.elapsed().as_millis();
184
185    if elapsed_ms >= HINT_BUILD_SLOW_LOG_MS {
186        info!(
187            target: "ant_node::replication::neighbor_sync",
188            "Slow neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
189            peers.len(),
190            stored_key_count,
191            paid_key_count,
192            replica_hint_count,
193            paid_hint_count,
194        );
195    } else {
196        debug!(
197            target: "ant_node::replication::neighbor_sync",
198            "Neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
199            peers.len(),
200            stored_key_count,
201            paid_key_count,
202            replica_hint_count,
203            paid_hint_count,
204        );
205    }
206
207    hints_by_peer
208}
209
210/// Build paid hints for a specific peer.
211///
212/// Returns keys from our `PaidForList` that we believe the peer should
213/// track (peer is among `PAID_LIST_CLOSE_GROUP_SIZE` nearest to `K`).
214pub async fn build_paid_hints_for_peer(
215    peer: &PeerId,
216    paid_list: &Arc<PaidList>,
217    p2p_node: &Arc<P2PNode>,
218    paid_list_close_group_size: usize,
219) -> Vec<XorName> {
220    let all_paid_keys = match paid_list.all_keys() {
221        Ok(keys) => keys,
222        Err(e) => {
223            warn!("Failed to read PaidForList for hint construction: {e}");
224            return Vec::new();
225        }
226    };
227
228    let dht = p2p_node.dht_manager();
229    let mut hints = Vec::new();
230    for key in all_paid_keys {
231        let closest = dht
232            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
233            .await;
234        if closest.iter().any(|n| n.peer_id == *peer) {
235            hints.push(key);
236        }
237    }
238    hints
239}
240
241/// Take a fresh snapshot of close neighbors for a new round-robin cycle.
242///
243/// Rule 1: Compute `CloseNeighbors(self)` as `NEIGHBOR_SYNC_SCOPE` nearest
244/// peers.
245pub async fn snapshot_close_neighbors(
246    p2p_node: &Arc<P2PNode>,
247    self_id: &PeerId,
248    scope: usize,
249) -> Vec<PeerId> {
250    let self_xor: XorName = *self_id.as_bytes();
251    let closest = p2p_node
252        .dht_manager()
253        .find_closest_nodes_local(&self_xor, scope)
254        .await;
255    closest.iter().map(|n| n.peer_id).collect()
256}
257
258/// Select the next batch of peers for sync from the current cycle.
259///
260/// Priority peers from routing-table churn are drained before the round-robin
261/// cursor. Rules 2-3 then scan forward from cursor, skip peers still under
262/// cooldown, and fill up to `peer_count` slots.
263pub fn select_sync_batch(
264    state: &mut NeighborSyncState,
265    peer_count: usize,
266    cooldown: Duration,
267) -> Vec<PeerId> {
268    let mut batch = Vec::new();
269    let now = Instant::now();
270
271    while batch.len() < peer_count {
272        let Some(peer) = select_next_sync_peer(state, now, cooldown) else {
273            break;
274        };
275        batch.push(peer);
276    }
277
278    batch
279}
280
281fn select_next_sync_peer(
282    state: &mut NeighborSyncState,
283    now: Instant,
284    cooldown: Duration,
285) -> Option<PeerId> {
286    while let Some(peer) = state.priority_order.pop_front() {
287        if peer_on_cooldown(state, &peer, now, cooldown) {
288            state.remove_peer(&peer);
289            continue;
290        }
291
292        state.remove_peer(&peer);
293        return Some(peer);
294    }
295
296    while state.cursor < state.order.len() {
297        let peer = state.order[state.cursor];
298
299        // Check cooldown (Rule 2a): if the peer was synced recently, remove
300        // from the snapshot and continue without advancing the cursor (the
301        // next element slides into the current cursor position).
302        if peer_on_cooldown(state, &peer, now, cooldown) {
303            state.order.remove(state.cursor);
304            continue;
305        }
306
307        state.cursor += 1;
308        return Some(peer);
309    }
310
311    None
312}
313
314fn peer_on_cooldown(
315    state: &NeighborSyncState,
316    peer: &PeerId,
317    now: Instant,
318    cooldown: Duration,
319) -> bool {
320    state
321        .last_sync_times
322        .get(peer)
323        .is_some_and(|last_sync| now.duration_since(*last_sync) < cooldown)
324}
325
326/// Execute a sync session with a single peer.
327///
328/// Returns the response hints if sync succeeded, or `None` if the peer
329/// was unreachable or the response could not be decoded.
330pub async fn sync_with_peer(
331    peer: &PeerId,
332    p2p_node: &Arc<P2PNode>,
333    storage: &Arc<LmdbStorage>,
334    paid_list: &Arc<PaidList>,
335    config: &ReplicationConfig,
336    is_bootstrapping: bool,
337) -> Option<NeighborSyncResponse> {
338    sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping)
339        .await
340        .map(|outcome| outcome.response)
341}
342
343pub(crate) async fn sync_with_peer_with_outcome(
344    peer: &PeerId,
345    p2p_node: &Arc<P2PNode>,
346    storage: &Arc<LmdbStorage>,
347    paid_list: &Arc<PaidList>,
348    config: &ReplicationConfig,
349    is_bootstrapping: bool,
350) -> Option<NeighborSyncOutcome> {
351    // Build peer-targeted hint sets (Rule 7).
352    let mut hints_by_peer = build_sync_hints_for_peers(
353        std::slice::from_ref(peer),
354        storage,
355        paid_list,
356        p2p_node,
357        config.close_group_size,
358        config.paid_list_close_group_size,
359    )
360    .await;
361    let hints = hints_by_peer.remove(peer).unwrap_or_default();
362    sync_with_peer_with_hints(peer, p2p_node, config, is_bootstrapping, hints).await
363}
364
365pub(crate) async fn sync_with_peer_with_hints(
366    peer: &PeerId,
367    p2p_node: &Arc<P2PNode>,
368    config: &ReplicationConfig,
369    is_bootstrapping: bool,
370    hints: PeerSyncHints,
371) -> Option<NeighborSyncOutcome> {
372    let replica_hints = hints
373        .sent_replica_hints
374        .iter()
375        .map(|hint| hint.key)
376        .collect::<Vec<_>>();
377    let sent_replica_hints = hints.sent_replica_hints;
378
379    let request = NeighborSyncRequest {
380        replica_hints,
381        paid_hints: hints.paid_hints,
382        bootstrapping: is_bootstrapping,
383    };
384    let request_id = rand::thread_rng().gen::<u64>();
385    let msg = ReplicationMessage {
386        request_id,
387        body: ReplicationMessageBody::NeighborSyncRequest(request),
388    };
389
390    let encoded = match msg.encode() {
391        Ok(data) => data,
392        Err(e) => {
393            warn!("Failed to encode sync request for {peer}: {e}");
394            return None;
395        }
396    };
397
398    let response = match p2p_node
399        .send_request(
400            peer,
401            REPLICATION_PROTOCOL_ID,
402            encoded,
403            config.verification_request_timeout,
404        )
405        .await
406    {
407        Ok(resp) => resp,
408        Err(e) => {
409            debug!("Sync with {peer} failed: {e}");
410            return None;
411        }
412    };
413
414    match ReplicationMessage::decode(&response.data) {
415        Ok(decoded) => {
416            if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
417                Some(NeighborSyncOutcome {
418                    response: resp,
419                    sent_replica_hints,
420                })
421            } else {
422                warn!("Unexpected response type from {peer} during sync");
423                None
424            }
425        }
426        Err(e) => {
427            warn!("Failed to decode sync response from {peer}: {e}");
428            None
429        }
430    }
431}
432
433/// Handle a failed sync attempt: remove peer from snapshot and try to fill
434/// the vacated slot.
435///
436/// Rule 3: Remove unreachable peer from pending sync state, attempt to fill
437/// by using the same priority-first scan as [`select_sync_batch`]. Applies the
438/// same cooldown filtering to avoid selecting a peer that was recently synced.
439pub fn handle_sync_failure(
440    state: &mut NeighborSyncState,
441    failed_peer: &PeerId,
442    cooldown: Duration,
443) -> Option<PeerId> {
444    // Find and remove the failed peer from the ordering.
445    state.remove_peer(failed_peer);
446
447    // Try to fill the vacated slot, applying the same priority and cooldown
448    // filtering used by select_sync_batch.
449    let now = Instant::now();
450    select_next_sync_peer(state, now, cooldown)
451}
452
453/// Record a successful sync with a peer.
454pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
455    state.last_sync_times.insert(*peer, Instant::now());
456}
457
458/// Handle incoming sync request from a peer.
459///
460/// Rules 4-6: Validate peer is in `LocalRT`. If yes, bidirectional sync.
461/// If not, outbound-only (send hints but don't accept inbound).
462///
463/// Returns `(response, sender_in_routing_table)` where the second element
464/// indicates whether the caller should process the sender's inbound hints.
465pub async fn handle_sync_request(
466    sender: &PeerId,
467    request: &NeighborSyncRequest,
468    p2p_node: &Arc<P2PNode>,
469    storage: &Arc<LmdbStorage>,
470    paid_list: &Arc<PaidList>,
471    config: &ReplicationConfig,
472    is_bootstrapping: bool,
473) -> (NeighborSyncResponse, bool) {
474    let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
475        sender,
476        request,
477        p2p_node,
478        storage,
479        paid_list,
480        config,
481        is_bootstrapping,
482    )
483    .await;
484    (response, sender_in_rt)
485}
486
487pub(crate) async fn handle_sync_request_with_proofs(
488    sender: &PeerId,
489    _request: &NeighborSyncRequest,
490    p2p_node: &Arc<P2PNode>,
491    storage: &Arc<LmdbStorage>,
492    paid_list: &Arc<PaidList>,
493    config: &ReplicationConfig,
494    is_bootstrapping: bool,
495) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
496    let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
497
498    // Build outbound hints (always sent, even to non-RT peers).
499    let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
500        sender,
501        storage,
502        p2p_node,
503        config.close_group_size,
504    )
505    .await;
506    let replica_hints = sent_replica_hints
507        .iter()
508        .map(|hint| hint.key)
509        .collect::<Vec<_>>();
510    let paid_hints = build_paid_hints_for_peer(
511        sender,
512        paid_list,
513        p2p_node,
514        config.paid_list_close_group_size,
515    )
516    .await;
517
518    let response = NeighborSyncResponse {
519        replica_hints,
520        paid_hints,
521        bootstrapping: is_bootstrapping,
522        rejected_keys: Vec::new(),
523    };
524
525    // Rule 4-6: accept inbound hints only if sender is in LocalRT.
526    (response, sent_replica_hints, sender_in_rt)
527}
528
529// ---------------------------------------------------------------------------
530// Tests
531// ---------------------------------------------------------------------------
532
533#[cfg(test)]
534#[allow(clippy::unwrap_used, clippy::expect_used)]
535mod tests {
536    use super::*;
537    use crate::replication::types::PeerSyncRecord;
538    use std::collections::HashMap;
539
540    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
541    fn peer_id_from_byte(b: u8) -> PeerId {
542        let mut bytes = [0u8; 32];
543        bytes[0] = b;
544        PeerId::from_bytes(bytes)
545    }
546
547    // -- select_sync_batch ---------------------------------------------------
548
549    #[test]
550    fn select_sync_batch_returns_up_to_peer_count() {
551        let peers = vec![
552            peer_id_from_byte(1),
553            peer_id_from_byte(2),
554            peer_id_from_byte(3),
555            peer_id_from_byte(4),
556            peer_id_from_byte(5),
557        ];
558        let mut state = NeighborSyncState::new_cycle(peers);
559        let batch_size = 3;
560
561        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
562
563        assert_eq!(batch.len(), batch_size);
564        assert_eq!(batch[0], peer_id_from_byte(1));
565        assert_eq!(batch[1], peer_id_from_byte(2));
566        assert_eq!(batch[2], peer_id_from_byte(3));
567        assert_eq!(state.cursor, 3);
568    }
569
570    #[test]
571    fn select_sync_batch_skips_cooldown_peers() {
572        let peers = vec![
573            peer_id_from_byte(1),
574            peer_id_from_byte(2),
575            peer_id_from_byte(3),
576            peer_id_from_byte(4),
577        ];
578        let mut state = NeighborSyncState::new_cycle(peers);
579
580        // Mark peer 1 and peer 3 as recently synced.
581        state
582            .last_sync_times
583            .insert(peer_id_from_byte(1), Instant::now());
584        state
585            .last_sync_times
586            .insert(peer_id_from_byte(3), Instant::now());
587
588        let cooldown = Duration::from_secs(3600); // 1 hour
589        let batch = select_sync_batch(&mut state, 2, cooldown);
590
591        // Peer 1 and peer 3 should be skipped (removed from order).
592        assert_eq!(batch.len(), 2);
593        assert_eq!(batch[0], peer_id_from_byte(2));
594        assert_eq!(batch[1], peer_id_from_byte(4));
595
596        // Cooldown peers should have been removed from the order.
597        assert!(!state.order.contains(&peer_id_from_byte(1)));
598        assert!(!state.order.contains(&peer_id_from_byte(3)));
599    }
600
601    #[test]
602    fn select_sync_batch_expired_cooldown_not_skipped() {
603        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
604        let mut state = NeighborSyncState::new_cycle(peers);
605
606        // Mark peer 1 as synced a long time ago (simulate expired cooldown).
607        // Use a small subtraction (2s) and a smaller cooldown (1s) to avoid
608        // `checked_sub` returning `None` on freshly-booted CI runners where
609        // `Instant::now()` (system uptime) may be very small.
610        state.last_sync_times.insert(
611            peer_id_from_byte(1),
612            Instant::now()
613                .checked_sub(Duration::from_secs(2))
614                .unwrap_or_else(Instant::now),
615        );
616
617        let cooldown = Duration::from_secs(1);
618        let batch = select_sync_batch(&mut state, 2, cooldown);
619
620        // Peer 1's cooldown expired so it should be included.
621        assert_eq!(batch.len(), 2);
622        assert_eq!(batch[0], peer_id_from_byte(1));
623        assert_eq!(batch[1], peer_id_from_byte(2));
624    }
625
626    #[test]
627    fn select_sync_batch_empty_order() {
628        let mut state = NeighborSyncState::new_cycle(vec![]);
629
630        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
631
632        assert!(batch.is_empty());
633        assert_eq!(state.cursor, 0);
634    }
635
636    #[test]
637    fn select_sync_batch_all_on_cooldown() {
638        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
639        let mut state = NeighborSyncState::new_cycle(peers);
640
641        state
642            .last_sync_times
643            .insert(peer_id_from_byte(1), Instant::now());
644        state
645            .last_sync_times
646            .insert(peer_id_from_byte(2), Instant::now());
647
648        let cooldown = Duration::from_secs(3600);
649        let batch = select_sync_batch(&mut state, 4, cooldown);
650
651        assert!(batch.is_empty());
652        assert!(state.order.is_empty());
653    }
654
655    // -- handle_sync_failure -------------------------------------------------
656
657    #[test]
658    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
659        let peers = vec![
660            peer_id_from_byte(1),
661            peer_id_from_byte(2),
662            peer_id_from_byte(3),
663            peer_id_from_byte(4),
664        ];
665        let mut state = NeighborSyncState::new_cycle(peers);
666        // Simulate having already processed peers at indices 0 and 1.
667        state.cursor = 2;
668
669        // Peer 2 (index 1, before cursor) fails.
670        let replacement =
671            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
672
673        // Cursor should be adjusted down by 1 (was 2, now 1).
674        assert_eq!(state.cursor, 2); // was 2, removed at pos 1, adjusted to 1, then replacement advances to 2
675        assert!(!state.order.contains(&peer_id_from_byte(2)));
676
677        // Should get peer 4 as replacement (index 1 after removal = peer 3,
678        // but cursor was adjusted to 1 so peer 3 is at index 1; it returns
679        // the peer at the new cursor and advances).
680        assert!(replacement.is_some());
681    }
682
683    #[test]
684    fn handle_sync_failure_removes_peer_after_cursor() {
685        let peers = vec![
686            peer_id_from_byte(1),
687            peer_id_from_byte(2),
688            peer_id_from_byte(3),
689            peer_id_from_byte(4),
690        ];
691        let mut state = NeighborSyncState::new_cycle(peers);
692        state.cursor = 1;
693
694        // Peer 3 (index 2, after cursor) fails.
695        let replacement =
696            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
697
698        // Cursor should stay at 1 (removal was after cursor).
699        assert_eq!(state.cursor, 2); // cursor was 1, replacement advances to 2
700        assert!(!state.order.contains(&peer_id_from_byte(3)));
701
702        // Replacement should be peer 2 (now at cursor position 1).
703        assert_eq!(replacement, Some(peer_id_from_byte(2)));
704    }
705
706    #[test]
707    fn handle_sync_failure_no_replacement_when_exhausted() {
708        let peers = vec![peer_id_from_byte(1)];
709        let mut state = NeighborSyncState::new_cycle(peers);
710        state.cursor = 1; // Already past the only peer.
711
712        let replacement =
713            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
714
715        assert!(state.order.is_empty());
716        assert!(replacement.is_none());
717    }
718
719    #[test]
720    fn handle_sync_failure_unknown_peer_is_noop() {
721        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
722        let mut state = NeighborSyncState::new_cycle(peers);
723        state.cursor = 1;
724
725        let replacement =
726            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
727
728        // Order should be unchanged.
729        assert_eq!(state.order.len(), 2);
730        // Still tries to fill from cursor.
731        assert_eq!(replacement, Some(peer_id_from_byte(2)));
732        assert_eq!(state.cursor, 2);
733    }
734
735    // -- record_successful_sync ----------------------------------------------
736
737    #[test]
738    fn record_successful_sync_updates_last_sync_time() {
739        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
740        let mut state = NeighborSyncState::new_cycle(peers);
741        let peer = peer_id_from_byte(1);
742
743        assert!(!state.last_sync_times.contains_key(&peer));
744
745        let before = Instant::now();
746        record_successful_sync(&mut state, &peer);
747        let after = Instant::now();
748
749        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
750        assert!(*ts >= before);
751        assert!(*ts <= after);
752    }
753
754    #[test]
755    fn record_successful_sync_overwrites_previous() {
756        let peers = vec![peer_id_from_byte(1)];
757        let mut state = NeighborSyncState::new_cycle(peers);
758        let peer = peer_id_from_byte(1);
759
760        // Record a sync at an old time. Use a small subtraction to avoid
761        // `checked_sub` returning `None` on freshly-booted CI runners.
762        let old_time = Instant::now()
763            .checked_sub(Duration::from_secs(2))
764            .unwrap_or_else(Instant::now);
765        state.last_sync_times.insert(peer, old_time);
766
767        record_successful_sync(&mut state, &peer);
768
769        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
770        assert!(*ts > old_time, "sync time should be updated");
771    }
772
773    // -- Section 18: Neighbor sync scenarios --------------------------------
774
775    #[test]
776    fn scenario_35_round_robin_with_cooldown_skip() {
777        // With >PEER_COUNT eligible peers, consecutive rounds scan forward
778        // from cursor, skip cooldown peers, sync next batch.
779        // Create 8 peers, mark peers 2,4 on cooldown.
780        // First batch of 4: peers 1,3,5,6 (2,4 skipped and removed).
781        // Second batch of 4: peers 7,8 (only 2 remain).
782        // Cycle should complete after second batch.
783        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
784        let mut state = NeighborSyncState::new_cycle(peers);
785        let batch_size = 4;
786        let cooldown = Duration::from_secs(3600);
787
788        // Mark peers 2 and 4 as recently synced (on cooldown).
789        state
790            .last_sync_times
791            .insert(peer_id_from_byte(2), Instant::now());
792        state
793            .last_sync_times
794            .insert(peer_id_from_byte(4), Instant::now());
795
796        // First batch: scan from cursor 0. Peers 2 and 4 are removed,
797        // leaving [1,3,5,6,7,8]. We pick the first 4: [1,3,5,6].
798        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
799        assert_eq!(batch1.len(), 4);
800        assert_eq!(batch1[0], peer_id_from_byte(1));
801        assert_eq!(batch1[1], peer_id_from_byte(3));
802        assert_eq!(batch1[2], peer_id_from_byte(5));
803        assert_eq!(batch1[3], peer_id_from_byte(6));
804
805        // Cooldown peers should have been removed from the order.
806        assert!(!state.order.contains(&peer_id_from_byte(2)));
807        assert!(!state.order.contains(&peer_id_from_byte(4)));
808
809        // Second batch: only peers 7,8 remain after cursor.
810        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
811        assert_eq!(batch2.len(), 2);
812        assert_eq!(batch2[0], peer_id_from_byte(7));
813        assert_eq!(batch2[1], peer_id_from_byte(8));
814
815        // Cycle should be complete after second batch.
816        assert!(state.is_cycle_complete());
817    }
818
819    #[test]
820    fn cycle_complete_when_cursor_past_order() {
821        // is_cycle_complete() returns true when cursor >= order.len().
822        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
823        let mut state = NeighborSyncState::new_cycle(peers);
824
825        // Not complete at the start.
826        assert!(!state.is_cycle_complete());
827
828        // Advance cursor to exactly order.len().
829        state.cursor = 3;
830        assert!(state.is_cycle_complete());
831
832        // Also complete when cursor exceeds order.len().
833        state.cursor = 10;
834        assert!(state.is_cycle_complete());
835
836        // Edge case: order is emptied (peers removed) with cursor at 0.
837        state.order.clear();
838        state.cursor = 0;
839        assert!(state.is_cycle_complete());
840    }
841
842    /// Scenario 36: Post-cycle responsibility pruning with time-based
843    /// hysteresis.
844    ///
845    /// When a full round-robin cycle completes, node runs one prune pass
846    /// over BOTH stored records and `PaidForList` entries using current
847    /// `SelfInclusiveRT`. Out-of-range items have timestamps recorded but
848    /// are deleted only after `PRUNE_HYSTERESIS_DURATION`. In-range items
849    /// have their timestamps cleared.
850    ///
851    /// Full `run_prune_pass` requires a live `P2PNode`. This test verifies
852    /// the deterministic trigger condition (cycle completion) and the
853    /// combined record + paid-list pruning contract:
854    ///   (1) Cycle completes -> prune pass should run.
855    ///   (2) Both `RecordOutOfRangeFirstSeen` and `PaidOutOfRangeFirstSeen`
856    ///       are tracked independently in the same pass.
857    ///   (3) Keys within hysteresis window are retained.
858    #[test]
859    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
860        let config = ReplicationConfig::default();
861
862        // Step 1: Run a full cycle to completion.
863        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
864        let mut state = NeighborSyncState::new_cycle(peers);
865        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
866        assert!(
867            state.is_cycle_complete(),
868            "cycle must be complete before prune pass triggers"
869        );
870
871        // Step 2: Verify prune hysteresis parameters are configured.
872        assert!(
873            !config.prune_hysteresis_duration.is_zero(),
874            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
875        );
876
877        // Step 3: Simulate the prune-pass timestamp tracking for BOTH
878        // record and paid-list entries (the two independent timestamp
879        // families that Section 11 requires in a single pass).
880        //
881        // Record timestamps and paid timestamps are independent — clearing
882        // one must not affect the other (tested in scenario_52). Here we
883        // verify the combined trigger: cycle completion -> both kinds of
884        // timestamps are eligible for evaluation.
885        let record_key: [u8; 32] = [0x36; 32];
886        let paid_key: [u8; 32] = [0x37; 32];
887
888        // Simulate: record_key goes out of range, paid_key goes out of range.
889        let record_first_seen = Instant::now();
890        let paid_first_seen = Instant::now();
891
892        // Both timestamps are recent — well within hysteresis window.
893        let record_elapsed = record_first_seen.elapsed();
894        let paid_elapsed = paid_first_seen.elapsed();
895        assert!(
896            record_elapsed < config.prune_hysteresis_duration,
897            "record key should be retained within hysteresis window"
898        );
899        assert!(
900            paid_elapsed < config.prune_hysteresis_duration,
901            "paid key should be retained within hysteresis window"
902        );
903
904        // The prune pass evaluates both independently. Verify they don't
905        // interfere by using separate keys.
906        assert_ne!(
907            record_key, paid_key,
908            "record and paid pruning keys must be independent"
909        );
910
911        // Step 4: After the cycle, a new snapshot is taken and cursor resets.
912        let new_state = NeighborSyncState::new_cycle(vec![
913            peer_id_from_byte(1),
914            peer_id_from_byte(2),
915            peer_id_from_byte(3),
916        ]);
917        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
918        assert!(
919            !new_state.is_cycle_complete(),
920            "new cycle should not be immediately complete"
921        );
922    }
923
924    #[test]
925    fn scenario_38_mid_cycle_peer_join_prioritized() {
926        // Peer D joins CloseNeighbors mid-cycle. It is queued for priority
927        // sync without being inserted into the current round-robin snapshot.
928        let peers = vec![
929            peer_id_from_byte(0xA),
930            peer_id_from_byte(0xB),
931            peer_id_from_byte(0xC),
932        ];
933        let mut state = NeighborSyncState::new_cycle(peers);
934
935        // Advance cursor to simulate mid-cycle.
936        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
937        assert_eq!(state.cursor, 1);
938
939        // Peer D "joins" the close-neighbor set. It remains outside the
940        // stable snapshot but is queued ahead of the cursor.
941        let peer_d = peer_id_from_byte(0xD);
942        assert_eq!(state.queue_priority_peers([peer_d]), 1);
943        assert!(!state.order.contains(&peer_d));
944        assert!(
945            !state.is_cycle_complete(),
946            "pending priority sync keeps the cycle active"
947        );
948
949        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
950        assert_eq!(batch, vec![peer_d, peer_id_from_byte(0xB)]);
951    }
952
953    #[test]
954    fn scenario_39_unreachable_peer_removed_slot_filled() {
955        // Peer P is in snapshot. Sync fails. P removed from order.
956        // Node resumes scanning and picks next peer Q to fill the slot.
957        let peers = vec![
958            peer_id_from_byte(1),
959            peer_id_from_byte(2),
960            peer_id_from_byte(3),
961            peer_id_from_byte(4),
962            peer_id_from_byte(5),
963        ];
964        let mut state = NeighborSyncState::new_cycle(peers);
965
966        // First batch selects peers 1,2.
967        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
968        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
969
970        // Peer 2 becomes unreachable. Remove it and fill the slot.
971        let replacement =
972            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
973        assert!(!state.order.contains(&peer_id_from_byte(2)));
974
975        // Slot should be filled by the next available peer (peer 3).
976        assert_eq!(
977            replacement,
978            Some(peer_id_from_byte(3)),
979            "vacated slot should be filled by next peer in order"
980        );
981
982        // Continue: next batch should resume from after the replacement.
983        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
984        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
985        assert!(state.is_cycle_complete());
986    }
987
988    #[test]
989    fn scenario_40_cooldown_peer_removed_from_snapshot() {
990        // Peer synced within cooldown period. When batch selection reaches it,
991        // peer is REMOVED from order (not just skipped). Scanning continues to
992        // next peer.
993        let peers = vec![
994            peer_id_from_byte(1),
995            peer_id_from_byte(2),
996            peer_id_from_byte(3),
997        ];
998        let mut state = NeighborSyncState::new_cycle(peers);
999        let cooldown = Duration::from_secs(3600);
1000
1001        // Mark peer 2 as recently synced.
1002        state
1003            .last_sync_times
1004            .insert(peer_id_from_byte(2), Instant::now());
1005
1006        let batch = select_sync_batch(&mut state, 3, cooldown);
1007
1008        // Peer 2 should have been REMOVED from order, not just skipped.
1009        assert!(!state.order.contains(&peer_id_from_byte(2)));
1010        assert_eq!(state.order.len(), 2, "order should shrink by 1");
1011
1012        // Batch contains the non-cooldown peers.
1013        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
1014
1015        // Cycle is complete since all remaining peers were selected.
1016        assert!(state.is_cycle_complete());
1017    }
1018
1019    #[test]
1020    fn priority_peer_in_snapshot_is_not_selected_twice() {
1021        let peers = vec![
1022            peer_id_from_byte(1),
1023            peer_id_from_byte(2),
1024            peer_id_from_byte(3),
1025        ];
1026        let mut state = NeighborSyncState::new_cycle(peers);
1027        assert_eq!(state.queue_priority_peers([peer_id_from_byte(2)]), 1);
1028
1029        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1030
1031        assert_eq!(batch, vec![peer_id_from_byte(2), peer_id_from_byte(1)]);
1032        assert_eq!(
1033            state.order,
1034            vec![peer_id_from_byte(1), peer_id_from_byte(3)]
1035        );
1036        assert_eq!(state.cursor, 1);
1037    }
1038
1039    #[test]
1040    fn priority_peer_on_cooldown_is_skipped_and_removed_from_snapshot() {
1041        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1042        let mut state = NeighborSyncState::new_cycle(peers);
1043        let cooldown = Duration::from_secs(1);
1044        let priority_peer = peer_id_from_byte(2);
1045        state.last_sync_times.insert(priority_peer, Instant::now());
1046        assert_eq!(state.queue_priority_peers([priority_peer]), 1);
1047
1048        let batch = select_sync_batch(&mut state, 2, cooldown);
1049
1050        assert_eq!(batch, vec![peer_id_from_byte(1)]);
1051        assert!(state.priority_order.is_empty());
1052        assert!(!state.order.contains(&priority_peer));
1053    }
1054
1055    #[test]
1056    fn failure_replacement_prefers_remaining_priority_peer() {
1057        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1058        let mut state = NeighborSyncState::new_cycle(peers);
1059        let first_priority_peer = peer_id_from_byte(3);
1060        let second_priority_peer = peer_id_from_byte(4);
1061        assert_eq!(
1062            state.queue_priority_peers([first_priority_peer, second_priority_peer]),
1063            2
1064        );
1065
1066        let batch = select_sync_batch(&mut state, 1, Duration::from_secs(0));
1067        assert_eq!(batch, vec![first_priority_peer]);
1068
1069        let replacement =
1070            handle_sync_failure(&mut state, &first_priority_peer, Duration::from_secs(0));
1071
1072        assert_eq!(replacement, Some(second_priority_peer));
1073        assert!(state.priority_order.is_empty());
1074        assert_eq!(state.cursor, 0);
1075    }
1076
1077    #[test]
1078    fn scenario_41_cycle_always_terminates() {
1079        // Under arbitrary cooldowns and removals, cycle always terminates.
1080        // Create 10 peers. Mark ALL on cooldown. select_sync_batch
1081        // should remove all and return empty. Cycle complete.
1082        let peer_count: u8 = 10;
1083        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
1084        let mut state = NeighborSyncState::new_cycle(peers);
1085        let cooldown = Duration::from_secs(3600);
1086
1087        // Mark all peers as recently synced.
1088        for i in 1..=peer_count {
1089            state
1090                .last_sync_times
1091                .insert(peer_id_from_byte(i), Instant::now());
1092        }
1093
1094        let batch = select_sync_batch(&mut state, 4, cooldown);
1095
1096        assert!(
1097            batch.is_empty(),
1098            "all peers on cooldown — batch must be empty"
1099        );
1100        assert!(state.order.is_empty(), "all peers should have been removed");
1101        assert!(
1102            state.is_cycle_complete(),
1103            "cycle must terminate when all peers are removed"
1104        );
1105    }
1106
1107    #[test]
1108    fn consecutive_rounds_advance_through_full_cycle() {
1109        // 6 peers, batch_size=2, no cooldowns.
1110        // Round 1: peers 0,1. Round 2: peers 2,3. Round 3: peers 4,5.
1111        // After round 3: cycle complete.
1112        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
1113        let mut state = NeighborSyncState::new_cycle(peers);
1114        let batch_size = 2;
1115        let no_cooldown = Duration::from_secs(0);
1116
1117        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
1118        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
1119        assert_eq!(state.cursor, 2);
1120        assert!(!state.is_cycle_complete());
1121
1122        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
1123        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
1124        assert_eq!(state.cursor, 4);
1125        assert!(!state.is_cycle_complete());
1126
1127        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
1128        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
1129        assert_eq!(state.cursor, 6);
1130        assert!(state.is_cycle_complete());
1131
1132        // Extra call after cycle complete returns empty.
1133        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
1134        assert!(round4.is_empty());
1135    }
1136
1137    /// Scenario 37: Non-`LocalRT` inbound sync behavior.
1138    ///
1139    /// When a peer not in `LocalRT(self)` opens a sync session:
1140    /// - Receiver STILL builds and sends outbound hints (response always
1141    ///   constructed via `handle_sync_request`).
1142    /// - Receiver drops ALL inbound replica/paid hints from that peer
1143    ///   (caller returns early in `mod.rs:handle_neighbor_sync_request`
1144    ///   when `sender_in_rt` is false).
1145    /// - Sync history is NOT updated for non-RT peers, so no
1146    ///   `RepairOpportunity` is created.
1147    ///
1148    /// Full integration requires a live `P2PNode` (`handle_sync_request`
1149    /// calls `is_in_routing_table`). This test verifies the deterministic
1150    /// contract:
1151    ///   (1) `NeighborSyncResponse` is always constructed regardless of
1152    ///       sender RT membership (outbound hints sent).
1153    ///   (2) When `sender_in_rt` is false, no admission runs and sync
1154    ///       history is not updated.
1155    ///   (3) When `sender_in_rt` is true, sync history IS updated and
1156    ///       inbound hints enter the admission pipeline.
1157    #[test]
1158    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
1159        let sender = peer_id_from_byte(0x37);
1160
1161        // Simulate what handle_sync_request always builds: outbound hints
1162        // in the response, regardless of whether sender is in LocalRT.
1163        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
1164        let outbound_paid_hints = vec![[0x03; 32]];
1165        let response = NeighborSyncResponse {
1166            replica_hints: outbound_replica_hints.clone(),
1167            paid_hints: outbound_paid_hints.clone(),
1168            bootstrapping: false,
1169            rejected_keys: Vec::new(),
1170        };
1171
1172        // Inbound hints from the sender (would be in the request).
1173        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
1174        let inbound_paid_hints = vec![[0xB0; 32]];
1175
1176        // --- Case 1: sender NOT in LocalRT (sender_in_rt = false) ---
1177        let sender_in_rt = false;
1178        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
1179
1180        // Response is still built — outbound hints are sent.
1181        assert_eq!(
1182            response.replica_hints, outbound_replica_hints,
1183            "outbound replica hints must be sent even when sender is not in LocalRT"
1184        );
1185        assert_eq!(
1186            response.paid_hints, outbound_paid_hints,
1187            "outbound paid hints must be sent even when sender is not in LocalRT"
1188        );
1189
1190        // Caller checks sender_in_rt and returns early. No admission runs.
1191        if !sender_in_rt {
1192            // This is the early-return path in mod.rs:964-966.
1193            // Inbound hints are never processed.
1194            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
1195            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
1196
1197            for key in &inbound_replica_hints {
1198                assert!(
1199                    !admitted_replica_keys.contains(key),
1200                    "inbound replica hints must NOT be admitted from non-RT sender"
1201                );
1202            }
1203            for key in &inbound_paid_hints {
1204                assert!(
1205                    !admitted_paid_keys.contains(key),
1206                    "inbound paid hints must NOT be admitted from non-RT sender"
1207                );
1208            }
1209
1210            // Sync history is NOT updated for non-RT peers.
1211            assert!(
1212                !sync_history.contains_key(&sender),
1213                "sync history must NOT be updated for non-LocalRT sender"
1214            );
1215        }
1216
1217        // --- Case 2: sender IS in LocalRT (sender_in_rt = true) ---
1218        let sender_in_rt = true;
1219        assert!(
1220            sender_in_rt,
1221            "when sender is in LocalRT, inbound hints are processed"
1222        );
1223
1224        // Sync history IS updated for RT peers.
1225        sync_history.insert(
1226            sender,
1227            PeerSyncRecord {
1228                last_sync: Some(Instant::now()),
1229                cycles_since_sync: 0,
1230            },
1231        );
1232        assert!(
1233            sync_history.contains_key(&sender),
1234            "sync history should be updated for LocalRT sender"
1235        );
1236        assert!(
1237            sync_history
1238                .get(&sender)
1239                .expect("sender in history")
1240                .last_sync
1241                .is_some(),
1242            "last_sync should be recorded for RT sender"
1243        );
1244    }
1245
1246    #[test]
1247    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
1248        // Verify that after cycle completes, starting a new cycle
1249        // preserves the last_sync_times from the old state.
1250        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1251        let mut state = NeighborSyncState::new_cycle(peers);
1252
1253        // Sync both peers and record their times.
1254        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1255        record_successful_sync(&mut state, &peer_id_from_byte(1));
1256        record_successful_sync(&mut state, &peer_id_from_byte(2));
1257        assert!(state.is_cycle_complete());
1258
1259        // Capture sync times before "resetting" for a new cycle.
1260        let old_sync_times = state.last_sync_times.clone();
1261        assert_eq!(old_sync_times.len(), 2);
1262
1263        // Simulate starting a new cycle: create fresh state but carry over
1264        // last_sync_times (as the real driver would).
1265        let new_peers = vec![
1266            peer_id_from_byte(1),
1267            peer_id_from_byte(2),
1268            peer_id_from_byte(3),
1269        ];
1270        let mut new_state = NeighborSyncState::new_cycle(new_peers);
1271        new_state.last_sync_times = old_sync_times;
1272
1273        // Cursor is reset.
1274        assert_eq!(new_state.cursor, 0);
1275        assert!(!new_state.is_cycle_complete());
1276
1277        // Sync times are preserved.
1278        assert_eq!(new_state.last_sync_times.len(), 2);
1279        assert!(new_state
1280            .last_sync_times
1281            .contains_key(&peer_id_from_byte(1)));
1282        assert!(new_state
1283            .last_sync_times
1284            .contains_key(&peer_id_from_byte(2)));
1285
1286        // The preserved cooldowns cause peers 1,2 to be removed, leaving
1287        // only peer 3 selected.
1288        let cooldown = Duration::from_secs(3600);
1289        let batch = select_sync_batch(&mut new_state, 3, cooldown);
1290        assert_eq!(
1291            batch,
1292            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1293            "only the new peer should be selected; old peers are on cooldown"
1294        );
1295    }
1296}