1use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use crate::logging::{debug, warn};
10use rand::Rng;
11use saorsa_core::identity::PeerId;
12use saorsa_core::P2PNode;
13
14use crate::ant_protocol::XorName;
15use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
16use crate::replication::paid_list::PaidList;
17use crate::replication::protocol::{
18 NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
19};
20use crate::replication::types::NeighborSyncState;
21use crate::storage::LmdbStorage;
22
23pub async fn build_replica_hints_for_peer(
33 peer: &PeerId,
34 storage: &Arc<LmdbStorage>,
35 p2p_node: &Arc<P2PNode>,
36 close_group_size: usize,
37) -> Vec<XorName> {
38 let all_keys = match storage.all_keys().await {
39 Ok(keys) => keys,
40 Err(e) => {
41 warn!("Failed to read stored keys for hint construction: {e}");
42 return Vec::new();
43 }
44 };
45
46 let dht = p2p_node.dht_manager();
47 let mut hints = Vec::new();
48 for key in all_keys {
49 let closest = dht
50 .find_closest_nodes_local_with_self(&key, close_group_size)
51 .await;
52 if closest.iter().any(|n| n.peer_id == *peer) {
53 hints.push(key);
54 }
55 }
56 hints
57}
58
59pub async fn build_paid_hints_for_peer(
64 peer: &PeerId,
65 paid_list: &Arc<PaidList>,
66 p2p_node: &Arc<P2PNode>,
67 paid_list_close_group_size: usize,
68) -> Vec<XorName> {
69 let all_paid_keys = match paid_list.all_keys() {
70 Ok(keys) => keys,
71 Err(e) => {
72 warn!("Failed to read PaidForList for hint construction: {e}");
73 return Vec::new();
74 }
75 };
76
77 let dht = p2p_node.dht_manager();
78 let mut hints = Vec::new();
79 for key in all_paid_keys {
80 let closest = dht
81 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
82 .await;
83 if closest.iter().any(|n| n.peer_id == *peer) {
84 hints.push(key);
85 }
86 }
87 hints
88}
89
90pub async fn snapshot_close_neighbors(
95 p2p_node: &Arc<P2PNode>,
96 self_id: &PeerId,
97 scope: usize,
98) -> Vec<PeerId> {
99 let self_xor: XorName = *self_id.as_bytes();
100 let closest = p2p_node
101 .dht_manager()
102 .find_closest_nodes_local(&self_xor, scope)
103 .await;
104 closest.iter().map(|n| n.peer_id).collect()
105}
106
107pub fn select_sync_batch(
112 state: &mut NeighborSyncState,
113 peer_count: usize,
114 cooldown: Duration,
115) -> Vec<PeerId> {
116 let mut batch = Vec::new();
117 let now = Instant::now();
118
119 while batch.len() < peer_count && state.cursor < state.order.len() {
120 let peer = state.order[state.cursor];
121
122 if let Some(last_sync) = state.last_sync_times.get(&peer) {
126 if now.duration_since(*last_sync) < cooldown {
127 state.order.remove(state.cursor);
128 continue;
129 }
130 }
131
132 batch.push(peer);
133 state.cursor += 1;
134 }
135
136 batch
137}
138
139pub async fn sync_with_peer(
144 peer: &PeerId,
145 p2p_node: &Arc<P2PNode>,
146 storage: &Arc<LmdbStorage>,
147 paid_list: &Arc<PaidList>,
148 config: &ReplicationConfig,
149 is_bootstrapping: bool,
150) -> Option<NeighborSyncResponse> {
151 let replica_hints =
153 build_replica_hints_for_peer(peer, storage, p2p_node, config.close_group_size).await;
154 let paid_hints =
155 build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
156 .await;
157
158 let request = NeighborSyncRequest {
159 replica_hints,
160 paid_hints,
161 bootstrapping: is_bootstrapping,
162 };
163 let request_id = rand::thread_rng().gen::<u64>();
164 let msg = ReplicationMessage {
165 request_id,
166 body: ReplicationMessageBody::NeighborSyncRequest(request),
167 };
168
169 let encoded = match msg.encode() {
170 Ok(data) => data,
171 Err(e) => {
172 warn!("Failed to encode sync request for {peer}: {e}");
173 return None;
174 }
175 };
176
177 let response = match p2p_node
178 .send_request(
179 peer,
180 REPLICATION_PROTOCOL_ID,
181 encoded,
182 config.verification_request_timeout,
183 )
184 .await
185 {
186 Ok(resp) => resp,
187 Err(e) => {
188 debug!("Sync with {peer} failed: {e}");
189 return None;
190 }
191 };
192
193 match ReplicationMessage::decode(&response.data) {
194 Ok(decoded) => {
195 if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
196 Some(resp)
197 } else {
198 warn!("Unexpected response type from {peer} during sync");
199 None
200 }
201 }
202 Err(e) => {
203 warn!("Failed to decode sync response from {peer}: {e}");
204 None
205 }
206 }
207}
208
209pub fn handle_sync_failure(
217 state: &mut NeighborSyncState,
218 failed_peer: &PeerId,
219 cooldown: Duration,
220) -> Option<PeerId> {
221 if let Some(pos) = state.order.iter().position(|p| p == failed_peer) {
223 state.order.remove(pos);
224 if pos < state.cursor {
226 state.cursor = state.cursor.saturating_sub(1);
227 }
228 }
229
230 let now = Instant::now();
233 while state.cursor < state.order.len() {
234 let candidate = state.order[state.cursor];
235
236 if let Some(last_sync) = state.last_sync_times.get(&candidate) {
237 if now.duration_since(*last_sync) < cooldown {
238 state.order.remove(state.cursor);
239 continue;
240 }
241 }
242
243 state.cursor += 1;
244 return Some(candidate);
245 }
246
247 None
248}
249
250pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
252 state.last_sync_times.insert(*peer, Instant::now());
253}
254
255pub async fn handle_sync_request(
263 sender: &PeerId,
264 _request: &NeighborSyncRequest,
265 p2p_node: &Arc<P2PNode>,
266 storage: &Arc<LmdbStorage>,
267 paid_list: &Arc<PaidList>,
268 config: &ReplicationConfig,
269 is_bootstrapping: bool,
270) -> (NeighborSyncResponse, bool) {
271 let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
272
273 let replica_hints =
275 build_replica_hints_for_peer(sender, storage, p2p_node, config.close_group_size).await;
276 let paid_hints = build_paid_hints_for_peer(
277 sender,
278 paid_list,
279 p2p_node,
280 config.paid_list_close_group_size,
281 )
282 .await;
283
284 let response = NeighborSyncResponse {
285 replica_hints,
286 paid_hints,
287 bootstrapping: is_bootstrapping,
288 rejected_keys: Vec::new(),
289 };
290
291 (response, sender_in_rt)
293}
294
295#[cfg(test)]
300#[allow(clippy::unwrap_used, clippy::expect_used)]
301mod tests {
302 use super::*;
303 use crate::replication::types::PeerSyncRecord;
304 use std::collections::HashMap;
305
306 fn peer_id_from_byte(b: u8) -> PeerId {
308 let mut bytes = [0u8; 32];
309 bytes[0] = b;
310 PeerId::from_bytes(bytes)
311 }
312
313 #[test]
316 fn select_sync_batch_returns_up_to_peer_count() {
317 let peers = vec![
318 peer_id_from_byte(1),
319 peer_id_from_byte(2),
320 peer_id_from_byte(3),
321 peer_id_from_byte(4),
322 peer_id_from_byte(5),
323 ];
324 let mut state = NeighborSyncState::new_cycle(peers);
325 let batch_size = 3;
326
327 let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
328
329 assert_eq!(batch.len(), batch_size);
330 assert_eq!(batch[0], peer_id_from_byte(1));
331 assert_eq!(batch[1], peer_id_from_byte(2));
332 assert_eq!(batch[2], peer_id_from_byte(3));
333 assert_eq!(state.cursor, 3);
334 }
335
336 #[test]
337 fn select_sync_batch_skips_cooldown_peers() {
338 let peers = vec![
339 peer_id_from_byte(1),
340 peer_id_from_byte(2),
341 peer_id_from_byte(3),
342 peer_id_from_byte(4),
343 ];
344 let mut state = NeighborSyncState::new_cycle(peers);
345
346 state
348 .last_sync_times
349 .insert(peer_id_from_byte(1), Instant::now());
350 state
351 .last_sync_times
352 .insert(peer_id_from_byte(3), Instant::now());
353
354 let cooldown = Duration::from_secs(3600); let batch = select_sync_batch(&mut state, 2, cooldown);
356
357 assert_eq!(batch.len(), 2);
359 assert_eq!(batch[0], peer_id_from_byte(2));
360 assert_eq!(batch[1], peer_id_from_byte(4));
361
362 assert!(!state.order.contains(&peer_id_from_byte(1)));
364 assert!(!state.order.contains(&peer_id_from_byte(3)));
365 }
366
367 #[test]
368 fn select_sync_batch_expired_cooldown_not_skipped() {
369 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
370 let mut state = NeighborSyncState::new_cycle(peers);
371
372 state.last_sync_times.insert(
377 peer_id_from_byte(1),
378 Instant::now()
379 .checked_sub(Duration::from_secs(2))
380 .unwrap_or_else(Instant::now),
381 );
382
383 let cooldown = Duration::from_secs(1);
384 let batch = select_sync_batch(&mut state, 2, cooldown);
385
386 assert_eq!(batch.len(), 2);
388 assert_eq!(batch[0], peer_id_from_byte(1));
389 assert_eq!(batch[1], peer_id_from_byte(2));
390 }
391
392 #[test]
393 fn select_sync_batch_empty_order() {
394 let mut state = NeighborSyncState::new_cycle(vec![]);
395
396 let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
397
398 assert!(batch.is_empty());
399 assert_eq!(state.cursor, 0);
400 }
401
402 #[test]
403 fn select_sync_batch_all_on_cooldown() {
404 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
405 let mut state = NeighborSyncState::new_cycle(peers);
406
407 state
408 .last_sync_times
409 .insert(peer_id_from_byte(1), Instant::now());
410 state
411 .last_sync_times
412 .insert(peer_id_from_byte(2), Instant::now());
413
414 let cooldown = Duration::from_secs(3600);
415 let batch = select_sync_batch(&mut state, 4, cooldown);
416
417 assert!(batch.is_empty());
418 assert!(state.order.is_empty());
419 }
420
421 #[test]
424 fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
425 let peers = vec![
426 peer_id_from_byte(1),
427 peer_id_from_byte(2),
428 peer_id_from_byte(3),
429 peer_id_from_byte(4),
430 ];
431 let mut state = NeighborSyncState::new_cycle(peers);
432 state.cursor = 2;
434
435 let replacement =
437 handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
438
439 assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(2)));
442
443 assert!(replacement.is_some());
447 }
448
449 #[test]
450 fn handle_sync_failure_removes_peer_after_cursor() {
451 let peers = vec![
452 peer_id_from_byte(1),
453 peer_id_from_byte(2),
454 peer_id_from_byte(3),
455 peer_id_from_byte(4),
456 ];
457 let mut state = NeighborSyncState::new_cycle(peers);
458 state.cursor = 1;
459
460 let replacement =
462 handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
463
464 assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(3)));
467
468 assert_eq!(replacement, Some(peer_id_from_byte(2)));
470 }
471
472 #[test]
473 fn handle_sync_failure_no_replacement_when_exhausted() {
474 let peers = vec![peer_id_from_byte(1)];
475 let mut state = NeighborSyncState::new_cycle(peers);
476 state.cursor = 1; let replacement =
479 handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
480
481 assert!(state.order.is_empty());
482 assert!(replacement.is_none());
483 }
484
485 #[test]
486 fn handle_sync_failure_unknown_peer_is_noop() {
487 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
488 let mut state = NeighborSyncState::new_cycle(peers);
489 state.cursor = 1;
490
491 let replacement =
492 handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
493
494 assert_eq!(state.order.len(), 2);
496 assert_eq!(replacement, Some(peer_id_from_byte(2)));
498 assert_eq!(state.cursor, 2);
499 }
500
501 #[test]
504 fn record_successful_sync_updates_last_sync_time() {
505 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
506 let mut state = NeighborSyncState::new_cycle(peers);
507 let peer = peer_id_from_byte(1);
508
509 assert!(!state.last_sync_times.contains_key(&peer));
510
511 let before = Instant::now();
512 record_successful_sync(&mut state, &peer);
513 let after = Instant::now();
514
515 let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
516 assert!(*ts >= before);
517 assert!(*ts <= after);
518 }
519
520 #[test]
521 fn record_successful_sync_overwrites_previous() {
522 let peers = vec![peer_id_from_byte(1)];
523 let mut state = NeighborSyncState::new_cycle(peers);
524 let peer = peer_id_from_byte(1);
525
526 let old_time = Instant::now()
529 .checked_sub(Duration::from_secs(2))
530 .unwrap_or_else(Instant::now);
531 state.last_sync_times.insert(peer, old_time);
532
533 record_successful_sync(&mut state, &peer);
534
535 let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
536 assert!(*ts > old_time, "sync time should be updated");
537 }
538
539 #[test]
542 fn scenario_35_round_robin_with_cooldown_skip() {
543 let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
550 let mut state = NeighborSyncState::new_cycle(peers);
551 let batch_size = 4;
552 let cooldown = Duration::from_secs(3600);
553
554 state
556 .last_sync_times
557 .insert(peer_id_from_byte(2), Instant::now());
558 state
559 .last_sync_times
560 .insert(peer_id_from_byte(4), Instant::now());
561
562 let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
565 assert_eq!(batch1.len(), 4);
566 assert_eq!(batch1[0], peer_id_from_byte(1));
567 assert_eq!(batch1[1], peer_id_from_byte(3));
568 assert_eq!(batch1[2], peer_id_from_byte(5));
569 assert_eq!(batch1[3], peer_id_from_byte(6));
570
571 assert!(!state.order.contains(&peer_id_from_byte(2)));
573 assert!(!state.order.contains(&peer_id_from_byte(4)));
574
575 let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
577 assert_eq!(batch2.len(), 2);
578 assert_eq!(batch2[0], peer_id_from_byte(7));
579 assert_eq!(batch2[1], peer_id_from_byte(8));
580
581 assert!(state.is_cycle_complete());
583 }
584
585 #[test]
586 fn cycle_complete_when_cursor_past_order() {
587 let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
589 let mut state = NeighborSyncState::new_cycle(peers);
590
591 assert!(!state.is_cycle_complete());
593
594 state.cursor = 3;
596 assert!(state.is_cycle_complete());
597
598 state.cursor = 10;
600 assert!(state.is_cycle_complete());
601
602 state.order.clear();
604 state.cursor = 0;
605 assert!(state.is_cycle_complete());
606 }
607
608 #[test]
625 fn scenario_36_post_cycle_triggers_combined_prune_pass() {
626 let config = ReplicationConfig::default();
627
628 let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
630 let mut state = NeighborSyncState::new_cycle(peers);
631 let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
632 assert!(
633 state.is_cycle_complete(),
634 "cycle must be complete before prune pass triggers"
635 );
636
637 assert!(
639 !config.prune_hysteresis_duration.is_zero(),
640 "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
641 );
642
643 let record_key: [u8; 32] = [0x36; 32];
652 let paid_key: [u8; 32] = [0x37; 32];
653
654 let record_first_seen = Instant::now();
656 let paid_first_seen = Instant::now();
657
658 let record_elapsed = record_first_seen.elapsed();
660 let paid_elapsed = paid_first_seen.elapsed();
661 assert!(
662 record_elapsed < config.prune_hysteresis_duration,
663 "record key should be retained within hysteresis window"
664 );
665 assert!(
666 paid_elapsed < config.prune_hysteresis_duration,
667 "paid key should be retained within hysteresis window"
668 );
669
670 assert_ne!(
673 record_key, paid_key,
674 "record and paid pruning keys must be independent"
675 );
676
677 let new_state = NeighborSyncState::new_cycle(vec![
679 peer_id_from_byte(1),
680 peer_id_from_byte(2),
681 peer_id_from_byte(3),
682 ]);
683 assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
684 assert!(
685 !new_state.is_cycle_complete(),
686 "new cycle should not be immediately complete"
687 );
688 }
689
690 #[test]
691 fn scenario_38_mid_cycle_peer_join_excluded() {
692 let peers = vec![
696 peer_id_from_byte(0xA),
697 peer_id_from_byte(0xB),
698 peer_id_from_byte(0xC),
699 ];
700 let mut state = NeighborSyncState::new_cycle(peers);
701
702 let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
704 assert_eq!(state.cursor, 1);
705
706 let peer_d = peer_id_from_byte(0xD);
708 assert!(
709 !state.order.contains(&peer_d),
710 "mid-cycle joiner must not appear in the current snapshot"
711 );
712
713 let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
715 assert!(state.is_cycle_complete());
716
717 let new_peers = vec![
719 peer_id_from_byte(0xA),
720 peer_id_from_byte(0xB),
721 peer_id_from_byte(0xC),
722 peer_d,
723 ];
724 let new_state = NeighborSyncState::new_cycle(new_peers);
725 assert!(
726 new_state.order.contains(&peer_d),
727 "after new snapshot, joiner D should be present"
728 );
729 }
730
731 #[test]
732 fn scenario_39_unreachable_peer_removed_slot_filled() {
733 let peers = vec![
736 peer_id_from_byte(1),
737 peer_id_from_byte(2),
738 peer_id_from_byte(3),
739 peer_id_from_byte(4),
740 peer_id_from_byte(5),
741 ];
742 let mut state = NeighborSyncState::new_cycle(peers);
743
744 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
746 assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
747
748 let replacement =
750 handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
751 assert!(!state.order.contains(&peer_id_from_byte(2)));
752
753 assert_eq!(
755 replacement,
756 Some(peer_id_from_byte(3)),
757 "vacated slot should be filled by next peer in order"
758 );
759
760 let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
762 assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
763 assert!(state.is_cycle_complete());
764 }
765
766 #[test]
767 fn scenario_40_cooldown_peer_removed_from_snapshot() {
768 let peers = vec![
772 peer_id_from_byte(1),
773 peer_id_from_byte(2),
774 peer_id_from_byte(3),
775 ];
776 let mut state = NeighborSyncState::new_cycle(peers);
777 let cooldown = Duration::from_secs(3600);
778
779 state
781 .last_sync_times
782 .insert(peer_id_from_byte(2), Instant::now());
783
784 let batch = select_sync_batch(&mut state, 3, cooldown);
785
786 assert!(!state.order.contains(&peer_id_from_byte(2)));
788 assert_eq!(state.order.len(), 2, "order should shrink by 1");
789
790 assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
792
793 assert!(state.is_cycle_complete());
795 }
796
797 #[test]
798 fn scenario_41_cycle_always_terminates() {
799 let peer_count: u8 = 10;
803 let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
804 let mut state = NeighborSyncState::new_cycle(peers);
805 let cooldown = Duration::from_secs(3600);
806
807 for i in 1..=peer_count {
809 state
810 .last_sync_times
811 .insert(peer_id_from_byte(i), Instant::now());
812 }
813
814 let batch = select_sync_batch(&mut state, 4, cooldown);
815
816 assert!(
817 batch.is_empty(),
818 "all peers on cooldown — batch must be empty"
819 );
820 assert!(state.order.is_empty(), "all peers should have been removed");
821 assert!(
822 state.is_cycle_complete(),
823 "cycle must terminate when all peers are removed"
824 );
825 }
826
827 #[test]
828 fn consecutive_rounds_advance_through_full_cycle() {
829 let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
833 let mut state = NeighborSyncState::new_cycle(peers);
834 let batch_size = 2;
835 let no_cooldown = Duration::from_secs(0);
836
837 let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
838 assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
839 assert_eq!(state.cursor, 2);
840 assert!(!state.is_cycle_complete());
841
842 let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
843 assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
844 assert_eq!(state.cursor, 4);
845 assert!(!state.is_cycle_complete());
846
847 let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
848 assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
849 assert_eq!(state.cursor, 6);
850 assert!(state.is_cycle_complete());
851
852 let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
854 assert!(round4.is_empty());
855 }
856
857 #[test]
878 fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
879 let sender = peer_id_from_byte(0x37);
880
881 let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
884 let outbound_paid_hints = vec![[0x03; 32]];
885 let response = NeighborSyncResponse {
886 replica_hints: outbound_replica_hints.clone(),
887 paid_hints: outbound_paid_hints.clone(),
888 bootstrapping: false,
889 rejected_keys: Vec::new(),
890 };
891
892 let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
894 let inbound_paid_hints = vec![[0xB0; 32]];
895
896 let sender_in_rt = false;
898 let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
899
900 assert_eq!(
902 response.replica_hints, outbound_replica_hints,
903 "outbound replica hints must be sent even when sender is not in LocalRT"
904 );
905 assert_eq!(
906 response.paid_hints, outbound_paid_hints,
907 "outbound paid hints must be sent even when sender is not in LocalRT"
908 );
909
910 if !sender_in_rt {
912 let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
915 let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
916
917 for key in &inbound_replica_hints {
918 assert!(
919 !admitted_replica_keys.contains(key),
920 "inbound replica hints must NOT be admitted from non-RT sender"
921 );
922 }
923 for key in &inbound_paid_hints {
924 assert!(
925 !admitted_paid_keys.contains(key),
926 "inbound paid hints must NOT be admitted from non-RT sender"
927 );
928 }
929
930 assert!(
932 !sync_history.contains_key(&sender),
933 "sync history must NOT be updated for non-LocalRT sender"
934 );
935 }
936
937 let sender_in_rt = true;
939 assert!(
940 sender_in_rt,
941 "when sender is in LocalRT, inbound hints are processed"
942 );
943
944 sync_history.insert(
946 sender,
947 PeerSyncRecord {
948 last_sync: Some(Instant::now()),
949 cycles_since_sync: 0,
950 },
951 );
952 assert!(
953 sync_history.contains_key(&sender),
954 "sync history should be updated for LocalRT sender"
955 );
956 assert!(
957 sync_history
958 .get(&sender)
959 .expect("sender in history")
960 .last_sync
961 .is_some(),
962 "last_sync should be recorded for RT sender"
963 );
964 }
965
966 #[test]
967 fn cycle_completion_resets_cursor_but_keeps_sync_times() {
968 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
971 let mut state = NeighborSyncState::new_cycle(peers);
972
973 let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
975 record_successful_sync(&mut state, &peer_id_from_byte(1));
976 record_successful_sync(&mut state, &peer_id_from_byte(2));
977 assert!(state.is_cycle_complete());
978
979 let old_sync_times = state.last_sync_times.clone();
981 assert_eq!(old_sync_times.len(), 2);
982
983 let new_peers = vec![
986 peer_id_from_byte(1),
987 peer_id_from_byte(2),
988 peer_id_from_byte(3),
989 ];
990 let mut new_state = NeighborSyncState::new_cycle(new_peers);
991 new_state.last_sync_times = old_sync_times;
992
993 assert_eq!(new_state.cursor, 0);
995 assert!(!new_state.is_cycle_complete());
996
997 assert_eq!(new_state.last_sync_times.len(), 2);
999 assert!(new_state
1000 .last_sync_times
1001 .contains_key(&peer_id_from_byte(1)));
1002 assert!(new_state
1003 .last_sync_times
1004 .contains_key(&peer_id_from_byte(2)));
1005
1006 let cooldown = Duration::from_secs(3600);
1009 let batch = select_sync_batch(&mut new_state, 3, cooldown);
1010 assert_eq!(
1011 batch,
1012 std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1013 "only the new peer should be selected; old peers are on cooldown"
1014 );
1015 }
1016}