// Source: ant_node/replication/neighbor_sync.rs

//! Neighbor replication sync (Section 6.2).
//!
//! Round-robin cycle management: snapshot close neighbors, iterate through
//! them in batches of `NEIGHBOR_SYNC_PEER_COUNT`, exchanging hint sets.

6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use crate::logging::{debug, warn};
10use rand::Rng;
11use saorsa_core::identity::PeerId;
12use saorsa_core::P2PNode;
13
14use crate::ant_protocol::XorName;
15use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
16use crate::replication::paid_list::PaidList;
17use crate::replication::protocol::{
18    NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
19};
20use crate::replication::types::NeighborSyncState;
21use crate::storage::LmdbStorage;
22
23/// Build replica hints for a specific peer.
24///
25/// Returns keys that we believe the peer should hold (peer is among the
26/// `CLOSE_GROUP_SIZE` nearest to `K` in our `SelfInclusiveRT`).
27pub async fn build_replica_hints_for_peer(
28    peer: &PeerId,
29    storage: &Arc<LmdbStorage>,
30    p2p_node: &Arc<P2PNode>,
31    close_group_size: usize,
32) -> Vec<XorName> {
33    let all_keys = match storage.all_keys().await {
34        Ok(keys) => keys,
35        Err(e) => {
36            warn!("Failed to read stored keys for hint construction: {e}");
37            return Vec::new();
38        }
39    };
40
41    let dht = p2p_node.dht_manager();
42    let mut hints = Vec::new();
43    for key in all_keys {
44        let closest = dht
45            .find_closest_nodes_local_with_self(&key, close_group_size)
46            .await;
47        if closest.iter().any(|n| n.peer_id == *peer) {
48            hints.push(key);
49        }
50    }
51    hints
52}
53
54/// Build paid hints for a specific peer.
55///
56/// Returns keys from our `PaidForList` that we believe the peer should
57/// track (peer is among `PAID_LIST_CLOSE_GROUP_SIZE` nearest to `K`).
58pub async fn build_paid_hints_for_peer(
59    peer: &PeerId,
60    paid_list: &Arc<PaidList>,
61    p2p_node: &Arc<P2PNode>,
62    paid_list_close_group_size: usize,
63) -> Vec<XorName> {
64    let all_paid_keys = match paid_list.all_keys() {
65        Ok(keys) => keys,
66        Err(e) => {
67            warn!("Failed to read PaidForList for hint construction: {e}");
68            return Vec::new();
69        }
70    };
71
72    let dht = p2p_node.dht_manager();
73    let mut hints = Vec::new();
74    for key in all_paid_keys {
75        let closest = dht
76            .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
77            .await;
78        if closest.iter().any(|n| n.peer_id == *peer) {
79            hints.push(key);
80        }
81    }
82    hints
83}
84
85/// Take a fresh snapshot of close neighbors for a new round-robin cycle.
86///
87/// Rule 1: Compute `CloseNeighbors(self)` as `NEIGHBOR_SYNC_SCOPE` nearest
88/// peers.
89pub async fn snapshot_close_neighbors(
90    p2p_node: &Arc<P2PNode>,
91    self_id: &PeerId,
92    scope: usize,
93) -> Vec<PeerId> {
94    let self_xor: XorName = *self_id.as_bytes();
95    let closest = p2p_node
96        .dht_manager()
97        .find_closest_nodes_local(&self_xor, scope)
98        .await;
99    closest.iter().map(|n| n.peer_id).collect()
100}
101
102/// Select the next batch of peers for sync from the current cycle.
103///
104/// Rules 2-3: Scan forward from cursor, skip peers still under cooldown,
105/// fill up to `peer_count` slots.
106pub fn select_sync_batch(
107    state: &mut NeighborSyncState,
108    peer_count: usize,
109    cooldown: Duration,
110) -> Vec<PeerId> {
111    let mut batch = Vec::new();
112    let now = Instant::now();
113
114    while batch.len() < peer_count && state.cursor < state.order.len() {
115        let peer = state.order[state.cursor];
116
117        // Check cooldown (Rule 2a): if the peer was synced recently, remove
118        // from the snapshot and continue without advancing the cursor (the
119        // next element slides into the current cursor position).
120        if let Some(last_sync) = state.last_sync_times.get(&peer) {
121            if now.duration_since(*last_sync) < cooldown {
122                state.order.remove(state.cursor);
123                continue;
124            }
125        }
126
127        batch.push(peer);
128        state.cursor += 1;
129    }
130
131    batch
132}
133
134/// Execute a sync session with a single peer.
135///
136/// Returns the response hints if sync succeeded, or `None` if the peer
137/// was unreachable or the response could not be decoded.
138pub async fn sync_with_peer(
139    peer: &PeerId,
140    p2p_node: &Arc<P2PNode>,
141    storage: &Arc<LmdbStorage>,
142    paid_list: &Arc<PaidList>,
143    config: &ReplicationConfig,
144    is_bootstrapping: bool,
145) -> Option<NeighborSyncResponse> {
146    // Build peer-targeted hint sets (Rule 7).
147    let replica_hints =
148        build_replica_hints_for_peer(peer, storage, p2p_node, config.close_group_size).await;
149    let paid_hints =
150        build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
151            .await;
152
153    let request = NeighborSyncRequest {
154        replica_hints,
155        paid_hints,
156        bootstrapping: is_bootstrapping,
157    };
158    let request_id = rand::thread_rng().gen::<u64>();
159    let msg = ReplicationMessage {
160        request_id,
161        body: ReplicationMessageBody::NeighborSyncRequest(request),
162    };
163
164    let encoded = match msg.encode() {
165        Ok(data) => data,
166        Err(e) => {
167            warn!("Failed to encode sync request for {peer}: {e}");
168            return None;
169        }
170    };
171
172    let response = match p2p_node
173        .send_request(
174            peer,
175            REPLICATION_PROTOCOL_ID,
176            encoded,
177            config.verification_request_timeout,
178        )
179        .await
180    {
181        Ok(resp) => resp,
182        Err(e) => {
183            debug!("Sync with {peer} failed: {e}");
184            return None;
185        }
186    };
187
188    match ReplicationMessage::decode(&response.data) {
189        Ok(decoded) => {
190            if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
191                Some(resp)
192            } else {
193                warn!("Unexpected response type from {peer} during sync");
194                None
195            }
196        }
197        Err(e) => {
198            warn!("Failed to decode sync response from {peer}: {e}");
199            None
200        }
201    }
202}
203
204/// Handle a failed sync attempt: remove peer from snapshot and try to fill
205/// the vacated slot.
206///
207/// Rule 3: Remove unreachable peer from `NeighborSyncOrder`, attempt to fill
208/// by resuming scan from where rule 2 left off. Applies the same cooldown
209/// filtering as [`select_sync_batch`] to avoid selecting a peer that was
210/// recently synced.
211pub fn handle_sync_failure(
212    state: &mut NeighborSyncState,
213    failed_peer: &PeerId,
214    cooldown: Duration,
215) -> Option<PeerId> {
216    // Find and remove the failed peer from the ordering.
217    if let Some(pos) = state.order.iter().position(|p| p == failed_peer) {
218        state.order.remove(pos);
219        // Adjust cursor if removal was before the current cursor position.
220        if pos < state.cursor {
221            state.cursor = state.cursor.saturating_sub(1);
222        }
223    }
224
225    // Try to fill the vacated slot, applying cooldown filtering (same as
226    // select_sync_batch Rule 2a).
227    let now = Instant::now();
228    while state.cursor < state.order.len() {
229        let candidate = state.order[state.cursor];
230
231        if let Some(last_sync) = state.last_sync_times.get(&candidate) {
232            if now.duration_since(*last_sync) < cooldown {
233                state.order.remove(state.cursor);
234                continue;
235            }
236        }
237
238        state.cursor += 1;
239        return Some(candidate);
240    }
241
242    None
243}
244
245/// Record a successful sync with a peer.
246pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
247    state.last_sync_times.insert(*peer, Instant::now());
248}
249
250/// Handle incoming sync request from a peer.
251///
252/// Rules 4-6: Validate peer is in `LocalRT`. If yes, bidirectional sync.
253/// If not, outbound-only (send hints but don't accept inbound).
254///
255/// Returns `(response, sender_in_routing_table)` where the second element
256/// indicates whether the caller should process the sender's inbound hints.
257pub async fn handle_sync_request(
258    sender: &PeerId,
259    _request: &NeighborSyncRequest,
260    p2p_node: &Arc<P2PNode>,
261    storage: &Arc<LmdbStorage>,
262    paid_list: &Arc<PaidList>,
263    config: &ReplicationConfig,
264    is_bootstrapping: bool,
265) -> (NeighborSyncResponse, bool) {
266    let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
267
268    // Build outbound hints (always sent, even to non-RT peers).
269    let replica_hints =
270        build_replica_hints_for_peer(sender, storage, p2p_node, config.close_group_size).await;
271    let paid_hints = build_paid_hints_for_peer(
272        sender,
273        paid_list,
274        p2p_node,
275        config.paid_list_close_group_size,
276    )
277    .await;
278
279    let response = NeighborSyncResponse {
280        replica_hints,
281        paid_hints,
282        bootstrapping: is_bootstrapping,
283        rejected_keys: Vec::new(),
284    };
285
286    // Rule 4-6: accept inbound hints only if sender is in LocalRT.
287    (response, sender_in_rt)
288}
289
290// ---------------------------------------------------------------------------
291// Tests
292// ---------------------------------------------------------------------------
293
294#[cfg(test)]
295#[allow(clippy::unwrap_used, clippy::expect_used)]
296mod tests {
297    use super::*;
298    use crate::replication::types::PeerSyncRecord;
299    use std::collections::HashMap;
300
301    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
302    fn peer_id_from_byte(b: u8) -> PeerId {
303        let mut bytes = [0u8; 32];
304        bytes[0] = b;
305        PeerId::from_bytes(bytes)
306    }
307
308    // -- select_sync_batch ---------------------------------------------------
309
    /// With more eligible peers than `peer_count`, the batch is capped at
    /// `peer_count` and the cursor advances by exactly that many slots.
    #[test]
    fn select_sync_batch_returns_up_to_peer_count() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 3;

        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));

        // First `batch_size` peers are taken in snapshot order.
        assert_eq!(batch.len(), batch_size);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
        assert_eq!(batch[2], peer_id_from_byte(3));
        assert_eq!(state.cursor, 3);
    }
