Skip to main content

ant_node/replication/
neighbor_sync.rs

1//! Neighbor replication sync (Section 6.2).
2//!
3//! Round-robin cycle management: snapshot close neighbors, iterate through
4//! them in batches of `NEIGHBOR_SYNC_PEER_COUNT`, exchanging hint sets.
5
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use crate::logging::{debug, warn};
10use rand::Rng;
11use saorsa_core::identity::PeerId;
12use saorsa_core::P2PNode;
13
14use crate::ant_protocol::XorName;
15use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
16use crate::replication::paid_list::PaidList;
17use crate::replication::protocol::{
18    NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
19};
20use crate::replication::types::NeighborSyncState;
21use crate::storage::LmdbStorage;
22
23/// Build replica hints for a specific peer.
24///
25/// Returns keys that we believe the peer should hold (peer is among the
26/// `CLOSE_GROUP_SIZE` nearest to `K` in our `SelfInclusiveRT`).
27///
28/// This intentionally scans all locally stored records, not only records for
29/// which this node is still responsible. Out-of-range records retained by
30/// prune hysteresis therefore continue to repair their new close group before
31/// this node is allowed to delete them.
32pub async fn build_replica_hints_for_peer(
33    peer: &PeerId,
34    storage: &Arc<LmdbStorage>,
35    p2p_node: &Arc<P2PNode>,
36    close_group_size: usize,
37) -> Vec<XorName> {
38    let all_keys = match storage.all_keys().await {
39        Ok(keys) => keys,
40        Err(e) => {
41            warn!("Failed to read stored keys for hint construction: {e}");
42            return Vec::new();
43        }
44    };
45
46    let dht = p2p_node.dht_manager();
47    let mut hints = Vec::new();
48    for key in all_keys {
49        let closest = dht
50            .find_closest_nodes_local_with_self(&key, close_group_size)
51            .await;
52        if closest.iter().any(|n| n.peer_id == *peer) {
53            hints.push(key);
54        }
55    }
56    hints
57}
58
59/// Build paid hints for a specific peer.
60///
61/// Returns keys from our `PaidForList` that we believe the peer should
62/// track (peer is among `PAID_LIST_CLOSE_GROUP_SIZE` nearest to `K`).
63pub async fn build_paid_hints_for_peer(
64    peer: &PeerId,
65    paid_list: &Arc<PaidList>,
66    p2p_node: &Arc<P2PNode>,
67    paid_list_close_group_size: usize,
68) -> Vec<XorName> {
69    let all_paid_keys = match paid_list.all_keys() {
70        Ok(keys) => keys,
71        Err(e) => {
72            warn!("Failed to read PaidForList for hint construction: {e}");
73            return Vec::new();
74        }
75    };
76
77    let dht = p2p_node.dht_manager();
78    let mut hints = Vec::new();
79    for key in all_paid_keys {
80        let closest = dht
81            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
82            .await;
83        if closest.iter().any(|n| n.peer_id == *peer) {
84            hints.push(key);
85        }
86    }
87    hints
88}
89
90/// Take a fresh snapshot of close neighbors for a new round-robin cycle.
91///
92/// Rule 1: Compute `CloseNeighbors(self)` as `NEIGHBOR_SYNC_SCOPE` nearest
93/// peers.
94pub async fn snapshot_close_neighbors(
95    p2p_node: &Arc<P2PNode>,
96    self_id: &PeerId,
97    scope: usize,
98) -> Vec<PeerId> {
99    let self_xor: XorName = *self_id.as_bytes();
100    let closest = p2p_node
101        .dht_manager()
102        .find_closest_nodes_local(&self_xor, scope)
103        .await;
104    closest.iter().map(|n| n.peer_id).collect()
105}
106
107/// Select the next batch of peers for sync from the current cycle.
108///
109/// Rules 2-3: Scan forward from cursor, skip peers still under cooldown,
110/// fill up to `peer_count` slots.
111pub fn select_sync_batch(
112    state: &mut NeighborSyncState,
113    peer_count: usize,
114    cooldown: Duration,
115) -> Vec<PeerId> {
116    let mut batch = Vec::new();
117    let now = Instant::now();
118
119    while batch.len() < peer_count && state.cursor < state.order.len() {
120        let peer = state.order[state.cursor];
121
122        // Check cooldown (Rule 2a): if the peer was synced recently, remove
123        // from the snapshot and continue without advancing the cursor (the
124        // next element slides into the current cursor position).
125        if let Some(last_sync) = state.last_sync_times.get(&peer) {
126            if now.duration_since(*last_sync) < cooldown {
127                state.order.remove(state.cursor);
128                continue;
129            }
130        }
131
132        batch.push(peer);
133        state.cursor += 1;
134    }
135
136    batch
137}
138
139/// Execute a sync session with a single peer.
140///
141/// Returns the response hints if sync succeeded, or `None` if the peer
142/// was unreachable or the response could not be decoded.
143pub async fn sync_with_peer(
144    peer: &PeerId,
145    p2p_node: &Arc<P2PNode>,
146    storage: &Arc<LmdbStorage>,
147    paid_list: &Arc<PaidList>,
148    config: &ReplicationConfig,
149    is_bootstrapping: bool,
150) -> Option<NeighborSyncResponse> {
151    // Build peer-targeted hint sets (Rule 7).
152    let replica_hints =
153        build_replica_hints_for_peer(peer, storage, p2p_node, config.close_group_size).await;
154    let paid_hints =
155        build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
156            .await;
157
158    let request = NeighborSyncRequest {
159        replica_hints,
160        paid_hints,
161        bootstrapping: is_bootstrapping,
162    };
163    let request_id = rand::thread_rng().gen::<u64>();
164    let msg = ReplicationMessage {
165        request_id,
166        body: ReplicationMessageBody::NeighborSyncRequest(request),
167    };
168
169    let encoded = match msg.encode() {
170        Ok(data) => data,
171        Err(e) => {
172            warn!("Failed to encode sync request for {peer}: {e}");
173            return None;
174        }
175    };
176
177    let response = match p2p_node
178        .send_request(
179            peer,
180            REPLICATION_PROTOCOL_ID,
181            encoded,
182            config.verification_request_timeout,
183        )
184        .await
185    {
186        Ok(resp) => resp,
187        Err(e) => {
188            debug!("Sync with {peer} failed: {e}");
189            return None;
190        }
191    };
192
193    match ReplicationMessage::decode(&response.data) {
194        Ok(decoded) => {
195            if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
196                Some(resp)
197            } else {
198                warn!("Unexpected response type from {peer} during sync");
199                None
200            }
201        }
202        Err(e) => {
203            warn!("Failed to decode sync response from {peer}: {e}");
204            None
205        }
206    }
207}
208
209/// Handle a failed sync attempt: remove peer from snapshot and try to fill
210/// the vacated slot.
211///
212/// Rule 3: Remove unreachable peer from `NeighborSyncOrder`, attempt to fill
213/// by resuming scan from where rule 2 left off. Applies the same cooldown
214/// filtering as [`select_sync_batch`] to avoid selecting a peer that was
215/// recently synced.
216pub fn handle_sync_failure(
217    state: &mut NeighborSyncState,
218    failed_peer: &PeerId,
219    cooldown: Duration,
220) -> Option<PeerId> {
221    // Find and remove the failed peer from the ordering.
222    if let Some(pos) = state.order.iter().position(|p| p == failed_peer) {
223        state.order.remove(pos);
224        // Adjust cursor if removal was before the current cursor position.
225        if pos < state.cursor {
226            state.cursor = state.cursor.saturating_sub(1);
227        }
228    }
229
230    // Try to fill the vacated slot, applying cooldown filtering (same as
231    // select_sync_batch Rule 2a).
232    let now = Instant::now();
233    while state.cursor < state.order.len() {
234        let candidate = state.order[state.cursor];
235
236        if let Some(last_sync) = state.last_sync_times.get(&candidate) {
237            if now.duration_since(*last_sync) < cooldown {
238                state.order.remove(state.cursor);
239                continue;
240            }
241        }
242
243        state.cursor += 1;
244        return Some(candidate);
245    }
246
247    None
248}
249
250/// Record a successful sync with a peer.
251pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
252    state.last_sync_times.insert(*peer, Instant::now());
253}
254
255/// Handle incoming sync request from a peer.
256///
257/// Rules 4-6: Validate peer is in `LocalRT`. If yes, bidirectional sync.
258/// If not, outbound-only (send hints but don't accept inbound).
259///
260/// Returns `(response, sender_in_routing_table)` where the second element
261/// indicates whether the caller should process the sender's inbound hints.
262pub async fn handle_sync_request(
263    sender: &PeerId,
264    _request: &NeighborSyncRequest,
265    p2p_node: &Arc<P2PNode>,
266    storage: &Arc<LmdbStorage>,
267    paid_list: &Arc<PaidList>,
268    config: &ReplicationConfig,
269    is_bootstrapping: bool,
270) -> (NeighborSyncResponse, bool) {
271    let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
272
273    // Build outbound hints (always sent, even to non-RT peers).
274    let replica_hints =
275        build_replica_hints_for_peer(sender, storage, p2p_node, config.close_group_size).await;
276    let paid_hints = build_paid_hints_for_peer(
277        sender,
278        paid_list,
279        p2p_node,
280        config.paid_list_close_group_size,
281    )
282    .await;
283
284    let response = NeighborSyncResponse {
285        replica_hints,
286        paid_hints,
287        bootstrapping: is_bootstrapping,
288        rejected_keys: Vec::new(),
289    };
290
291    // Rule 4-6: accept inbound hints only if sender is in LocalRT.
292    (response, sender_in_rt)
293}
294
295// ---------------------------------------------------------------------------
296// Tests
297// ---------------------------------------------------------------------------
298
299#[cfg(test)]
300#[allow(clippy::unwrap_used, clippy::expect_used)]
301mod tests {
302    use super::*;
303    use crate::replication::types::PeerSyncRecord;
304    use std::collections::HashMap;
305
306    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
307    fn peer_id_from_byte(b: u8) -> PeerId {
308        let mut bytes = [0u8; 32];
309        bytes[0] = b;
310        PeerId::from_bytes(bytes)
311    }
312
313    // -- select_sync_batch ---------------------------------------------------
314
315    #[test]
316    fn select_sync_batch_returns_up_to_peer_count() {
317        let peers = vec![
318            peer_id_from_byte(1),
319            peer_id_from_byte(2),
320            peer_id_from_byte(3),
321            peer_id_from_byte(4),
322            peer_id_from_byte(5),
323        ];
324        let mut state = NeighborSyncState::new_cycle(peers);
325        let batch_size = 3;
326
327        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
328
329        assert_eq!(batch.len(), batch_size);
330        assert_eq!(batch[0], peer_id_from_byte(1));
331        assert_eq!(batch[1], peer_id_from_byte(2));
332        assert_eq!(batch[2], peer_id_from_byte(3));
333        assert_eq!(state.cursor, 3);
334    }
335
336    #[test]
337    fn select_sync_batch_skips_cooldown_peers() {
338        let peers = vec![
339            peer_id_from_byte(1),
340            peer_id_from_byte(2),
341            peer_id_from_byte(3),
342            peer_id_from_byte(4),
343        ];
344        let mut state = NeighborSyncState::new_cycle(peers);
345
346        // Mark peer 1 and peer 3 as recently synced.
347        state
348            .last_sync_times
349            .insert(peer_id_from_byte(1), Instant::now());
350        state
351            .last_sync_times
352            .insert(peer_id_from_byte(3), Instant::now());
353
354        let cooldown = Duration::from_secs(3600); // 1 hour
355        let batch = select_sync_batch(&mut state, 2, cooldown);
356
357        // Peer 1 and peer 3 should be skipped (removed from order).
358        assert_eq!(batch.len(), 2);
359        assert_eq!(batch[0], peer_id_from_byte(2));
360        assert_eq!(batch[1], peer_id_from_byte(4));
361
362        // Cooldown peers should have been removed from the order.
363        assert!(!state.order.contains(&peer_id_from_byte(1)));
364        assert!(!state.order.contains(&peer_id_from_byte(3)));
365    }
366
367    #[test]
368    fn select_sync_batch_expired_cooldown_not_skipped() {
369        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
370        let mut state = NeighborSyncState::new_cycle(peers);
371
372        // Mark peer 1 as synced a long time ago (simulate expired cooldown).
373        // Use a small subtraction (2s) and a smaller cooldown (1s) to avoid
374        // `checked_sub` returning `None` on freshly-booted CI runners where
375        // `Instant::now()` (system uptime) may be very small.
376        state.last_sync_times.insert(
377            peer_id_from_byte(1),
378            Instant::now()
379                .checked_sub(Duration::from_secs(2))
380                .unwrap_or_else(Instant::now),
381        );
382
383        let cooldown = Duration::from_secs(1);
384        let batch = select_sync_batch(&mut state, 2, cooldown);
385
386        // Peer 1's cooldown expired so it should be included.
387        assert_eq!(batch.len(), 2);
388        assert_eq!(batch[0], peer_id_from_byte(1));
389        assert_eq!(batch[1], peer_id_from_byte(2));
390    }
391
392    #[test]
393    fn select_sync_batch_empty_order() {
394        let mut state = NeighborSyncState::new_cycle(vec![]);
395
396        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
397
398        assert!(batch.is_empty());
399        assert_eq!(state.cursor, 0);
400    }
401
402    #[test]
403    fn select_sync_batch_all_on_cooldown() {
404        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
405        let mut state = NeighborSyncState::new_cycle(peers);
406
407        state
408            .last_sync_times
409            .insert(peer_id_from_byte(1), Instant::now());
410        state
411            .last_sync_times
412            .insert(peer_id_from_byte(2), Instant::now());
413
414        let cooldown = Duration::from_secs(3600);
415        let batch = select_sync_batch(&mut state, 4, cooldown);
416
417        assert!(batch.is_empty());
418        assert!(state.order.is_empty());
419    }
420
421    // -- handle_sync_failure -------------------------------------------------
422
423    #[test]
424    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
425        let peers = vec![
426            peer_id_from_byte(1),
427            peer_id_from_byte(2),
428            peer_id_from_byte(3),
429            peer_id_from_byte(4),
430        ];
431        let mut state = NeighborSyncState::new_cycle(peers);
432        // Simulate having already processed peers at indices 0 and 1.
433        state.cursor = 2;
434
435        // Peer 2 (index 1, before cursor) fails.
436        let replacement =
437            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
438
439        // Cursor should be adjusted down by 1 (was 2, now 1).
440        assert_eq!(state.cursor, 2); // was 2, removed at pos 1, adjusted to 1, then replacement advances to 2
441        assert!(!state.order.contains(&peer_id_from_byte(2)));
442
443        // Should get peer 4 as replacement (index 1 after removal = peer 3,
444        // but cursor was adjusted to 1 so peer 3 is at index 1; it returns
445        // the peer at the new cursor and advances).
446        assert!(replacement.is_some());
447    }
448
449    #[test]
450    fn handle_sync_failure_removes_peer_after_cursor() {
451        let peers = vec![
452            peer_id_from_byte(1),
453            peer_id_from_byte(2),
454            peer_id_from_byte(3),
455            peer_id_from_byte(4),
456        ];
457        let mut state = NeighborSyncState::new_cycle(peers);
458        state.cursor = 1;
459
460        // Peer 3 (index 2, after cursor) fails.
461        let replacement =
462            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
463
464        // Cursor should stay at 1 (removal was after cursor).
465        assert_eq!(state.cursor, 2); // cursor was 1, replacement advances to 2
466        assert!(!state.order.contains(&peer_id_from_byte(3)));
467
468        // Replacement should be peer 2 (now at cursor position 1).
469        assert_eq!(replacement, Some(peer_id_from_byte(2)));
470    }
471
472    #[test]
473    fn handle_sync_failure_no_replacement_when_exhausted() {
474        let peers = vec![peer_id_from_byte(1)];
475        let mut state = NeighborSyncState::new_cycle(peers);
476        state.cursor = 1; // Already past the only peer.
477
478        let replacement =
479            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
480
481        assert!(state.order.is_empty());
482        assert!(replacement.is_none());
483    }
484
485    #[test]
486    fn handle_sync_failure_unknown_peer_is_noop() {
487        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
488        let mut state = NeighborSyncState::new_cycle(peers);
489        state.cursor = 1;
490
491        let replacement =
492            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
493
494        // Order should be unchanged.
495        assert_eq!(state.order.len(), 2);
496        // Still tries to fill from cursor.
497        assert_eq!(replacement, Some(peer_id_from_byte(2)));
498        assert_eq!(state.cursor, 2);
499    }
500
501    // -- record_successful_sync ----------------------------------------------
502
503    #[test]
504    fn record_successful_sync_updates_last_sync_time() {
505        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
506        let mut state = NeighborSyncState::new_cycle(peers);
507        let peer = peer_id_from_byte(1);
508
509        assert!(!state.last_sync_times.contains_key(&peer));
510
511        let before = Instant::now();
512        record_successful_sync(&mut state, &peer);
513        let after = Instant::now();
514
515        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
516        assert!(*ts >= before);
517        assert!(*ts <= after);
518    }
519
520    #[test]
521    fn record_successful_sync_overwrites_previous() {
522        let peers = vec![peer_id_from_byte(1)];
523        let mut state = NeighborSyncState::new_cycle(peers);
524        let peer = peer_id_from_byte(1);
525
526        // Record a sync at an old time. Use a small subtraction to avoid
527        // `checked_sub` returning `None` on freshly-booted CI runners.
528        let old_time = Instant::now()
529            .checked_sub(Duration::from_secs(2))
530            .unwrap_or_else(Instant::now);
531        state.last_sync_times.insert(peer, old_time);
532
533        record_successful_sync(&mut state, &peer);
534
535        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
536        assert!(*ts > old_time, "sync time should be updated");
537    }
538
539    // -- Section 18: Neighbor sync scenarios --------------------------------
540
541    #[test]
542    fn scenario_35_round_robin_with_cooldown_skip() {
543        // With >PEER_COUNT eligible peers, consecutive rounds scan forward
544        // from cursor, skip cooldown peers, sync next batch.
545        // Create 8 peers, mark peers 2,4 on cooldown.
546        // First batch of 4: peers 1,3,5,6 (2,4 skipped and removed).
547        // Second batch of 4: peers 7,8 (only 2 remain).
548        // Cycle should complete after second batch.
549        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
550        let mut state = NeighborSyncState::new_cycle(peers);
551        let batch_size = 4;
552        let cooldown = Duration::from_secs(3600);
553
554        // Mark peers 2 and 4 as recently synced (on cooldown).
555        state
556            .last_sync_times
557            .insert(peer_id_from_byte(2), Instant::now());
558        state
559            .last_sync_times
560            .insert(peer_id_from_byte(4), Instant::now());
561
562        // First batch: scan from cursor 0. Peers 2 and 4 are removed,
563        // leaving [1,3,5,6,7,8]. We pick the first 4: [1,3,5,6].
564        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
565        assert_eq!(batch1.len(), 4);
566        assert_eq!(batch1[0], peer_id_from_byte(1));
567        assert_eq!(batch1[1], peer_id_from_byte(3));
568        assert_eq!(batch1[2], peer_id_from_byte(5));
569        assert_eq!(batch1[3], peer_id_from_byte(6));
570
571        // Cooldown peers should have been removed from the order.
572        assert!(!state.order.contains(&peer_id_from_byte(2)));
573        assert!(!state.order.contains(&peer_id_from_byte(4)));
574
575        // Second batch: only peers 7,8 remain after cursor.
576        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
577        assert_eq!(batch2.len(), 2);
578        assert_eq!(batch2[0], peer_id_from_byte(7));
579        assert_eq!(batch2[1], peer_id_from_byte(8));
580
581        // Cycle should be complete after second batch.
582        assert!(state.is_cycle_complete());
583    }
584
585    #[test]
586    fn cycle_complete_when_cursor_past_order() {
587        // is_cycle_complete() returns true when cursor >= order.len().
588        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
589        let mut state = NeighborSyncState::new_cycle(peers);
590
591        // Not complete at the start.
592        assert!(!state.is_cycle_complete());
593
594        // Advance cursor to exactly order.len().
595        state.cursor = 3;
596        assert!(state.is_cycle_complete());
597
598        // Also complete when cursor exceeds order.len().
599        state.cursor = 10;
600        assert!(state.is_cycle_complete());
601
602        // Edge case: order is emptied (peers removed) with cursor at 0.
603        state.order.clear();
604        state.cursor = 0;
605        assert!(state.is_cycle_complete());
606    }
607
608    /// Scenario 36: Post-cycle responsibility pruning with time-based
609    /// hysteresis.
610    ///
611    /// When a full round-robin cycle completes, node runs one prune pass
612    /// over BOTH stored records and `PaidForList` entries using current
613    /// `SelfInclusiveRT`. Out-of-range items have timestamps recorded but
614    /// are deleted only after `PRUNE_HYSTERESIS_DURATION`. In-range items
615    /// have their timestamps cleared.
616    ///
617    /// Full `run_prune_pass` requires a live `P2PNode`. This test verifies
618    /// the deterministic trigger condition (cycle completion) and the
619    /// combined record + paid-list pruning contract:
620    ///   (1) Cycle completes -> prune pass should run.
621    ///   (2) Both `RecordOutOfRangeFirstSeen` and `PaidOutOfRangeFirstSeen`
622    ///       are tracked independently in the same pass.
623    ///   (3) Keys within hysteresis window are retained.
624    #[test]
625    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
626        let config = ReplicationConfig::default();
627
628        // Step 1: Run a full cycle to completion.
629        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
630        let mut state = NeighborSyncState::new_cycle(peers);
631        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
632        assert!(
633            state.is_cycle_complete(),
634            "cycle must be complete before prune pass triggers"
635        );
636
637        // Step 2: Verify prune hysteresis parameters are configured.
638        assert!(
639            !config.prune_hysteresis_duration.is_zero(),
640            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
641        );
642
643        // Step 3: Simulate the prune-pass timestamp tracking for BOTH
644        // record and paid-list entries (the two independent timestamp
645        // families that Section 11 requires in a single pass).
646        //
647        // Record timestamps and paid timestamps are independent — clearing
648        // one must not affect the other (tested in scenario_52). Here we
649        // verify the combined trigger: cycle completion -> both kinds of
650        // timestamps are eligible for evaluation.
651        let record_key: [u8; 32] = [0x36; 32];
652        let paid_key: [u8; 32] = [0x37; 32];
653
654        // Simulate: record_key goes out of range, paid_key goes out of range.
655        let record_first_seen = Instant::now();
656        let paid_first_seen = Instant::now();
657
658        // Both timestamps are recent — well within hysteresis window.
659        let record_elapsed = record_first_seen.elapsed();
660        let paid_elapsed = paid_first_seen.elapsed();
661        assert!(
662            record_elapsed < config.prune_hysteresis_duration,
663            "record key should be retained within hysteresis window"
664        );
665        assert!(
666            paid_elapsed < config.prune_hysteresis_duration,
667            "paid key should be retained within hysteresis window"
668        );
669
670        // The prune pass evaluates both independently. Verify they don't
671        // interfere by using separate keys.
672        assert_ne!(
673            record_key, paid_key,
674            "record and paid pruning keys must be independent"
675        );
676
677        // Step 4: After the cycle, a new snapshot is taken and cursor resets.
678        let new_state = NeighborSyncState::new_cycle(vec![
679            peer_id_from_byte(1),
680            peer_id_from_byte(2),
681            peer_id_from_byte(3),
682        ]);
683        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
684        assert!(
685            !new_state.is_cycle_complete(),
686            "new cycle should not be immediately complete"
687        );
688    }
689
690    #[test]
691    fn scenario_38_mid_cycle_peer_join_excluded() {
692        // Peer D joins CloseNeighbors mid-cycle.
693        // D should NOT appear in the current NeighborSyncOrder snapshot.
694        // After cycle completes and a new snapshot is taken, D can be included.
695        let peers = vec![
696            peer_id_from_byte(0xA),
697            peer_id_from_byte(0xB),
698            peer_id_from_byte(0xC),
699        ];
700        let mut state = NeighborSyncState::new_cycle(peers);
701
702        // Advance cursor to simulate mid-cycle.
703        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
704        assert_eq!(state.cursor, 1);
705
706        // Peer D "joins" the network. It should NOT be in the current snapshot.
707        let peer_d = peer_id_from_byte(0xD);
708        assert!(
709            !state.order.contains(&peer_d),
710            "mid-cycle joiner must not appear in the current snapshot"
711        );
712
713        // Complete the current cycle.
714        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
715        assert!(state.is_cycle_complete());
716
717        // New cycle: now D can be included in the fresh snapshot.
718        let new_peers = vec![
719            peer_id_from_byte(0xA),
720            peer_id_from_byte(0xB),
721            peer_id_from_byte(0xC),
722            peer_d,
723        ];
724        let new_state = NeighborSyncState::new_cycle(new_peers);
725        assert!(
726            new_state.order.contains(&peer_d),
727            "after new snapshot, joiner D should be present"
728        );
729    }
730
731    #[test]
732    fn scenario_39_unreachable_peer_removed_slot_filled() {
733        // Peer P is in snapshot. Sync fails. P removed from order.
734        // Node resumes scanning and picks next peer Q to fill the slot.
735        let peers = vec![
736            peer_id_from_byte(1),
737            peer_id_from_byte(2),
738            peer_id_from_byte(3),
739            peer_id_from_byte(4),
740            peer_id_from_byte(5),
741        ];
742        let mut state = NeighborSyncState::new_cycle(peers);
743
744        // First batch selects peers 1,2.
745        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
746        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
747
748        // Peer 2 becomes unreachable. Remove it and fill the slot.
749        let replacement =
750            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
751        assert!(!state.order.contains(&peer_id_from_byte(2)));
752
753        // Slot should be filled by the next available peer (peer 3).
754        assert_eq!(
755            replacement,
756            Some(peer_id_from_byte(3)),
757            "vacated slot should be filled by next peer in order"
758        );
759
760        // Continue: next batch should resume from after the replacement.
761        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
762        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
763        assert!(state.is_cycle_complete());
764    }
765
766    #[test]
767    fn scenario_40_cooldown_peer_removed_from_snapshot() {
768        // Peer synced within cooldown period. When batch selection reaches it,
769        // peer is REMOVED from order (not just skipped). Scanning continues to
770        // next peer.
771        let peers = vec![
772            peer_id_from_byte(1),
773            peer_id_from_byte(2),
774            peer_id_from_byte(3),
775        ];
776        let mut state = NeighborSyncState::new_cycle(peers);
777        let cooldown = Duration::from_secs(3600);
778
779        // Mark peer 2 as recently synced.
780        state
781            .last_sync_times
782            .insert(peer_id_from_byte(2), Instant::now());
783
784        let batch = select_sync_batch(&mut state, 3, cooldown);
785
786        // Peer 2 should have been REMOVED from order, not just skipped.
787        assert!(!state.order.contains(&peer_id_from_byte(2)));
788        assert_eq!(state.order.len(), 2, "order should shrink by 1");
789
790        // Batch contains the non-cooldown peers.
791        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
792
793        // Cycle is complete since all remaining peers were selected.
794        assert!(state.is_cycle_complete());
795    }
796
797    #[test]
798    fn scenario_41_cycle_always_terminates() {
799        // Under arbitrary cooldowns and removals, cycle always terminates.
800        // Create 10 peers. Mark ALL on cooldown. select_sync_batch
801        // should remove all and return empty. Cycle complete.
802        let peer_count: u8 = 10;
803        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
804        let mut state = NeighborSyncState::new_cycle(peers);
805        let cooldown = Duration::from_secs(3600);
806
807        // Mark all peers as recently synced.
808        for i in 1..=peer_count {
809            state
810                .last_sync_times
811                .insert(peer_id_from_byte(i), Instant::now());
812        }
813
814        let batch = select_sync_batch(&mut state, 4, cooldown);
815
816        assert!(
817            batch.is_empty(),
818            "all peers on cooldown — batch must be empty"
819        );
820        assert!(state.order.is_empty(), "all peers should have been removed");
821        assert!(
822            state.is_cycle_complete(),
823            "cycle must terminate when all peers are removed"
824        );
825    }
826
827    #[test]
828    fn consecutive_rounds_advance_through_full_cycle() {
829        // 6 peers, batch_size=2, no cooldowns.
830        // Round 1: peers 0,1. Round 2: peers 2,3. Round 3: peers 4,5.
831        // After round 3: cycle complete.
832        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
833        let mut state = NeighborSyncState::new_cycle(peers);
834        let batch_size = 2;
835        let no_cooldown = Duration::from_secs(0);
836
837        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
838        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
839        assert_eq!(state.cursor, 2);
840        assert!(!state.is_cycle_complete());
841
842        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
843        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
844        assert_eq!(state.cursor, 4);
845        assert!(!state.is_cycle_complete());
846
847        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
848        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
849        assert_eq!(state.cursor, 6);
850        assert!(state.is_cycle_complete());
851
852        // Extra call after cycle complete returns empty.
853        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
854        assert!(round4.is_empty());
855    }
856
857    /// Scenario 37: Non-`LocalRT` inbound sync behavior.
858    ///
859    /// When a peer not in `LocalRT(self)` opens a sync session:
860    /// - Receiver STILL builds and sends outbound hints (response always
861    ///   constructed via `handle_sync_request`).
862    /// - Receiver drops ALL inbound replica/paid hints from that peer
863    ///   (caller returns early in `mod.rs:handle_neighbor_sync_request`
864    ///   when `sender_in_rt` is false).
865    /// - Sync history is NOT updated for non-RT peers, so no
866    ///   `RepairOpportunity` is created.
867    ///
868    /// Full integration requires a live `P2PNode` (`handle_sync_request`
869    /// calls `is_in_routing_table`). This test verifies the deterministic
870    /// contract:
871    ///   (1) `NeighborSyncResponse` is always constructed regardless of
872    ///       sender RT membership (outbound hints sent).
873    ///   (2) When `sender_in_rt` is false, no admission runs and sync
874    ///       history is not updated.
875    ///   (3) When `sender_in_rt` is true, sync history IS updated and
876    ///       inbound hints enter the admission pipeline.
877    #[test]
878    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
879        let sender = peer_id_from_byte(0x37);
880
881        // Simulate what handle_sync_request always builds: outbound hints
882        // in the response, regardless of whether sender is in LocalRT.
883        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
884        let outbound_paid_hints = vec![[0x03; 32]];
885        let response = NeighborSyncResponse {
886            replica_hints: outbound_replica_hints.clone(),
887            paid_hints: outbound_paid_hints.clone(),
888            bootstrapping: false,
889            rejected_keys: Vec::new(),
890        };
891
892        // Inbound hints from the sender (would be in the request).
893        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
894        let inbound_paid_hints = vec![[0xB0; 32]];
895
896        // --- Case 1: sender NOT in LocalRT (sender_in_rt = false) ---
897        let sender_in_rt = false;
898        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
899
900        // Response is still built — outbound hints are sent.
901        assert_eq!(
902            response.replica_hints, outbound_replica_hints,
903            "outbound replica hints must be sent even when sender is not in LocalRT"
904        );
905        assert_eq!(
906            response.paid_hints, outbound_paid_hints,
907            "outbound paid hints must be sent even when sender is not in LocalRT"
908        );
909
910        // Caller checks sender_in_rt and returns early. No admission runs.
911        if !sender_in_rt {
912            // This is the early-return path in mod.rs:964-966.
913            // Inbound hints are never processed.
914            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
915            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
916
917            for key in &inbound_replica_hints {
918                assert!(
919                    !admitted_replica_keys.contains(key),
920                    "inbound replica hints must NOT be admitted from non-RT sender"
921                );
922            }
923            for key in &inbound_paid_hints {
924                assert!(
925                    !admitted_paid_keys.contains(key),
926                    "inbound paid hints must NOT be admitted from non-RT sender"
927                );
928            }
929
930            // Sync history is NOT updated for non-RT peers.
931            assert!(
932                !sync_history.contains_key(&sender),
933                "sync history must NOT be updated for non-LocalRT sender"
934            );
935        }
936
937        // --- Case 2: sender IS in LocalRT (sender_in_rt = true) ---
938        let sender_in_rt = true;
939        assert!(
940            sender_in_rt,
941            "when sender is in LocalRT, inbound hints are processed"
942        );
943
944        // Sync history IS updated for RT peers.
945        sync_history.insert(
946            sender,
947            PeerSyncRecord {
948                last_sync: Some(Instant::now()),
949                cycles_since_sync: 0,
950            },
951        );
952        assert!(
953            sync_history.contains_key(&sender),
954            "sync history should be updated for LocalRT sender"
955        );
956        assert!(
957            sync_history
958                .get(&sender)
959                .expect("sender in history")
960                .last_sync
961                .is_some(),
962            "last_sync should be recorded for RT sender"
963        );
964    }
965
966    #[test]
967    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
968        // Verify that after cycle completes, starting a new cycle
969        // preserves the last_sync_times from the old state.
970        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
971        let mut state = NeighborSyncState::new_cycle(peers);
972
973        // Sync both peers and record their times.
974        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
975        record_successful_sync(&mut state, &peer_id_from_byte(1));
976        record_successful_sync(&mut state, &peer_id_from_byte(2));
977        assert!(state.is_cycle_complete());
978
979        // Capture sync times before "resetting" for a new cycle.
980        let old_sync_times = state.last_sync_times.clone();
981        assert_eq!(old_sync_times.len(), 2);
982
983        // Simulate starting a new cycle: create fresh state but carry over
984        // last_sync_times (as the real driver would).
985        let new_peers = vec![
986            peer_id_from_byte(1),
987            peer_id_from_byte(2),
988            peer_id_from_byte(3),
989        ];
990        let mut new_state = NeighborSyncState::new_cycle(new_peers);
991        new_state.last_sync_times = old_sync_times;
992
993        // Cursor is reset.
994        assert_eq!(new_state.cursor, 0);
995        assert!(!new_state.is_cycle_complete());
996
997        // Sync times are preserved.
998        assert_eq!(new_state.last_sync_times.len(), 2);
999        assert!(new_state
1000            .last_sync_times
1001            .contains_key(&peer_id_from_byte(1)));
1002        assert!(new_state
1003            .last_sync_times
1004            .contains_key(&peer_id_from_byte(2)));
1005
1006        // The preserved cooldowns cause peers 1,2 to be removed, leaving
1007        // only peer 3 selected.
1008        let cooldown = Duration::from_secs(3600);
1009        let batch = select_sync_batch(&mut new_state, 3, cooldown);
1010        assert_eq!(
1011            batch,
1012            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1013            "only the new peer should be selected; old peers are on cooldown"
1014        );
1015    }
1016}