1use std::collections::HashSet;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14
15use crate::ant_protocol::XorName;
16use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
17use crate::replication::paid_list::PaidList;
18use crate::replication::protocol::{
19 NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
20};
21use crate::replication::types::NeighborSyncState;
22use crate::storage::LmdbStorage;
23
/// A replica hint together with the set of peers it was computed against.
///
/// Values are produced by [`build_replica_hints_for_peer_with_close_groups`],
/// which only emits a hint when the target peer is a member of `close_peers`.
#[derive(Debug)]
pub(crate) struct SentReplicaHint {
    /// Key of the locally stored record this hint refers to.
    pub(crate) key: XorName,
    /// Peer ids returned by the DHT manager's
    /// `find_closest_nodes_local_with_self` lookup for `key`.
    pub(crate) close_peers: HashSet<PeerId>,
}
33
/// Result of one successful outbound neighbor-sync exchange.
///
/// Returned by [`sync_with_peer_with_outcome`]: it pairs the peer's decoded
/// reply with the replica hints this node put into the request.
#[derive(Debug)]
pub(crate) struct NeighborSyncOutcome {
    /// The `NeighborSyncResponse` body decoded from the peer's reply.
    pub(crate) response: NeighborSyncResponse,
    /// The replica hints (with their close-peer sets) sent in the request.
    pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
}
42
43pub async fn build_replica_hints_for_peer(
53 peer: &PeerId,
54 storage: &Arc<LmdbStorage>,
55 p2p_node: &Arc<P2PNode>,
56 close_group_size: usize,
57) -> Vec<XorName> {
58 build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
59 .await
60 .into_iter()
61 .map(|hint| hint.key)
62 .collect()
63}
64
65pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
66 peer: &PeerId,
67 storage: &Arc<LmdbStorage>,
68 p2p_node: &Arc<P2PNode>,
69 close_group_size: usize,
70) -> Vec<SentReplicaHint> {
71 let all_keys = match storage.all_keys().await {
72 Ok(keys) => keys,
73 Err(e) => {
74 warn!("Failed to read stored keys for hint construction: {e}");
75 return Vec::new();
76 }
77 };
78
79 let dht = p2p_node.dht_manager();
80 let mut hints = Vec::new();
81 for key in all_keys {
82 let closest = dht
83 .find_closest_nodes_local_with_self(&key, close_group_size)
84 .await;
85 let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
86 if close_peers.contains(peer) {
87 hints.push(SentReplicaHint { key, close_peers });
88 }
89 }
90 hints
91}
92
93pub async fn build_paid_hints_for_peer(
98 peer: &PeerId,
99 paid_list: &Arc<PaidList>,
100 p2p_node: &Arc<P2PNode>,
101 paid_list_close_group_size: usize,
102) -> Vec<XorName> {
103 let all_paid_keys = match paid_list.all_keys() {
104 Ok(keys) => keys,
105 Err(e) => {
106 warn!("Failed to read PaidForList for hint construction: {e}");
107 return Vec::new();
108 }
109 };
110
111 let dht = p2p_node.dht_manager();
112 let mut hints = Vec::new();
113 for key in all_paid_keys {
114 let closest = dht
115 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
116 .await;
117 if closest.iter().any(|n| n.peer_id == *peer) {
118 hints.push(key);
119 }
120 }
121 hints
122}
123
124pub async fn snapshot_close_neighbors(
129 p2p_node: &Arc<P2PNode>,
130 self_id: &PeerId,
131 scope: usize,
132) -> Vec<PeerId> {
133 let self_xor: XorName = *self_id.as_bytes();
134 let closest = p2p_node
135 .dht_manager()
136 .find_closest_nodes_local(&self_xor, scope)
137 .await;
138 closest.iter().map(|n| n.peer_id).collect()
139}
140
141pub fn select_sync_batch(
146 state: &mut NeighborSyncState,
147 peer_count: usize,
148 cooldown: Duration,
149) -> Vec<PeerId> {
150 let mut batch = Vec::new();
151 let now = Instant::now();
152
153 while batch.len() < peer_count && state.cursor < state.order.len() {
154 let peer = state.order[state.cursor];
155
156 if let Some(last_sync) = state.last_sync_times.get(&peer) {
160 if now.duration_since(*last_sync) < cooldown {
161 state.order.remove(state.cursor);
162 continue;
163 }
164 }
165
166 batch.push(peer);
167 state.cursor += 1;
168 }
169
170 batch
171}
172
173pub async fn sync_with_peer(
178 peer: &PeerId,
179 p2p_node: &Arc<P2PNode>,
180 storage: &Arc<LmdbStorage>,
181 paid_list: &Arc<PaidList>,
182 config: &ReplicationConfig,
183 is_bootstrapping: bool,
184) -> Option<NeighborSyncResponse> {
185 sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping)
186 .await
187 .map(|outcome| outcome.response)
188}
189
190pub(crate) async fn sync_with_peer_with_outcome(
191 peer: &PeerId,
192 p2p_node: &Arc<P2PNode>,
193 storage: &Arc<LmdbStorage>,
194 paid_list: &Arc<PaidList>,
195 config: &ReplicationConfig,
196 is_bootstrapping: bool,
197) -> Option<NeighborSyncOutcome> {
198 let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
200 peer,
201 storage,
202 p2p_node,
203 config.close_group_size,
204 )
205 .await;
206 let replica_hints = sent_replica_hints
207 .iter()
208 .map(|hint| hint.key)
209 .collect::<Vec<_>>();
210 let paid_hints =
211 build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
212 .await;
213
214 let request = NeighborSyncRequest {
215 replica_hints,
216 paid_hints,
217 bootstrapping: is_bootstrapping,
218 };
219 let request_id = rand::thread_rng().gen::<u64>();
220 let msg = ReplicationMessage {
221 request_id,
222 body: ReplicationMessageBody::NeighborSyncRequest(request),
223 };
224
225 let encoded = match msg.encode() {
226 Ok(data) => data,
227 Err(e) => {
228 warn!("Failed to encode sync request for {peer}: {e}");
229 return None;
230 }
231 };
232
233 let response = match p2p_node
234 .send_request(
235 peer,
236 REPLICATION_PROTOCOL_ID,
237 encoded,
238 config.verification_request_timeout,
239 )
240 .await
241 {
242 Ok(resp) => resp,
243 Err(e) => {
244 debug!("Sync with {peer} failed: {e}");
245 return None;
246 }
247 };
248
249 match ReplicationMessage::decode(&response.data) {
250 Ok(decoded) => {
251 if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
252 Some(NeighborSyncOutcome {
253 response: resp,
254 sent_replica_hints,
255 })
256 } else {
257 warn!("Unexpected response type from {peer} during sync");
258 None
259 }
260 }
261 Err(e) => {
262 warn!("Failed to decode sync response from {peer}: {e}");
263 None
264 }
265 }
266}
267
268pub fn handle_sync_failure(
276 state: &mut NeighborSyncState,
277 failed_peer: &PeerId,
278 cooldown: Duration,
279) -> Option<PeerId> {
280 if let Some(pos) = state.order.iter().position(|p| p == failed_peer) {
282 state.order.remove(pos);
283 if pos < state.cursor {
285 state.cursor = state.cursor.saturating_sub(1);
286 }
287 }
288
289 let now = Instant::now();
292 while state.cursor < state.order.len() {
293 let candidate = state.order[state.cursor];
294
295 if let Some(last_sync) = state.last_sync_times.get(&candidate) {
296 if now.duration_since(*last_sync) < cooldown {
297 state.order.remove(state.cursor);
298 continue;
299 }
300 }
301
302 state.cursor += 1;
303 return Some(candidate);
304 }
305
306 None
307}
308
309pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
311 state.last_sync_times.insert(*peer, Instant::now());
312}
313
314pub async fn handle_sync_request(
322 sender: &PeerId,
323 request: &NeighborSyncRequest,
324 p2p_node: &Arc<P2PNode>,
325 storage: &Arc<LmdbStorage>,
326 paid_list: &Arc<PaidList>,
327 config: &ReplicationConfig,
328 is_bootstrapping: bool,
329) -> (NeighborSyncResponse, bool) {
330 let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
331 sender,
332 request,
333 p2p_node,
334 storage,
335 paid_list,
336 config,
337 is_bootstrapping,
338 )
339 .await;
340 (response, sender_in_rt)
341}
342
343pub(crate) async fn handle_sync_request_with_proofs(
344 sender: &PeerId,
345 _request: &NeighborSyncRequest,
346 p2p_node: &Arc<P2PNode>,
347 storage: &Arc<LmdbStorage>,
348 paid_list: &Arc<PaidList>,
349 config: &ReplicationConfig,
350 is_bootstrapping: bool,
351) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
352 let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
353
354 let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
356 sender,
357 storage,
358 p2p_node,
359 config.close_group_size,
360 )
361 .await;
362 let replica_hints = sent_replica_hints
363 .iter()
364 .map(|hint| hint.key)
365 .collect::<Vec<_>>();
366 let paid_hints = build_paid_hints_for_peer(
367 sender,
368 paid_list,
369 p2p_node,
370 config.paid_list_close_group_size,
371 )
372 .await;
373
374 let response = NeighborSyncResponse {
375 replica_hints,
376 paid_hints,
377 bootstrapping: is_bootstrapping,
378 rejected_keys: Vec::new(),
379 };
380
381 (response, sent_replica_hints, sender_in_rt)
383}
384
// Unit tests for the pure, state-machine parts of neighbor sync
// (`select_sync_batch`, `handle_sync_failure`, `record_successful_sync`).
// `unwrap`/`expect` are allowed here because a panic is how a test fails.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::replication::types::PeerSyncRecord;
    use std::collections::HashMap;

    /// Builds a deterministic `PeerId` whose first byte is `b` and whose
    /// remaining 31 bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    // ---- select_sync_batch -------------------------------------------------

    /// With no cooldown, the batch is the first `batch_size` peers in order
    /// and the cursor ends up just past them.
    #[test]
    fn select_sync_batch_returns_up_to_peer_count() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 3;

        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));

        assert_eq!(batch.len(), batch_size);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
        assert_eq!(batch[2], peer_id_from_byte(3));
        assert_eq!(state.cursor, 3);
    }

    /// Peers synced "just now" under a long cooldown are skipped and removed
    /// from the order; the batch is filled from the remaining peers.
    #[test]
    fn select_sync_batch_skips_cooldown_peers() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Mark peers 1 and 3 as freshly synced.
        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(3), Instant::now());

        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 2, cooldown);

        // Only the peers without a recent sync are selected.
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(2));
        assert_eq!(batch[1], peer_id_from_byte(4));

        // Cooldown peers were dropped from the snapshot.
        assert!(!state.order.contains(&peer_id_from_byte(1)));
        assert!(!state.order.contains(&peer_id_from_byte(3)));
    }

    /// A peer whose last sync is older than the cooldown is selected normally.
    #[test]
    fn select_sync_batch_expired_cooldown_not_skipped() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Last sync two seconds ago. NOTE(review): if `checked_sub` returns
        // `None` the fallback is "now", which would put peer 1 on cooldown
        // and fail this test.
        state.last_sync_times.insert(
            peer_id_from_byte(1),
            Instant::now()
                .checked_sub(Duration::from_secs(2))
                .unwrap_or_else(Instant::now),
        );

        let cooldown = Duration::from_secs(1);
        let batch = select_sync_batch(&mut state, 2, cooldown);

        // Both peers are selected, in order.
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
    }

    /// An empty snapshot yields an empty batch and leaves the cursor at 0.
    #[test]
    fn select_sync_batch_empty_order() {
        let mut state = NeighborSyncState::new_cycle(vec![]);

        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));

        assert!(batch.is_empty());
        assert_eq!(state.cursor, 0);
    }

    /// When every peer is on cooldown the batch is empty and the order is
    /// drained completely.
    #[test]
    fn select_sync_batch_all_on_cooldown() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());

        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 4, cooldown);

        assert!(batch.is_empty());
        assert!(state.order.is_empty());
    }

    // ---- handle_sync_failure -----------------------------------------------

    /// Failing a peer that sits before the cursor pulls the cursor back by
    /// one; picking the replacement then advances it again.
    #[test]
    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        // Pretend peers 1 and 2 were already handed out.
        state.cursor = 2;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));

        // Cursor went 2 -> 1 on removal, then 1 -> 2 for the replacement.
        assert_eq!(state.cursor, 2);
        assert!(!state.order.contains(&peer_id_from_byte(2)));

        assert!(replacement.is_some());
    }

    /// Failing a peer at or after the cursor does not shift the cursor on
    /// removal; the replacement is the peer currently under the cursor.
    #[test]
    fn handle_sync_failure_removes_peer_after_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));

        // Cursor only moved because a replacement was picked (1 -> 2).
        assert_eq!(state.cursor, 2);
        assert!(!state.order.contains(&peer_id_from_byte(3)));

        assert_eq!(replacement, Some(peer_id_from_byte(2)));
    }

    /// Removing the only peer leaves nothing to pick as a replacement.
    #[test]
    fn handle_sync_failure_no_replacement_when_exhausted() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));

        assert!(state.order.is_empty());
        assert!(replacement.is_none());
    }

    /// A peer that is not in the order is not removed; the function still
    /// hands out the next peer under the cursor.
    #[test]
    fn handle_sync_failure_unknown_peer_is_noop() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));

        // Order untouched; replacement is peer 2 and the cursor moved past it.
        assert_eq!(state.order.len(), 2);
        assert_eq!(replacement, Some(peer_id_from_byte(2)));
        assert_eq!(state.cursor, 2);
    }

    // ---- record_successful_sync --------------------------------------------

    /// Recording a sync inserts a timestamp taken between `before` and `after`.
    #[test]
    fn record_successful_sync_updates_last_sync_time() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);

        assert!(!state.last_sync_times.contains_key(&peer));

        let before = Instant::now();
        record_successful_sync(&mut state, &peer);
        let after = Instant::now();

        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts >= before);
        assert!(*ts <= after);
    }

    /// Recording a sync replaces an older timestamp with a newer one.
    #[test]
    fn record_successful_sync_overwrites_previous() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);

        // Seed a timestamp two seconds in the past (falls back to "now" if
        // the subtraction is not representable).
        let old_time = Instant::now()
            .checked_sub(Duration::from_secs(2))
            .unwrap_or_else(Instant::now);
        state.last_sync_times.insert(peer, old_time);

        record_successful_sync(&mut state, &peer);

        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts > old_time, "sync time should be updated");
    }

    // ---- numbered scenarios ------------------------------------------------

    /// Eight peers, batch size four, peers 2 and 4 on cooldown: the first
    /// batch skips them, the second batch takes the remainder, and the cycle
    /// is then complete.
    #[test]
    fn scenario_35_round_robin_with_cooldown_skip() {
        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 4;
        let cooldown = Duration::from_secs(3600);

        // Peers 2 and 4 were synced just now.
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(4), Instant::now());

        // First batch: 1, 3, 5, 6 (2 and 4 skipped and removed).
        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch1.len(), 4);
        assert_eq!(batch1[0], peer_id_from_byte(1));
        assert_eq!(batch1[1], peer_id_from_byte(3));
        assert_eq!(batch1[2], peer_id_from_byte(5));
        assert_eq!(batch1[3], peer_id_from_byte(6));

        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert!(!state.order.contains(&peer_id_from_byte(4)));

        // Second batch: only 7 and 8 are left.
        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch2.len(), 2);
        assert_eq!(batch2[0], peer_id_from_byte(7));
        assert_eq!(batch2[1], peer_id_from_byte(8));

        assert!(state.is_cycle_complete());
    }

    /// `is_cycle_complete` is expected to hold when the cursor is at or past
    /// the end of the order, including for an empty order.
    #[test]
    fn cycle_complete_when_cursor_past_order() {
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);

        // Fresh cycle with peers: not complete.
        assert!(!state.is_cycle_complete());

        // Cursor exactly at the end.
        state.cursor = 3;
        assert!(state.is_cycle_complete());

        // Cursor beyond the end.
        state.cursor = 10;
        assert!(state.is_cycle_complete());

        // Empty order with cursor 0.
        state.order.clear();
        state.cursor = 0;
        assert!(state.is_cycle_complete());
    }

    /// Checks the preconditions around the post-cycle prune pass.
    ///
    /// NOTE(review): apart from `select_sync_batch`, `new_cycle` and
    /// `is_cycle_complete`, this test only inspects `ReplicationConfig` and
    /// local values; no pruning code is invoked from here.
    #[test]
    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
        let config = ReplicationConfig::default();

        // Run a full cycle in one batch.
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
        assert!(
            state.is_cycle_complete(),
            "cycle must be complete before prune pass triggers"
        );

        // The default configuration must have a non-zero hysteresis window.
        assert!(
            !config.prune_hysteresis_duration.is_zero(),
            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
        );

        // Two distinct keys standing in for a record key and a paid-list key.
        let record_key: [u8; 32] = [0x36; 32];
        let paid_key: [u8; 32] = [0x37; 32];

        // Both "first seen" timestamps are taken right now.
        let record_first_seen = Instant::now();
        let paid_first_seen = Instant::now();

        // Freshly seen keys are still inside the hysteresis window.
        let record_elapsed = record_first_seen.elapsed();
        let paid_elapsed = paid_first_seen.elapsed();
        assert!(
            record_elapsed < config.prune_hysteresis_duration,
            "record key should be retained within hysteresis window"
        );
        assert!(
            paid_elapsed < config.prune_hysteresis_duration,
            "paid key should be retained within hysteresis window"
        );

        assert_ne!(
            record_key, paid_key,
            "record and paid pruning keys must be independent"
        );

        // A new cycle starts with the cursor at 0 and is not complete.
        let new_state = NeighborSyncState::new_cycle(vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ]);
        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
        assert!(
            !new_state.is_cycle_complete(),
            "new cycle should not be immediately complete"
        );
    }

    /// A peer that appears mid-cycle is not part of the current snapshot and
    /// only shows up once a new cycle is created from a fresh peer list.
    #[test]
    fn scenario_38_mid_cycle_peer_join_excluded() {
        let peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Sync the first peer.
        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
        assert_eq!(state.cursor, 1);

        // Peer D "joins" now; the existing snapshot does not know about it.
        let peer_d = peer_id_from_byte(0xD);
        assert!(
            !state.order.contains(&peer_d),
            "mid-cycle joiner must not appear in the current snapshot"
        );

        // Finish the cycle with the remaining two peers.
        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert!(state.is_cycle_complete());

        // The next snapshot includes D.
        let new_peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
            peer_d,
        ];
        let new_state = NeighborSyncState::new_cycle(new_peers);
        assert!(
            new_state.order.contains(&peer_d),
            "after new snapshot, joiner D should be present"
        );
    }

    /// A peer that fails after being handed out is removed, the next peer in
    /// order fills its slot, and the rest of the cycle proceeds normally.
    #[test]
    fn scenario_39_unreachable_peer_removed_slot_filled() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // First batch hands out peers 1 and 2.
        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);

        // Peer 2 turns out to be unreachable.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
        assert!(!state.order.contains(&peer_id_from_byte(2)));

        // Peer 3 takes the vacated slot.
        assert_eq!(
            replacement,
            Some(peer_id_from_byte(3)),
            "vacated slot should be filled by next peer in order"
        );

        // The following batch continues with peers 4 and 5.
        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
        assert!(state.is_cycle_complete());
    }

    /// A cooldown peer is physically removed from the order, shrinking it.
    #[test]
    fn scenario_40_cooldown_peer_removed_from_snapshot() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);

        // Peer 2 was synced just now.
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());

        let batch = select_sync_batch(&mut state, 3, cooldown);

        // Peer 2 is gone from the snapshot.
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert_eq!(state.order.len(), 2, "order should shrink by 1");

        // The batch holds the two remaining peers.
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);

        // Cursor reached the end of the shortened order.
        assert!(state.is_cycle_complete());
    }

    /// Even when every peer is on cooldown, selection terminates: all peers
    /// are removed and the cycle reports complete.
    #[test]
    fn scenario_41_cycle_always_terminates() {
        let peer_count: u8 = 10;
        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);

        // Put every peer on cooldown.
        for i in 1..=peer_count {
            state
                .last_sync_times
                .insert(peer_id_from_byte(i), Instant::now());
        }

        let batch = select_sync_batch(&mut state, 4, cooldown);

        assert!(
            batch.is_empty(),
            "all peers on cooldown — batch must be empty"
        );
        assert!(state.order.is_empty(), "all peers should have been removed");
        assert!(
            state.is_cycle_complete(),
            "cycle must terminate when all peers are removed"
        );
    }

    /// Repeated calls walk the order in fixed-size steps until the cycle is
    /// complete; a further call returns an empty batch.
    #[test]
    fn consecutive_rounds_advance_through_full_cycle() {
        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 2;
        let no_cooldown = Duration::from_secs(0);

        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
        assert_eq!(state.cursor, 2);
        assert!(!state.is_cycle_complete());

        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
        assert_eq!(state.cursor, 4);
        assert!(!state.is_cycle_complete());

        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
        assert_eq!(state.cursor, 6);
        assert!(state.is_cycle_complete());

        // Nothing left once the cycle is complete.
        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert!(round4.is_empty());
    }

    /// Models the rule that inbound hints from a sender outside the local
    /// routing table are dropped while outbound hints are still sent.
    ///
    /// NOTE(review): this test works purely on local stand-in values
    /// (`sender_in_rt`, empty "admitted" vectors, a local `sync_history`
    /// map); it does not call `handle_sync_request` or any other production
    /// code, so it documents the intended behavior rather than verifying it.
    #[test]
    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
        let sender = peer_id_from_byte(0x37);

        // Hints this node would send back in its response.
        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
        let outbound_paid_hints = vec![[0x03; 32]];
        let response = NeighborSyncResponse {
            replica_hints: outbound_replica_hints.clone(),
            paid_hints: outbound_paid_hints.clone(),
            bootstrapping: false,
            rejected_keys: Vec::new(),
        };

        // Hints the sender included in its request.
        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
        let inbound_paid_hints = vec![[0xB0; 32]];

        // Stand-in for "sender is not in the local routing table".
        let sender_in_rt = false;
        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();

        // Outbound hints are present in the response regardless.
        assert_eq!(
            response.replica_hints, outbound_replica_hints,
            "outbound replica hints must be sent even when sender is not in LocalRT"
        );
        assert_eq!(
            response.paid_hints, outbound_paid_hints,
            "outbound paid hints must be sent even when sender is not in LocalRT"
        );

        if !sender_in_rt {
            // Nothing is admitted from a non-RT sender.
            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();

            for key in &inbound_replica_hints {
                assert!(
                    !admitted_replica_keys.contains(key),
                    "inbound replica hints must NOT be admitted from non-RT sender"
                );
            }
            for key in &inbound_paid_hints {
                assert!(
                    !admitted_paid_keys.contains(key),
                    "inbound paid hints must NOT be admitted from non-RT sender"
                );
            }

            // No sync-history entry is created for the sender either.
            assert!(
                !sync_history.contains_key(&sender),
                "sync history must NOT be updated for non-LocalRT sender"
            );
        }

        // Contrast case: the sender is in the routing table.
        let sender_in_rt = true;
        assert!(
            sender_in_rt,
            "when sender is in LocalRT, inbound hints are processed"
        );

        // Its sync would be recorded in the history.
        sync_history.insert(
            sender,
            PeerSyncRecord {
                last_sync: Some(Instant::now()),
                cycles_since_sync: 0,
            },
        );
        assert!(
            sync_history.contains_key(&sender),
            "sync history should be updated for LocalRT sender"
        );
        assert!(
            sync_history
                .get(&sender)
                .expect("sender in history")
                .last_sync
                .is_some(),
            "last_sync should be recorded for RT sender"
        );
    }

    /// Starting a new cycle resets the cursor, while sync times carried over
    /// by the caller keep previously synced peers on cooldown.
    #[test]
    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Complete a cycle and record both peers as synced.
        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        record_successful_sync(&mut state, &peer_id_from_byte(1));
        record_successful_sync(&mut state, &peer_id_from_byte(2));
        assert!(state.is_cycle_complete());

        // Keep the sync times to carry them into the next cycle.
        let old_sync_times = state.last_sync_times.clone();
        assert_eq!(old_sync_times.len(), 2);

        // New cycle with one additional peer; sync times are copied over
        // explicitly by the test.
        let new_peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut new_state = NeighborSyncState::new_cycle(new_peers);
        new_state.last_sync_times = old_sync_times;

        // Cursor is back at the start.
        assert_eq!(new_state.cursor, 0);
        assert!(!new_state.is_cycle_complete());

        // Both old timestamps survived.
        assert_eq!(new_state.last_sync_times.len(), 2);
        assert!(new_state
            .last_sync_times
            .contains_key(&peer_id_from_byte(1)));
        assert!(new_state
            .last_sync_times
            .contains_key(&peer_id_from_byte(2)));

        // With a long cooldown only the never-synced peer 3 is selected.
        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut new_state, 3, cooldown);
        assert_eq!(
            batch,
            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
            "only the new peer should be selected; old peers are on cooldown"
        );
    }
}