330
    /// Peers still under cooldown are removed from the snapshot (not merely
    /// skipped) and do not occupy batch slots.
    #[test]
    fn select_sync_batch_skips_cooldown_peers() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Mark peer 1 and peer 3 as recently synced.
        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(3), Instant::now());

        let cooldown = Duration::from_secs(3600); // 1 hour
        let batch = select_sync_batch(&mut state, 2, cooldown);

        // Peer 1 and peer 3 should be skipped (removed from order).
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(2));
        assert_eq!(batch[1], peer_id_from_byte(4));

        // Cooldown peers should have been removed from the order.
        assert!(!state.order.contains(&peer_id_from_byte(1)));
        assert!(!state.order.contains(&peer_id_from_byte(3)));
    }
361
    /// A peer whose last sync predates the cooldown window is eligible again.
    #[test]
    fn select_sync_batch_expired_cooldown_not_skipped() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Mark peer 1 as synced a long time ago (simulate expired cooldown).
        // Use a small subtraction (2s) and a smaller cooldown (1s) to avoid
        // `checked_sub` returning `None` on freshly-booted CI runners where
        // `Instant::now()` (system uptime) may be very small.
        state.last_sync_times.insert(
            peer_id_from_byte(1),
            Instant::now()
                .checked_sub(Duration::from_secs(2))
                .unwrap_or_else(Instant::now),
        );

        let cooldown = Duration::from_secs(1);
        let batch = select_sync_batch(&mut state, 2, cooldown);

        // Peer 1's cooldown expired so it should be included.
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
    }
