Skip to main content

ant_node/replication/
neighbor_sync.rs

1//! Neighbor replication sync (Section 6.2).
2//!
3//! Round-robin cycle management: snapshot close neighbors, iterate through
4//! them in batches of `NEIGHBOR_SYNC_PEER_COUNT`, exchanging hint sets.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, info, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14
15use crate::ant_protocol::XorName;
16use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
17use crate::replication::paid_list::PaidList;
18use crate::replication::protocol::{
19    NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
20};
21use crate::replication::types::NeighborSyncState;
22use crate::storage::LmdbStorage;
23
/// Hint-build duration, in milliseconds, at or above which the batch hint
/// build is reported at info level rather than debug level.
///
/// Typed as `u128` so it can be compared directly with the value returned by
/// `Duration::as_millis()`.
const HINT_BUILD_SLOW_LOG_MS: u128 = 250;
26
/// A single replica hint sent to a peer, paired with the close-group snapshot
/// that was used to decide the hint should be sent.
#[derive(Debug)]
pub(crate) struct SentReplicaHint {
    /// Key included in the replica hint set.
    pub(crate) key: XorName,
    /// Self-inclusive close group observed for `key` during hint
    /// construction (the peer ids returned by the local closest-nodes lookup).
    pub(crate) close_peers: HashSet<PeerId>,
}
36
/// Result of a successful outbound neighbor-sync exchange: what the peer
/// answered, plus what we told it.
#[derive(Debug)]
pub(crate) struct NeighborSyncOutcome {
    /// The peer's decoded sync response.
    pub(crate) response: NeighborSyncResponse,
    /// Replica hints that were sent to the peer in our request, each with the
    /// close-group snapshot it was derived from.
    pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
}
45
/// Prebuilt hint sets for one outbound neighbor-sync exchange with a single
/// peer.
///
/// `Default` yields two empty hint lists.
#[derive(Debug, Default)]
pub(crate) struct PeerSyncHints {
    /// Replica hints, including the close-group snapshot needed for repair
    /// proof recording after the request is successfully sent.
    pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
    /// Paid-list-only hints for this peer. Private to this module; it is
    /// consumed when the sync request is assembled.
    paid_hints: Vec<XorName>,
}
55
56/// Build replica hints for a specific peer.
57///
58/// Returns keys that we believe the peer should hold (peer is among the
59/// `CLOSE_GROUP_SIZE` nearest to `K` in our `SelfInclusiveRT`).
60///
61/// This intentionally scans all locally stored records, not only records for
62/// which this node is still responsible. Out-of-range records retained by
63/// prune hysteresis therefore continue to repair their new close group before
64/// this node is allowed to delete them.
65pub async fn build_replica_hints_for_peer(
66    peer: &PeerId,
67    storage: &Arc<LmdbStorage>,
68    p2p_node: &Arc<P2PNode>,
69    close_group_size: usize,
70) -> Vec<XorName> {
71    build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
72        .await
73        .into_iter()
74        .map(|hint| hint.key)
75        .collect()
76}
77
78pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
79    peer: &PeerId,
80    storage: &Arc<LmdbStorage>,
81    p2p_node: &Arc<P2PNode>,
82    close_group_size: usize,
83) -> Vec<SentReplicaHint> {
84    let all_keys = match storage.all_keys().await {
85        Ok(keys) => keys,
86        Err(e) => {
87            warn!("Failed to read stored keys for hint construction: {e}");
88            return Vec::new();
89        }
90    };
91
92    let dht = p2p_node.dht_manager();
93    let mut hints = Vec::new();
94    for key in all_keys {
95        let closest = dht
96            .find_closest_nodes_local_with_self(&key, close_group_size)
97            .await;
98        let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
99        if close_peers.contains(peer) {
100            hints.push(SentReplicaHint { key, close_peers });
101        }
102    }
103    hints
104}
105
106/// Build outbound hint sets for a batch of peers with one scan over local
107/// storage and one scan over the paid list.
108pub(crate) async fn build_sync_hints_for_peers(
109    peers: &[PeerId],
110    storage: &Arc<LmdbStorage>,
111    paid_list: &Arc<PaidList>,
112    p2p_node: &Arc<P2PNode>,
113    close_group_size: usize,
114    paid_list_close_group_size: usize,
115) -> HashMap<PeerId, PeerSyncHints> {
116    let started = Instant::now();
117    let target_peers = peers.iter().copied().collect::<HashSet<_>>();
118    let mut hints_by_peer = peers
119        .iter()
120        .copied()
121        .map(|peer| (peer, PeerSyncHints::default()))
122        .collect::<HashMap<_, _>>();
123
124    if peers.is_empty() {
125        return hints_by_peer;
126    }
127
128    let all_keys = match storage.all_keys().await {
129        Ok(keys) => keys,
130        Err(e) => {
131            warn!("Failed to read stored keys for batch hint construction: {e}");
132            Vec::new()
133        }
134    };
135    let stored_key_count = all_keys.len();
136
137    let dht = p2p_node.dht_manager();
138    for key in all_keys {
139        let closest = dht
140            .find_closest_nodes_local_with_self(&key, close_group_size)
141            .await;
142        let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
143        for peer in close_peers.intersection(&target_peers) {
144            if let Some(peer_hints) = hints_by_peer.get_mut(peer) {
145                peer_hints.sent_replica_hints.push(SentReplicaHint {
146                    key,
147                    close_peers: close_peers.clone(),
148                });
149            }
150        }
151    }
152
153    let all_paid_keys = match paid_list.all_keys() {
154        Ok(keys) => keys,
155        Err(e) => {
156            warn!("Failed to read PaidForList for batch hint construction: {e}");
157            Vec::new()
158        }
159    };
160    let paid_key_count = all_paid_keys.len();
161
162    for key in all_paid_keys {
163        let closest = dht
164            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
165            .await;
166        for node in closest {
167            if target_peers.contains(&node.peer_id) {
168                if let Some(peer_hints) = hints_by_peer.get_mut(&node.peer_id) {
169                    peer_hints.paid_hints.push(key);
170                }
171            }
172        }
173    }
174
175    let replica_hint_count = hints_by_peer
176        .values()
177        .map(|hints| hints.sent_replica_hints.len())
178        .sum::<usize>();
179    let paid_hint_count = hints_by_peer
180        .values()
181        .map(|hints| hints.paid_hints.len())
182        .sum::<usize>();
183    let elapsed_ms = started.elapsed().as_millis();
184
185    if elapsed_ms >= HINT_BUILD_SLOW_LOG_MS {
186        info!(
187            target: "ant_node::replication::neighbor_sync",
188            "Slow neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
189            peers.len(),
190            stored_key_count,
191            paid_key_count,
192            replica_hint_count,
193            paid_hint_count,
194        );
195    } else {
196        debug!(
197            target: "ant_node::replication::neighbor_sync",
198            "Neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
199            peers.len(),
200            stored_key_count,
201            paid_key_count,
202            replica_hint_count,
203            paid_hint_count,
204        );
205    }
206
207    hints_by_peer
208}
209
210/// Build paid hints for a specific peer.
211///
212/// Returns keys from our `PaidForList` that we believe the peer should
213/// track (peer is among `PAID_LIST_CLOSE_GROUP_SIZE` nearest to `K`).
214pub async fn build_paid_hints_for_peer(
215    peer: &PeerId,
216    paid_list: &Arc<PaidList>,
217    p2p_node: &Arc<P2PNode>,
218    paid_list_close_group_size: usize,
219) -> Vec<XorName> {
220    let all_paid_keys = match paid_list.all_keys() {
221        Ok(keys) => keys,
222        Err(e) => {
223            warn!("Failed to read PaidForList for hint construction: {e}");
224            return Vec::new();
225        }
226    };
227
228    let dht = p2p_node.dht_manager();
229    let mut hints = Vec::new();
230    for key in all_paid_keys {
231        let closest = dht
232            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
233            .await;
234        if closest.iter().any(|n| n.peer_id == *peer) {
235            hints.push(key);
236        }
237    }
238    hints
239}
240
241/// Take a fresh snapshot of close neighbors for a new round-robin cycle.
242///
243/// Rule 1: Compute `CloseNeighbors(self)` as `NEIGHBOR_SYNC_SCOPE` nearest
244/// peers.
245pub async fn snapshot_close_neighbors(
246    p2p_node: &Arc<P2PNode>,
247    self_id: &PeerId,
248    scope: usize,
249) -> Vec<PeerId> {
250    let self_xor: XorName = *self_id.as_bytes();
251    let closest = p2p_node
252        .dht_manager()
253        .find_closest_nodes_local(&self_xor, scope)
254        .await;
255    closest.iter().map(|n| n.peer_id).collect()
256}
257
258/// Select the next batch of peers for sync from the current cycle.
259///
260/// Priority peers from routing-table churn are drained before the round-robin
261/// cursor. Rules 2-3 then scan forward from cursor, skip peers still under
262/// cooldown, and fill up to `peer_count` slots.
263pub fn select_sync_batch(
264    state: &mut NeighborSyncState,
265    peer_count: usize,
266    cooldown: Duration,
267) -> Vec<PeerId> {
268    let mut batch = Vec::new();
269    let now = Instant::now();
270
271    while batch.len() < peer_count {
272        let Some(peer) = select_next_sync_peer(state, now, cooldown) else {
273            break;
274        };
275        batch.push(peer);
276    }
277
278    batch
279}
280
281fn select_next_sync_peer(
282    state: &mut NeighborSyncState,
283    now: Instant,
284    cooldown: Duration,
285) -> Option<PeerId> {
286    while let Some(peer) = state.priority_order.pop_front() {
287        if peer_on_cooldown(state, &peer, now, cooldown) {
288            state.remove_peer(&peer);
289            continue;
290        }
291
292        state.remove_peer(&peer);
293        return Some(peer);
294    }
295
296    while state.cursor < state.order.len() {
297        let peer = state.order[state.cursor];
298
299        // Check cooldown (Rule 2a): if the peer was synced recently, remove
300        // from the snapshot and continue without advancing the cursor (the
301        // next element slides into the current cursor position).
302        if peer_on_cooldown(state, &peer, now, cooldown) {
303            state.order.remove(state.cursor);
304            continue;
305        }
306
307        state.cursor += 1;
308        return Some(peer);
309    }
310
311    None
312}
313
314fn peer_on_cooldown(
315    state: &NeighborSyncState,
316    peer: &PeerId,
317    now: Instant,
318    cooldown: Duration,
319) -> bool {
320    state
321        .last_sync_times
322        .get(peer)
323        .is_some_and(|last_sync| now.duration_since(*last_sync) < cooldown)
324}
325
326/// Execute a sync session with a single peer.
327///
328/// Returns the response hints if sync succeeded, or `None` if the peer
329/// was unreachable or the response could not be decoded.
330pub async fn sync_with_peer(
331    peer: &PeerId,
332    p2p_node: &Arc<P2PNode>,
333    storage: &Arc<LmdbStorage>,
334    paid_list: &Arc<PaidList>,
335    config: &ReplicationConfig,
336    is_bootstrapping: bool,
337) -> Option<NeighborSyncResponse> {
338    sync_with_peer_with_outcome(
339        peer,
340        p2p_node,
341        storage,
342        paid_list,
343        config,
344        is_bootstrapping,
345        None,
346    )
347    .await
348    .map(|outcome| outcome.response)
349}
350
351/// `commitment`: sender's current commitment to piggyback on the request.
352/// `None` if the responder hasn't rotated one yet (e.g. fresh boot,
353/// empty storage) — receiver falls back to legacy path.
354#[allow(clippy::too_many_arguments)]
355pub(crate) async fn sync_with_peer_with_outcome(
356    peer: &PeerId,
357    p2p_node: &Arc<P2PNode>,
358    storage: &Arc<LmdbStorage>,
359    paid_list: &Arc<PaidList>,
360    config: &ReplicationConfig,
361    is_bootstrapping: bool,
362    commitment: Option<crate::replication::commitment::StorageCommitment>,
363) -> Option<NeighborSyncOutcome> {
364    // Build peer-targeted hint sets (Rule 7).
365    let mut hints_by_peer = build_sync_hints_for_peers(
366        std::slice::from_ref(peer),
367        storage,
368        paid_list,
369        p2p_node,
370        config.close_group_size,
371        config.paid_list_close_group_size,
372    )
373    .await;
374    let hints = hints_by_peer.remove(peer).unwrap_or_default();
375    sync_with_peer_with_hints(peer, p2p_node, config, is_bootstrapping, hints, commitment).await
376}
377
378#[allow(clippy::too_many_arguments)]
379pub(crate) async fn sync_with_peer_with_hints(
380    peer: &PeerId,
381    p2p_node: &Arc<P2PNode>,
382    config: &ReplicationConfig,
383    is_bootstrapping: bool,
384    hints: PeerSyncHints,
385    // ADR-0002: sender's current commitment, piggybacked on the sync request so
386    // the receiver can ingest it (commitment gossip travels on neighbor sync) —
387    // see `NeighborSyncRequest::commitment` and `ingest_peer_commitment`.
388    commitment: Option<crate::replication::commitment::StorageCommitment>,
389) -> Option<NeighborSyncOutcome> {
390    let replica_hints = hints
391        .sent_replica_hints
392        .iter()
393        .map(|hint| hint.key)
394        .collect::<Vec<_>>();
395    let sent_replica_hints = hints.sent_replica_hints;
396
397    let request = NeighborSyncRequest {
398        replica_hints,
399        paid_hints: hints.paid_hints,
400        bootstrapping: is_bootstrapping,
401        commitment,
402    };
403    let request_id = rand::thread_rng().gen::<u64>();
404    let msg = ReplicationMessage {
405        request_id,
406        body: ReplicationMessageBody::NeighborSyncRequest(request),
407    };
408
409    let encoded = match msg.encode() {
410        Ok(data) => data,
411        Err(e) => {
412            warn!("Failed to encode sync request for {peer}: {e}");
413            return None;
414        }
415    };
416
417    let response = match p2p_node
418        .send_request(
419            peer,
420            REPLICATION_PROTOCOL_ID,
421            encoded,
422            config.verification_request_timeout,
423        )
424        .await
425    {
426        Ok(resp) => resp,
427        Err(e) => {
428            debug!("Sync with {peer} failed: {e}");
429            return None;
430        }
431    };
432
433    match ReplicationMessage::decode(&response.data) {
434        Ok(decoded) => {
435            if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
436                Some(NeighborSyncOutcome {
437                    response: resp,
438                    sent_replica_hints,
439                })
440            } else {
441                warn!("Unexpected response type from {peer} during sync");
442                None
443            }
444        }
445        Err(e) => {
446            warn!("Failed to decode sync response from {peer}: {e}");
447            None
448        }
449    }
450}
451
452/// Handle a failed sync attempt: remove peer from snapshot and try to fill
453/// the vacated slot.
454///
455/// Rule 3: Remove unreachable peer from pending sync state, attempt to fill
456/// by using the same priority-first scan as [`select_sync_batch`]. Applies the
457/// same cooldown filtering to avoid selecting a peer that was recently synced.
458pub fn handle_sync_failure(
459    state: &mut NeighborSyncState,
460    failed_peer: &PeerId,
461    cooldown: Duration,
462) -> Option<PeerId> {
463    // Find and remove the failed peer from the ordering.
464    state.remove_peer(failed_peer);
465
466    // Try to fill the vacated slot, applying the same priority and cooldown
467    // filtering used by select_sync_batch.
468    let now = Instant::now();
469    select_next_sync_peer(state, now, cooldown)
470}
471
472/// Record a successful sync with a peer.
473pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
474    state.last_sync_times.insert(*peer, Instant::now());
475}
476
477/// Handle incoming sync request from a peer.
478///
479/// Rules 4-6: Validate peer is in `LocalRT`. If yes, bidirectional sync.
480/// If not, outbound-only (send hints but don't accept inbound).
481///
482/// Returns `(response, sender_in_routing_table)` where the second element
483/// indicates whether the caller should process the sender's inbound hints.
484pub async fn handle_sync_request(
485    sender: &PeerId,
486    request: &NeighborSyncRequest,
487    p2p_node: &Arc<P2PNode>,
488    storage: &Arc<LmdbStorage>,
489    paid_list: &Arc<PaidList>,
490    config: &ReplicationConfig,
491    is_bootstrapping: bool,
492) -> (NeighborSyncResponse, bool) {
493    let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
494        sender,
495        request,
496        p2p_node,
497        storage,
498        paid_list,
499        config,
500        is_bootstrapping,
501        None,
502    )
503    .await;
504    (response, sender_in_rt)
505}
506
507#[allow(clippy::too_many_arguments)]
508pub(crate) async fn handle_sync_request_with_proofs(
509    sender: &PeerId,
510    _request: &NeighborSyncRequest,
511    p2p_node: &Arc<P2PNode>,
512    storage: &Arc<LmdbStorage>,
513    paid_list: &Arc<PaidList>,
514    config: &ReplicationConfig,
515    is_bootstrapping: bool,
516    my_commitment: Option<crate::replication::commitment::StorageCommitment>,
517) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
518    let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
519
520    // Build outbound hints (always sent, even to non-RT peers).
521    let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
522        sender,
523        storage,
524        p2p_node,
525        config.close_group_size,
526    )
527    .await;
528    let replica_hints = sent_replica_hints
529        .iter()
530        .map(|hint| hint.key)
531        .collect::<Vec<_>>();
532    let paid_hints = build_paid_hints_for_peer(
533        sender,
534        paid_list,
535        p2p_node,
536        config.paid_list_close_group_size,
537    )
538    .await;
539
540    let response = NeighborSyncResponse {
541        replica_hints,
542        paid_hints,
543        bootstrapping: is_bootstrapping,
544        rejected_keys: Vec::new(),
545        commitment: my_commitment,
546    };
547
548    // Rule 4-6: accept inbound hints only if sender is in LocalRT.
549    (response, sent_replica_hints, sender_in_rt)
550}
551
552// ---------------------------------------------------------------------------
553// Tests
554// ---------------------------------------------------------------------------
555
556#[cfg(test)]
557#[allow(clippy::unwrap_used, clippy::expect_used)]
558mod tests {
559    use super::*;
560    use crate::replication::types::PeerSyncRecord;
561    use std::collections::HashMap;
562
563    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
564    fn peer_id_from_byte(b: u8) -> PeerId {
565        let mut bytes = [0u8; 32];
566        bytes[0] = b;
567        PeerId::from_bytes(bytes)
568    }
569
570    // -- select_sync_batch ---------------------------------------------------
571
572    #[test]
573    fn select_sync_batch_returns_up_to_peer_count() {
574        let peers = vec![
575            peer_id_from_byte(1),
576            peer_id_from_byte(2),
577            peer_id_from_byte(3),
578            peer_id_from_byte(4),
579            peer_id_from_byte(5),
580        ];
581        let mut state = NeighborSyncState::new_cycle(peers);
582        let batch_size = 3;
583
584        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
585
586        assert_eq!(batch.len(), batch_size);
587        assert_eq!(batch[0], peer_id_from_byte(1));
588        assert_eq!(batch[1], peer_id_from_byte(2));
589        assert_eq!(batch[2], peer_id_from_byte(3));
590        assert_eq!(state.cursor, 3);
591    }
592
593    #[test]
594    fn select_sync_batch_skips_cooldown_peers() {
595        let peers = vec![
596            peer_id_from_byte(1),
597            peer_id_from_byte(2),
598            peer_id_from_byte(3),
599            peer_id_from_byte(4),
600        ];
601        let mut state = NeighborSyncState::new_cycle(peers);
602
603        // Mark peer 1 and peer 3 as recently synced.
604        state
605            .last_sync_times
606            .insert(peer_id_from_byte(1), Instant::now());
607        state
608            .last_sync_times
609            .insert(peer_id_from_byte(3), Instant::now());
610
611        let cooldown = Duration::from_secs(3600); // 1 hour
612        let batch = select_sync_batch(&mut state, 2, cooldown);
613
614        // Peer 1 and peer 3 should be skipped (removed from order).
615        assert_eq!(batch.len(), 2);
616        assert_eq!(batch[0], peer_id_from_byte(2));
617        assert_eq!(batch[1], peer_id_from_byte(4));
618
619        // Cooldown peers should have been removed from the order.
620        assert!(!state.order.contains(&peer_id_from_byte(1)));
621        assert!(!state.order.contains(&peer_id_from_byte(3)));
622    }
623
624    #[test]
625    fn select_sync_batch_expired_cooldown_not_skipped() {
626        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
627        let mut state = NeighborSyncState::new_cycle(peers);
628
629        // Mark peer 1 as synced a long time ago (simulate expired cooldown).
630        // Use a small subtraction (2s) and a smaller cooldown (1s) to avoid
631        // `checked_sub` returning `None` on freshly-booted CI runners where
632        // `Instant::now()` (system uptime) may be very small.
633        state.last_sync_times.insert(
634            peer_id_from_byte(1),
635            Instant::now()
636                .checked_sub(Duration::from_secs(2))
637                .unwrap_or_else(Instant::now),
638        );
639
640        let cooldown = Duration::from_secs(1);
641        let batch = select_sync_batch(&mut state, 2, cooldown);
642
643        // Peer 1's cooldown expired so it should be included.
644        assert_eq!(batch.len(), 2);
645        assert_eq!(batch[0], peer_id_from_byte(1));
646        assert_eq!(batch[1], peer_id_from_byte(2));
647    }
648
649    #[test]
650    fn select_sync_batch_empty_order() {
651        let mut state = NeighborSyncState::new_cycle(vec![]);
652
653        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
654
655        assert!(batch.is_empty());
656        assert_eq!(state.cursor, 0);
657    }
658
659    #[test]
660    fn select_sync_batch_all_on_cooldown() {
661        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
662        let mut state = NeighborSyncState::new_cycle(peers);
663
664        state
665            .last_sync_times
666            .insert(peer_id_from_byte(1), Instant::now());
667        state
668            .last_sync_times
669            .insert(peer_id_from_byte(2), Instant::now());
670
671        let cooldown = Duration::from_secs(3600);
672        let batch = select_sync_batch(&mut state, 4, cooldown);
673
674        assert!(batch.is_empty());
675        assert!(state.order.is_empty());
676    }
677
678    // -- handle_sync_failure -------------------------------------------------
679
680    #[test]
681    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
682        let peers = vec![
683            peer_id_from_byte(1),
684            peer_id_from_byte(2),
685            peer_id_from_byte(3),
686            peer_id_from_byte(4),
687        ];
688        let mut state = NeighborSyncState::new_cycle(peers);
689        // Simulate having already processed peers at indices 0 and 1.
690        state.cursor = 2;
691
692        // Peer 2 (index 1, before cursor) fails.
693        let replacement =
694            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
695
696        // Cursor should be adjusted down by 1 (was 2, now 1).
697        assert_eq!(state.cursor, 2); // was 2, removed at pos 1, adjusted to 1, then replacement advances to 2
698        assert!(!state.order.contains(&peer_id_from_byte(2)));
699
700        // Should get peer 4 as replacement (index 1 after removal = peer 3,
701        // but cursor was adjusted to 1 so peer 3 is at index 1; it returns
702        // the peer at the new cursor and advances).
703        assert!(replacement.is_some());
704    }
705
706    #[test]
707    fn handle_sync_failure_removes_peer_after_cursor() {
708        let peers = vec![
709            peer_id_from_byte(1),
710            peer_id_from_byte(2),
711            peer_id_from_byte(3),
712            peer_id_from_byte(4),
713        ];
714        let mut state = NeighborSyncState::new_cycle(peers);
715        state.cursor = 1;
716
717        // Peer 3 (index 2, after cursor) fails.
718        let replacement =
719            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
720
721        // Cursor should stay at 1 (removal was after cursor).
722        assert_eq!(state.cursor, 2); // cursor was 1, replacement advances to 2
723        assert!(!state.order.contains(&peer_id_from_byte(3)));
724
725        // Replacement should be peer 2 (now at cursor position 1).
726        assert_eq!(replacement, Some(peer_id_from_byte(2)));
727    }
728
729    #[test]
730    fn handle_sync_failure_no_replacement_when_exhausted() {
731        let peers = vec![peer_id_from_byte(1)];
732        let mut state = NeighborSyncState::new_cycle(peers);
733        state.cursor = 1; // Already past the only peer.
734
735        let replacement =
736            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
737
738        assert!(state.order.is_empty());
739        assert!(replacement.is_none());
740    }
741
742    #[test]
743    fn handle_sync_failure_unknown_peer_is_noop() {
744        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
745        let mut state = NeighborSyncState::new_cycle(peers);
746        state.cursor = 1;
747
748        let replacement =
749            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
750
751        // Order should be unchanged.
752        assert_eq!(state.order.len(), 2);
753        // Still tries to fill from cursor.
754        assert_eq!(replacement, Some(peer_id_from_byte(2)));
755        assert_eq!(state.cursor, 2);
756    }
757
758    // -- record_successful_sync ----------------------------------------------
759
760    #[test]
761    fn record_successful_sync_updates_last_sync_time() {
762        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
763        let mut state = NeighborSyncState::new_cycle(peers);
764        let peer = peer_id_from_byte(1);
765
766        assert!(!state.last_sync_times.contains_key(&peer));
767
768        let before = Instant::now();
769        record_successful_sync(&mut state, &peer);
770        let after = Instant::now();
771
772        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
773        assert!(*ts >= before);
774        assert!(*ts <= after);
775    }
776
777    #[test]
778    fn record_successful_sync_overwrites_previous() {
779        let peers = vec![peer_id_from_byte(1)];
780        let mut state = NeighborSyncState::new_cycle(peers);
781        let peer = peer_id_from_byte(1);
782
783        // Record a sync at an old time. Use a small subtraction to avoid
784        // `checked_sub` returning `None` on freshly-booted CI runners.
785        let old_time = Instant::now()
786            .checked_sub(Duration::from_secs(2))
787            .unwrap_or_else(Instant::now);
788        state.last_sync_times.insert(peer, old_time);
789
790        record_successful_sync(&mut state, &peer);
791
792        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
793        assert!(*ts > old_time, "sync time should be updated");
794    }
795
796    // -- Section 18: Neighbor sync scenarios --------------------------------
797
798    #[test]
799    fn scenario_35_round_robin_with_cooldown_skip() {
800        // With >PEER_COUNT eligible peers, consecutive rounds scan forward
801        // from cursor, skip cooldown peers, sync next batch.
802        // Create 8 peers, mark peers 2,4 on cooldown.
803        // First batch of 4: peers 1,3,5,6 (2,4 skipped and removed).
804        // Second batch of 4: peers 7,8 (only 2 remain).
805        // Cycle should complete after second batch.
806        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
807        let mut state = NeighborSyncState::new_cycle(peers);
808        let batch_size = 4;
809        let cooldown = Duration::from_secs(3600);
810
811        // Mark peers 2 and 4 as recently synced (on cooldown).
812        state
813            .last_sync_times
814            .insert(peer_id_from_byte(2), Instant::now());
815        state
816            .last_sync_times
817            .insert(peer_id_from_byte(4), Instant::now());
818
819        // First batch: scan from cursor 0. Peers 2 and 4 are removed,
820        // leaving [1,3,5,6,7,8]. We pick the first 4: [1,3,5,6].
821        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
822        assert_eq!(batch1.len(), 4);
823        assert_eq!(batch1[0], peer_id_from_byte(1));
824        assert_eq!(batch1[1], peer_id_from_byte(3));
825        assert_eq!(batch1[2], peer_id_from_byte(5));
826        assert_eq!(batch1[3], peer_id_from_byte(6));
827
828        // Cooldown peers should have been removed from the order.
829        assert!(!state.order.contains(&peer_id_from_byte(2)));
830        assert!(!state.order.contains(&peer_id_from_byte(4)));
831
832        // Second batch: only peers 7,8 remain after cursor.
833        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
834        assert_eq!(batch2.len(), 2);
835        assert_eq!(batch2[0], peer_id_from_byte(7));
836        assert_eq!(batch2[1], peer_id_from_byte(8));
837
838        // Cycle should be complete after second batch.
839        assert!(state.is_cycle_complete());
840    }
841
842    #[test]
843    fn cycle_complete_when_cursor_past_order() {
844        // is_cycle_complete() returns true when cursor >= order.len().
845        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
846        let mut state = NeighborSyncState::new_cycle(peers);
847
848        // Not complete at the start.
849        assert!(!state.is_cycle_complete());
850
851        // Advance cursor to exactly order.len().
852        state.cursor = 3;
853        assert!(state.is_cycle_complete());
854
855        // Also complete when cursor exceeds order.len().
856        state.cursor = 10;
857        assert!(state.is_cycle_complete());
858
859        // Edge case: order is emptied (peers removed) with cursor at 0.
860        state.order.clear();
861        state.cursor = 0;
862        assert!(state.is_cycle_complete());
863    }
864
865    /// Scenario 36: Post-cycle responsibility pruning with time-based
866    /// hysteresis.
867    ///
868    /// When a full round-robin cycle completes, node runs one prune pass
869    /// over BOTH stored records and `PaidForList` entries using current
870    /// `SelfInclusiveRT`. Out-of-range items have timestamps recorded but
871    /// are deleted only after `PRUNE_HYSTERESIS_DURATION`. In-range items
872    /// have their timestamps cleared.
873    ///
874    /// Full `run_prune_pass` requires a live `P2PNode`. This test verifies
875    /// the deterministic trigger condition (cycle completion) and the
876    /// combined record + paid-list pruning contract:
877    ///   (1) Cycle completes -> prune pass should run.
878    ///   (2) Both `RecordOutOfRangeFirstSeen` and `PaidOutOfRangeFirstSeen`
879    ///       are tracked independently in the same pass.
880    ///   (3) Keys within hysteresis window are retained.
881    #[test]
882    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
883        let config = ReplicationConfig::default();
884
885        // Step 1: Run a full cycle to completion.
886        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
887        let mut state = NeighborSyncState::new_cycle(peers);
888        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
889        assert!(
890            state.is_cycle_complete(),
891            "cycle must be complete before prune pass triggers"
892        );
893
894        // Step 2: Verify prune hysteresis parameters are configured.
895        assert!(
896            !config.prune_hysteresis_duration.is_zero(),
897            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
898        );
899
900        // Step 3: Simulate the prune-pass timestamp tracking for BOTH
901        // record and paid-list entries (the two independent timestamp
902        // families that Section 11 requires in a single pass).
903        //
904        // Record timestamps and paid timestamps are independent — clearing
905        // one must not affect the other (tested in scenario_52). Here we
906        // verify the combined trigger: cycle completion -> both kinds of
907        // timestamps are eligible for evaluation.
908        let record_key: [u8; 32] = [0x36; 32];
909        let paid_key: [u8; 32] = [0x37; 32];
910
911        // Simulate: record_key goes out of range, paid_key goes out of range.
912        let record_first_seen = Instant::now();
913        let paid_first_seen = Instant::now();
914
915        // Both timestamps are recent — well within hysteresis window.
916        let record_elapsed = record_first_seen.elapsed();
917        let paid_elapsed = paid_first_seen.elapsed();
918        assert!(
919            record_elapsed < config.prune_hysteresis_duration,
920            "record key should be retained within hysteresis window"
921        );
922        assert!(
923            paid_elapsed < config.prune_hysteresis_duration,
924            "paid key should be retained within hysteresis window"
925        );
926
927        // The prune pass evaluates both independently. Verify they don't
928        // interfere by using separate keys.
929        assert_ne!(
930            record_key, paid_key,
931            "record and paid pruning keys must be independent"
932        );
933
934        // Step 4: After the cycle, a new snapshot is taken and cursor resets.
935        let new_state = NeighborSyncState::new_cycle(vec![
936            peer_id_from_byte(1),
937            peer_id_from_byte(2),
938            peer_id_from_byte(3),
939        ]);
940        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
941        assert!(
942            !new_state.is_cycle_complete(),
943            "new cycle should not be immediately complete"
944        );
945    }
946
947    #[test]
948    fn scenario_38_mid_cycle_peer_join_prioritized() {
949        // Peer D joins CloseNeighbors mid-cycle. It is queued for priority
950        // sync without being inserted into the current round-robin snapshot.
951        let peers = vec![
952            peer_id_from_byte(0xA),
953            peer_id_from_byte(0xB),
954            peer_id_from_byte(0xC),
955        ];
956        let mut state = NeighborSyncState::new_cycle(peers);
957
958        // Advance cursor to simulate mid-cycle.
959        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
960        assert_eq!(state.cursor, 1);
961
962        // Peer D "joins" the close-neighbor set. It remains outside the
963        // stable snapshot but is queued ahead of the cursor.
964        let peer_d = peer_id_from_byte(0xD);
965        assert_eq!(state.queue_priority_peers([peer_d]), 1);
966        assert!(!state.order.contains(&peer_d));
967        assert!(
968            !state.is_cycle_complete(),
969            "pending priority sync keeps the cycle active"
970        );
971
972        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
973        assert_eq!(batch, vec![peer_d, peer_id_from_byte(0xB)]);
974    }
975
976    #[test]
977    fn scenario_39_unreachable_peer_removed_slot_filled() {
978        // Peer P is in snapshot. Sync fails. P removed from order.
979        // Node resumes scanning and picks next peer Q to fill the slot.
980        let peers = vec![
981            peer_id_from_byte(1),
982            peer_id_from_byte(2),
983            peer_id_from_byte(3),
984            peer_id_from_byte(4),
985            peer_id_from_byte(5),
986        ];
987        let mut state = NeighborSyncState::new_cycle(peers);
988
989        // First batch selects peers 1,2.
990        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
991        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
992
993        // Peer 2 becomes unreachable. Remove it and fill the slot.
994        let replacement =
995            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
996        assert!(!state.order.contains(&peer_id_from_byte(2)));
997
998        // Slot should be filled by the next available peer (peer 3).
999        assert_eq!(
1000            replacement,
1001            Some(peer_id_from_byte(3)),
1002            "vacated slot should be filled by next peer in order"
1003        );
1004
1005        // Continue: next batch should resume from after the replacement.
1006        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1007        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
1008        assert!(state.is_cycle_complete());
1009    }
1010
1011    #[test]
1012    fn scenario_40_cooldown_peer_removed_from_snapshot() {
1013        // Peer synced within cooldown period. When batch selection reaches it,
1014        // peer is REMOVED from order (not just skipped). Scanning continues to
1015        // next peer.
1016        let peers = vec![
1017            peer_id_from_byte(1),
1018            peer_id_from_byte(2),
1019            peer_id_from_byte(3),
1020        ];
1021        let mut state = NeighborSyncState::new_cycle(peers);
1022        let cooldown = Duration::from_secs(3600);
1023
1024        // Mark peer 2 as recently synced.
1025        state
1026            .last_sync_times
1027            .insert(peer_id_from_byte(2), Instant::now());
1028
1029        let batch = select_sync_batch(&mut state, 3, cooldown);
1030
1031        // Peer 2 should have been REMOVED from order, not just skipped.
1032        assert!(!state.order.contains(&peer_id_from_byte(2)));
1033        assert_eq!(state.order.len(), 2, "order should shrink by 1");
1034
1035        // Batch contains the non-cooldown peers.
1036        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
1037
1038        // Cycle is complete since all remaining peers were selected.
1039        assert!(state.is_cycle_complete());
1040    }
1041
1042    #[test]
1043    fn priority_peer_in_snapshot_is_not_selected_twice() {
1044        let peers = vec![
1045            peer_id_from_byte(1),
1046            peer_id_from_byte(2),
1047            peer_id_from_byte(3),
1048        ];
1049        let mut state = NeighborSyncState::new_cycle(peers);
1050        assert_eq!(state.queue_priority_peers([peer_id_from_byte(2)]), 1);
1051
1052        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1053
1054        assert_eq!(batch, vec![peer_id_from_byte(2), peer_id_from_byte(1)]);
1055        assert_eq!(
1056            state.order,
1057            vec![peer_id_from_byte(1), peer_id_from_byte(3)]
1058        );
1059        assert_eq!(state.cursor, 1);
1060    }
1061
1062    #[test]
1063    fn priority_peer_on_cooldown_is_skipped_and_removed_from_snapshot() {
1064        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1065        let mut state = NeighborSyncState::new_cycle(peers);
1066        let cooldown = Duration::from_secs(1);
1067        let priority_peer = peer_id_from_byte(2);
1068        state.last_sync_times.insert(priority_peer, Instant::now());
1069        assert_eq!(state.queue_priority_peers([priority_peer]), 1);
1070
1071        let batch = select_sync_batch(&mut state, 2, cooldown);
1072
1073        assert_eq!(batch, vec![peer_id_from_byte(1)]);
1074        assert!(state.priority_order.is_empty());
1075        assert!(!state.order.contains(&priority_peer));
1076    }
1077
1078    #[test]
1079    fn failure_replacement_prefers_remaining_priority_peer() {
1080        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1081        let mut state = NeighborSyncState::new_cycle(peers);
1082        let first_priority_peer = peer_id_from_byte(3);
1083        let second_priority_peer = peer_id_from_byte(4);
1084        assert_eq!(
1085            state.queue_priority_peers([first_priority_peer, second_priority_peer]),
1086            2
1087        );
1088
1089        let batch = select_sync_batch(&mut state, 1, Duration::from_secs(0));
1090        assert_eq!(batch, vec![first_priority_peer]);
1091
1092        let replacement =
1093            handle_sync_failure(&mut state, &first_priority_peer, Duration::from_secs(0));
1094
1095        assert_eq!(replacement, Some(second_priority_peer));
1096        assert!(state.priority_order.is_empty());
1097        assert_eq!(state.cursor, 0);
1098    }
1099
1100    #[test]
1101    fn scenario_41_cycle_always_terminates() {
1102        // Under arbitrary cooldowns and removals, cycle always terminates.
1103        // Create 10 peers. Mark ALL on cooldown. select_sync_batch
1104        // should remove all and return empty. Cycle complete.
1105        let peer_count: u8 = 10;
1106        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
1107        let mut state = NeighborSyncState::new_cycle(peers);
1108        let cooldown = Duration::from_secs(3600);
1109
1110        // Mark all peers as recently synced.
1111        for i in 1..=peer_count {
1112            state
1113                .last_sync_times
1114                .insert(peer_id_from_byte(i), Instant::now());
1115        }
1116
1117        let batch = select_sync_batch(&mut state, 4, cooldown);
1118
1119        assert!(
1120            batch.is_empty(),
1121            "all peers on cooldown — batch must be empty"
1122        );
1123        assert!(state.order.is_empty(), "all peers should have been removed");
1124        assert!(
1125            state.is_cycle_complete(),
1126            "cycle must terminate when all peers are removed"
1127        );
1128    }
1129
1130    #[test]
1131    fn consecutive_rounds_advance_through_full_cycle() {
1132        // 6 peers, batch_size=2, no cooldowns.
1133        // Round 1: peers 0,1. Round 2: peers 2,3. Round 3: peers 4,5.
1134        // After round 3: cycle complete.
1135        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
1136        let mut state = NeighborSyncState::new_cycle(peers);
1137        let batch_size = 2;
1138        let no_cooldown = Duration::from_secs(0);
1139
1140        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
1141        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
1142        assert_eq!(state.cursor, 2);
1143        assert!(!state.is_cycle_complete());
1144
1145        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
1146        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
1147        assert_eq!(state.cursor, 4);
1148        assert!(!state.is_cycle_complete());
1149
1150        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
1151        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
1152        assert_eq!(state.cursor, 6);
1153        assert!(state.is_cycle_complete());
1154
1155        // Extra call after cycle complete returns empty.
1156        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
1157        assert!(round4.is_empty());
1158    }
1159
1160    /// Scenario 37: Non-`LocalRT` inbound sync behavior.
1161    ///
1162    /// When a peer not in `LocalRT(self)` opens a sync session:
1163    /// - Receiver STILL builds and sends outbound hints (response always
1164    ///   constructed via `handle_sync_request`).
1165    /// - Receiver drops ALL inbound replica/paid hints from that peer
1166    ///   (caller returns early in `mod.rs:handle_neighbor_sync_request`
1167    ///   when `sender_in_rt` is false).
1168    /// - Sync history is NOT updated for non-RT peers, so no
1169    ///   `RepairOpportunity` is created.
1170    ///
1171    /// Full integration requires a live `P2PNode` (`handle_sync_request`
1172    /// calls `is_in_routing_table`). This test verifies the deterministic
1173    /// contract:
1174    ///   (1) `NeighborSyncResponse` is always constructed regardless of
1175    ///       sender RT membership (outbound hints sent).
1176    ///   (2) When `sender_in_rt` is false, no admission runs and sync
1177    ///       history is not updated.
1178    ///   (3) When `sender_in_rt` is true, sync history IS updated and
1179    ///       inbound hints enter the admission pipeline.
1180    #[test]
1181    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
1182        let sender = peer_id_from_byte(0x37);
1183
1184        // Simulate what handle_sync_request always builds: outbound hints
1185        // in the response, regardless of whether sender is in LocalRT.
1186        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
1187        let outbound_paid_hints = vec![[0x03; 32]];
1188        let response = NeighborSyncResponse {
1189            replica_hints: outbound_replica_hints.clone(),
1190            paid_hints: outbound_paid_hints.clone(),
1191            bootstrapping: false,
1192            rejected_keys: Vec::new(),
1193            commitment: None,
1194        };
1195
1196        // Inbound hints from the sender (would be in the request).
1197        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
1198        let inbound_paid_hints = vec![[0xB0; 32]];
1199
1200        // --- Case 1: sender NOT in LocalRT (sender_in_rt = false) ---
1201        let sender_in_rt = false;
1202        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
1203
1204        // Response is still built — outbound hints are sent.
1205        assert_eq!(
1206            response.replica_hints, outbound_replica_hints,
1207            "outbound replica hints must be sent even when sender is not in LocalRT"
1208        );
1209        assert_eq!(
1210            response.paid_hints, outbound_paid_hints,
1211            "outbound paid hints must be sent even when sender is not in LocalRT"
1212        );
1213
1214        // Caller checks sender_in_rt and returns early. No admission runs.
1215        if !sender_in_rt {
1216            // This is the early-return path in mod.rs:964-966.
1217            // Inbound hints are never processed.
1218            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
1219            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
1220
1221            for key in &inbound_replica_hints {
1222                assert!(
1223                    !admitted_replica_keys.contains(key),
1224                    "inbound replica hints must NOT be admitted from non-RT sender"
1225                );
1226            }
1227            for key in &inbound_paid_hints {
1228                assert!(
1229                    !admitted_paid_keys.contains(key),
1230                    "inbound paid hints must NOT be admitted from non-RT sender"
1231                );
1232            }
1233
1234            // Sync history is NOT updated for non-RT peers.
1235            assert!(
1236                !sync_history.contains_key(&sender),
1237                "sync history must NOT be updated for non-LocalRT sender"
1238            );
1239        }
1240
1241        // --- Case 2: sender IS in LocalRT (sender_in_rt = true) ---
1242        let sender_in_rt = true;
1243        assert!(
1244            sender_in_rt,
1245            "when sender is in LocalRT, inbound hints are processed"
1246        );
1247
1248        // Sync history IS updated for RT peers.
1249        sync_history.insert(
1250            sender,
1251            PeerSyncRecord {
1252                last_sync: Some(Instant::now()),
1253                cycles_since_sync: 0,
1254            },
1255        );
1256        assert!(
1257            sync_history.contains_key(&sender),
1258            "sync history should be updated for LocalRT sender"
1259        );
1260        assert!(
1261            sync_history
1262                .get(&sender)
1263                .expect("sender in history")
1264                .last_sync
1265                .is_some(),
1266            "last_sync should be recorded for RT sender"
1267        );
1268    }
1269
1270    #[test]
1271    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
1272        // Verify that after cycle completes, starting a new cycle
1273        // preserves the last_sync_times from the old state.
1274        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1275        let mut state = NeighborSyncState::new_cycle(peers);
1276
1277        // Sync both peers and record their times.
1278        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1279        record_successful_sync(&mut state, &peer_id_from_byte(1));
1280        record_successful_sync(&mut state, &peer_id_from_byte(2));
1281        assert!(state.is_cycle_complete());
1282
1283        // Capture sync times before "resetting" for a new cycle.
1284        let old_sync_times = state.last_sync_times.clone();
1285        assert_eq!(old_sync_times.len(), 2);
1286
1287        // Simulate starting a new cycle: create fresh state but carry over
1288        // last_sync_times (as the real driver would).
1289        let new_peers = vec![
1290            peer_id_from_byte(1),
1291            peer_id_from_byte(2),
1292            peer_id_from_byte(3),
1293        ];
1294        let mut new_state = NeighborSyncState::new_cycle(new_peers);
1295        new_state.last_sync_times = old_sync_times;
1296
1297        // Cursor is reset.
1298        assert_eq!(new_state.cursor, 0);
1299        assert!(!new_state.is_cycle_complete());
1300
1301        // Sync times are preserved.
1302        assert_eq!(new_state.last_sync_times.len(), 2);
1303        assert!(new_state
1304            .last_sync_times
1305            .contains_key(&peer_id_from_byte(1)));
1306        assert!(new_state
1307            .last_sync_times
1308            .contains_key(&peer_id_from_byte(2)));
1309
1310        // The preserved cooldowns cause peers 1,2 to be removed, leaving
1311        // only peer 3 selected.
1312        let cooldown = Duration::from_secs(3600);
1313        let batch = select_sync_batch(&mut new_state, 3, cooldown);
1314        assert_eq!(
1315            batch,
1316            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1317            "only the new peer should be selected; old peers are on cooldown"
1318        );
1319    }
1320}