386
    /// An empty snapshot yields an empty batch and leaves the cursor at 0.
    #[test]
    fn select_sync_batch_empty_order() {
        let mut state = NeighborSyncState::new_cycle(vec![]);

        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));

        assert!(batch.is_empty());
        assert_eq!(state.cursor, 0);
    }
396
    /// When every peer is under cooldown, all are removed from the snapshot
    /// and the batch comes back empty (the loop still terminates).
    #[test]
    fn select_sync_batch_all_on_cooldown() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());

        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 4, cooldown);

        assert!(batch.is_empty());
        assert!(state.order.is_empty());
    }
415
416    // -- handle_sync_failure -------------------------------------------------
417
    /// Removing a failed peer that sits BEFORE the cursor pulls the cursor
    /// back by one, after which the fill step advances it again.
    #[test]
    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        // Simulate having already processed peers at indices 0 and 1.
        state.cursor = 2;

        // Peer 2 (index 1, before cursor) fails.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));

        // Removal at pos 1 (before cursor 2) adjusts the cursor down to 1;
        // the fill step then returns a replacement and advances it back to 2.
        assert_eq!(state.cursor, 2); // was 2, removed at pos 1, adjusted to 1, then replacement advances to 2
        assert!(!state.order.contains(&peer_id_from_byte(2)));

        // After removal the order is [1, 3, 4] with cursor at index 1, so the
        // fill step returns peer 3 (the peer now at the cursor) and advances.
        assert!(replacement.is_some());
    }
443
    /// Removing a failed peer that sits AFTER the cursor leaves the cursor
    /// unadjusted; only the fill step moves it.
    #[test]
    fn handle_sync_failure_removes_peer_after_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        // Peer 3 (index 2, after cursor) fails.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));

        // Removal was after the cursor so no adjustment happens; the fill
        // step then returns the peer at cursor 1 and advances the cursor to 2.
        assert_eq!(state.cursor, 2); // cursor was 1, replacement advances to 2
        assert!(!state.order.contains(&peer_id_from_byte(3)));

        // Replacement should be peer 2 (now at cursor position 1).
        assert_eq!(replacement, Some(peer_id_from_byte(2)));
    }
466
    /// No replacement is offered when the snapshot is already exhausted.
    #[test]
    fn handle_sync_failure_no_replacement_when_exhausted() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1; // Already past the only peer.

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));

        assert!(state.order.is_empty());
        assert!(replacement.is_none());
    }
479
    /// Reporting a peer that is not in the snapshot leaves the order intact
    /// but still attempts to fill a slot from the cursor.
    #[test]
    fn handle_sync_failure_unknown_peer_is_noop() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));

        // Order should be unchanged.
        assert_eq!(state.order.len(), 2);
        // Still tries to fill from cursor.
        assert_eq!(replacement, Some(peer_id_from_byte(2)));
        assert_eq!(state.cursor, 2);
    }
495
496    // -- record_successful_sync ----------------------------------------------
497
    /// A successful sync inserts a timestamp bounded by the call window.
    #[test]
    fn record_successful_sync_updates_last_sync_time() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);

        assert!(!state.last_sync_times.contains_key(&peer));

        // Bracket the call so we can assert the stored Instant is plausible.
        let before = Instant::now();
        record_successful_sync(&mut state, &peer);
        let after = Instant::now();

        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts >= before);
        assert!(*ts <= after);
    }
514
    /// A later sync overwrites any previously recorded timestamp.
    #[test]
    fn record_successful_sync_overwrites_previous() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);

        // Record a sync at an old time. Use a small subtraction to avoid
        // `checked_sub` returning `None` on freshly-booted CI runners.
        let old_time = Instant::now()
            .checked_sub(Duration::from_secs(2))
            .unwrap_or_else(Instant::now);
        state.last_sync_times.insert(peer, old_time);

        record_successful_sync(&mut state, &peer);

        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts > old_time, "sync time should be updated");
    }
533
534    // -- Section 18: Neighbor sync scenarios --------------------------------
535
    /// Scenario 35: round-robin batches across multiple rounds with cooldown
    /// peers dropped from the snapshot as they are encountered.
    #[test]
    fn scenario_35_round_robin_with_cooldown_skip() {
        // With >PEER_COUNT eligible peers, consecutive rounds scan forward
        // from cursor, skip cooldown peers, sync next batch.
        // Create 8 peers, mark peers 2,4 on cooldown.
        // First batch of 4: peers 1,3,5,6 (2,4 skipped and removed).
        // Second batch of 4: peers 7,8 (only 2 remain).
        // Cycle should complete after second batch.
        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 4;
        let cooldown = Duration::from_secs(3600);

        // Mark peers 2 and 4 as recently synced (on cooldown).
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(4), Instant::now());

        // First batch: scan from cursor 0. Peers 2 and 4 are removed,
        // leaving [1,3,5,6,7,8]. We pick the first 4: [1,3,5,6].
        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch1.len(), 4);
        assert_eq!(batch1[0], peer_id_from_byte(1));
        assert_eq!(batch1[1], peer_id_from_byte(3));
        assert_eq!(batch1[2], peer_id_from_byte(5));
        assert_eq!(batch1[3], peer_id_from_byte(6));

        // Cooldown peers should have been removed from the order.
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert!(!state.order.contains(&peer_id_from_byte(4)));

        // Second batch: only peers 7,8 remain after cursor.
        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch2.len(), 2);
        assert_eq!(batch2[0], peer_id_from_byte(7));
        assert_eq!(batch2[1], peer_id_from_byte(8));

        // Cycle should be complete after second batch.
        assert!(state.is_cycle_complete());
    }
579
    /// `is_cycle_complete()` is purely a cursor-vs-length comparison, so it
    /// holds for cursor == len, cursor > len, and an emptied order.
    #[test]
    fn cycle_complete_when_cursor_past_order() {
        // is_cycle_complete() returns true when cursor >= order.len().
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);

        // Not complete at the start.
        assert!(!state.is_cycle_complete());

        // Advance cursor to exactly order.len().
        state.cursor = 3;
        assert!(state.is_cycle_complete());

        // Also complete when cursor exceeds order.len().
        state.cursor = 10;
        assert!(state.is_cycle_complete());

        // Edge case: order is emptied (peers removed) with cursor at 0.
        state.order.clear();
        state.cursor = 0;
        assert!(state.is_cycle_complete());
    }
602
    /// Scenario 36: Post-cycle responsibility pruning with time-based
    /// hysteresis.
    ///
    /// When a full round-robin cycle completes, node runs one prune pass
    /// over BOTH stored records and `PaidForList` entries using current
    /// `SelfInclusiveRT`. Out-of-range items have timestamps recorded but
    /// are deleted only after `PRUNE_HYSTERESIS_DURATION`. In-range items
    /// have their timestamps cleared.
    ///
    /// Full `run_prune_pass` requires a live `P2PNode`. This test verifies
    /// the deterministic trigger condition (cycle completion) and the
    /// combined record + paid-list pruning contract:
    ///   (1) Cycle completes -> prune pass should run.
    ///   (2) Both `RecordOutOfRangeFirstSeen` and `PaidOutOfRangeFirstSeen`
    ///       are tracked independently in the same pass.
    ///   (3) Keys within hysteresis window are retained.
    #[test]
    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
        let config = ReplicationConfig::default();

        // Step 1: Run a full cycle to completion.
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
        assert!(
            state.is_cycle_complete(),
            "cycle must be complete before prune pass triggers"
        );

        // Step 2: Verify prune hysteresis parameters are configured.
        assert!(
            !config.prune_hysteresis_duration.is_zero(),
            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
        );

        // Step 3: Simulate the prune-pass timestamp tracking for BOTH
        // record and paid-list entries (the two independent timestamp
        // families that Section 11 requires in a single pass).
        //
        // Record timestamps and paid timestamps are independent — clearing
        // one must not affect the other (tested in scenario_52). Here we
        // verify the combined trigger: cycle completion -> both kinds of
        // timestamps are eligible for evaluation.
        let record_key: [u8; 32] = [0x36; 32];
        let paid_key: [u8; 32] = [0x37; 32];

        // Simulate: record_key goes out of range, paid_key goes out of range.
        let record_first_seen = Instant::now();
        let paid_first_seen = Instant::now();

        // Both timestamps are recent — well within hysteresis window.
        let record_elapsed = record_first_seen.elapsed();
        let paid_elapsed = paid_first_seen.elapsed();
        assert!(
            record_elapsed < config.prune_hysteresis_duration,
            "record key should be retained within hysteresis window"
        );
        assert!(
            paid_elapsed < config.prune_hysteresis_duration,
            "paid key should be retained within hysteresis window"
        );

        // The prune pass evaluates both independently. Verify they don't
        // interfere by using separate keys.
        assert_ne!(
            record_key, paid_key,
            "record and paid pruning keys must be independent"
        );

        // Step 4: After the cycle, a new snapshot is taken and cursor resets.
        let new_state = NeighborSyncState::new_cycle(vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ]);
        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
        assert!(
            !new_state.is_cycle_complete(),
            "new cycle should not be immediately complete"
        );
    }
684
    /// Scenario 38: a peer joining mid-cycle is excluded from the current
    /// snapshot and only becomes eligible in the next cycle's snapshot.
    #[test]
    fn scenario_38_mid_cycle_peer_join_excluded() {
        // Peer D joins CloseNeighbors mid-cycle.
        // D should NOT appear in the current NeighborSyncOrder snapshot.
        // After cycle completes and a new snapshot is taken, D can be included.
        let peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Advance cursor to simulate mid-cycle.
        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
        assert_eq!(state.cursor, 1);

        // Peer D "joins" the network. It should NOT be in the current snapshot.
        let peer_d = peer_id_from_byte(0xD);
        assert!(
            !state.order.contains(&peer_d),
            "mid-cycle joiner must not appear in the current snapshot"
        );

        // Complete the current cycle.
        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert!(state.is_cycle_complete());

        // New cycle: now D can be included in the fresh snapshot.
        let new_peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
            peer_d,
        ];
        let new_state = NeighborSyncState::new_cycle(new_peers);
        assert!(
            new_state.order.contains(&peer_d),
            "after new snapshot, joiner D should be present"
        );
    }
725
    /// Scenario 39: an unreachable peer is removed from the snapshot and its
    /// vacated slot is filled by the next peer in scan order.
    #[test]
    fn scenario_39_unreachable_peer_removed_slot_filled() {
        // Peer P is in snapshot. Sync fails. P removed from order.
        // Node resumes scanning and picks next peer Q to fill the slot.
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // First batch selects peers 1,2.
        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);

        // Peer 2 becomes unreachable. Remove it and fill the slot.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
        assert!(!state.order.contains(&peer_id_from_byte(2)));

        // Slot should be filled by the next available peer (peer 3).
        assert_eq!(
            replacement,
            Some(peer_id_from_byte(3)),
            "vacated slot should be filled by next peer in order"
        );

        // Continue: next batch should resume from after the replacement.
        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
        assert!(state.is_cycle_complete());
    }
760
    /// Scenario 40: cooldown peers are removed from the snapshot (order
    /// shrinks), not merely passed over.
    #[test]
    fn scenario_40_cooldown_peer_removed_from_snapshot() {
        // Peer synced within cooldown period. When batch selection reaches it,
        // peer is REMOVED from order (not just skipped). Scanning continues to
        // next peer.
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);

        // Mark peer 2 as recently synced.
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());

        let batch = select_sync_batch(&mut state, 3, cooldown);

        // Peer 2 should have been REMOVED from order, not just skipped.
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert_eq!(state.order.len(), 2, "order should shrink by 1");

        // Batch contains the non-cooldown peers.
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);

        // Cycle is complete since all remaining peers were selected.
        assert!(state.is_cycle_complete());
    }
791
    /// Scenario 41: even with every peer on cooldown, batch selection removes
    /// them all and terminates — no infinite scan is possible.
    #[test]
    fn scenario_41_cycle_always_terminates() {
        // Under arbitrary cooldowns and removals, cycle always terminates.
        // Create 10 peers. Mark ALL on cooldown. select_sync_batch
        // should remove all and return empty. Cycle complete.
        let peer_count: u8 = 10;
        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);

        // Mark all peers as recently synced.
        for i in 1..=peer_count {
            state
                .last_sync_times
                .insert(peer_id_from_byte(i), Instant::now());
        }

        let batch = select_sync_batch(&mut state, 4, cooldown);

        assert!(
            batch.is_empty(),
            "all peers on cooldown — batch must be empty"
        );
        assert!(state.order.is_empty(), "all peers should have been removed");
        assert!(
            state.is_cycle_complete(),
            "cycle must terminate when all peers are removed"
        );
    }
821
    /// Consecutive rounds walk the whole snapshot in order; after completion
    /// further calls return empty batches.
    #[test]
    fn consecutive_rounds_advance_through_full_cycle() {
        // 6 peers, batch_size=2, no cooldowns.
        // Round 1: peers 0,1. Round 2: peers 2,3. Round 3: peers 4,5.
        // After round 3: cycle complete.
        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 2;
        let no_cooldown = Duration::from_secs(0);

        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
        assert_eq!(state.cursor, 2);
        assert!(!state.is_cycle_complete());

        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
        assert_eq!(state.cursor, 4);
        assert!(!state.is_cycle_complete());

        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
        assert_eq!(state.cursor, 6);
        assert!(state.is_cycle_complete());

        // Extra call after cycle complete returns empty.
        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert!(round4.is_empty());
    }
851
852    /// Scenario 37: Non-`LocalRT` inbound sync behavior.
853    ///
854    /// When a peer not in `LocalRT(self)` opens a sync session:
855    /// - Receiver STILL builds and sends outbound hints (response always
856    ///   constructed via `handle_sync_request`).
857    /// - Receiver drops ALL inbound replica/paid hints from that peer
858    ///   (caller returns early in `mod.rs:handle_neighbor_sync_request`
859    ///   when `sender_in_rt` is false).
860    /// - Sync history is NOT updated for non-RT peers, so no
861    ///   `RepairOpportunity` is created.
862    ///
863    /// Full integration requires a live `P2PNode` (`handle_sync_request`
864    /// calls `is_in_routing_table`). This test verifies the deterministic
865    /// contract:
866    ///   (1) `NeighborSyncResponse` is always constructed regardless of
867    ///       sender RT membership (outbound hints sent).
868    ///   (2) When `sender_in_rt` is false, no admission runs and sync
869    ///       history is not updated.
870    ///   (3) When `sender_in_rt` is true, sync history IS updated and
871    ///       inbound hints enter the admission pipeline.
872    #[test]
873    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
874        let sender = peer_id_from_byte(0x37);
875
876        // Simulate what handle_sync_request always builds: outbound hints
877        // in the response, regardless of whether sender is in LocalRT.
878        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
879        let outbound_paid_hints = vec![[0x03; 32]];
880        let response = NeighborSyncResponse {
881            replica_hints: outbound_replica_hints.clone(),
882            paid_hints: outbound_paid_hints.clone(),
883            bootstrapping: false,
884            rejected_keys: Vec::new(),
885        };
886
887        // Inbound hints from the sender (would be in the request).
888        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
889        let inbound_paid_hints = vec![[0xB0; 32]];
890
891        // --- Case 1: sender NOT in LocalRT (sender_in_rt = false) ---
892        let sender_in_rt = false;
893        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
894
895        // Response is still built — outbound hints are sent.
896        assert_eq!(
897            response.replica_hints, outbound_replica_hints,
898            "outbound replica hints must be sent even when sender is not in LocalRT"
899        );
900        assert_eq!(
901            response.paid_hints, outbound_paid_hints,
902            "outbound paid hints must be sent even when sender is not in LocalRT"
903        );
904
905        // Caller checks sender_in_rt and returns early. No admission runs.
906        if !sender_in_rt {
907            // This is the early-return path in mod.rs:964-966.
908            // Inbound hints are never processed.
909            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
910            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
911
912            for key in &inbound_replica_hints {
913                assert!(
914                    !admitted_replica_keys.contains(key),
915                    "inbound replica hints must NOT be admitted from non-RT sender"
916                );
917            }
918            for key in &inbound_paid_hints {
919                assert!(
920                    !admitted_paid_keys.contains(key),
921                    "inbound paid hints must NOT be admitted from non-RT sender"
922                );
923            }
924
925            // Sync history is NOT updated for non-RT peers.
926            assert!(
927                !sync_history.contains_key(&sender),
928                "sync history must NOT be updated for non-LocalRT sender"
929            );
930        }
931
932        // --- Case 2: sender IS in LocalRT (sender_in_rt = true) ---
933        let sender_in_rt = true;
934        assert!(
935            sender_in_rt,
936            "when sender is in LocalRT, inbound hints are processed"
937        );
938
939        // Sync history IS updated for RT peers.
940        sync_history.insert(
941            sender,
942            PeerSyncRecord {
943                last_sync: Some(Instant::now()),
944                cycles_since_sync: 0,
945            },
946        );
947        assert!(
948            sync_history.contains_key(&sender),
949            "sync history should be updated for LocalRT sender"
950        );
951        assert!(
952            sync_history
953                .get(&sender)
954                .expect("sender in history")
955                .last_sync
956                .is_some(),
957            "last_sync should be recorded for RT sender"
958        );
959    }
960
961    #[test]
962    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
963        // Verify that after cycle completes, starting a new cycle
964        // preserves the last_sync_times from the old state.
965        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
966        let mut state = NeighborSyncState::new_cycle(peers);
967
968        // Sync both peers and record their times.
969        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
970        record_successful_sync(&mut state, &peer_id_from_byte(1));
971        record_successful_sync(&mut state, &peer_id_from_byte(2));
972        assert!(state.is_cycle_complete());
973
974        // Capture sync times before "resetting" for a new cycle.
975        let old_sync_times = state.last_sync_times.clone();
976        assert_eq!(old_sync_times.len(), 2);
977
978        // Simulate starting a new cycle: create fresh state but carry over
979        // last_sync_times (as the real driver would).
980        let new_peers = vec![
981            peer_id_from_byte(1),
982            peer_id_from_byte(2),
983            peer_id_from_byte(3),
984        ];
985        let mut new_state = NeighborSyncState::new_cycle(new_peers);
986        new_state.last_sync_times = old_sync_times;
987
988        // Cursor is reset.
989        assert_eq!(new_state.cursor, 0);
990        assert!(!new_state.is_cycle_complete());
991
992        // Sync times are preserved.
993        assert_eq!(new_state.last_sync_times.len(), 2);
994        assert!(new_state
995            .last_sync_times
996            .contains_key(&peer_id_from_byte(1)));
997        assert!(new_state
998            .last_sync_times
999            .contains_key(&peer_id_from_byte(2)));
1000
1001        // The preserved cooldowns cause peers 1,2 to be removed, leaving
1002        // only peer 3 selected.
1003        let cooldown = Duration::from_secs(3600);
1004        let batch = select_sync_batch(&mut new_state, 3, cooldown);
1005        assert_eq!(
1006            batch,
1007            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1008            "only the new peer should be selected; old peers are on cooldown"
1009        );
1010    }
1011}