1use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use crate::logging::{debug, warn};
10use rand::Rng;
11use saorsa_core::identity::PeerId;
12use saorsa_core::P2PNode;
13
14use crate::ant_protocol::XorName;
15use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
16use crate::replication::paid_list::PaidList;
17use crate::replication::protocol::{
18 NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
19};
20use crate::replication::types::NeighborSyncState;
21use crate::storage::LmdbStorage;
22
23pub async fn build_replica_hints_for_peer(
28 peer: &PeerId,
29 storage: &Arc<LmdbStorage>,
30 p2p_node: &Arc<P2PNode>,
31 close_group_size: usize,
32) -> Vec<XorName> {
33 let all_keys = match storage.all_keys().await {
34 Ok(keys) => keys,
35 Err(e) => {
36 warn!("Failed to read stored keys for hint construction: {e}");
37 return Vec::new();
38 }
39 };
40
41 let dht = p2p_node.dht_manager();
42 let mut hints = Vec::new();
43 for key in all_keys {
44 let closest = dht
45 .find_closest_nodes_local_with_self(&key, close_group_size)
46 .await;
47 if closest.iter().any(|n| n.peer_id == *peer) {
48 hints.push(key);
49 }
50 }
51 hints
52}
53
54pub async fn build_paid_hints_for_peer(
59 peer: &PeerId,
60 paid_list: &Arc<PaidList>,
61 p2p_node: &Arc<P2PNode>,
62 paid_list_close_group_size: usize,
63) -> Vec<XorName> {
64 let all_paid_keys = match paid_list.all_keys() {
65 Ok(keys) => keys,
66 Err(e) => {
67 warn!("Failed to read PaidForList for hint construction: {e}");
68 return Vec::new();
69 }
70 };
71
72 let dht = p2p_node.dht_manager();
73 let mut hints = Vec::new();
74 for key in all_paid_keys {
75 let closest = dht
76 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
77 .await;
78 if closest.iter().any(|n| n.peer_id == *peer) {
79 hints.push(key);
80 }
81 }
82 hints
83}
84
85pub async fn snapshot_close_neighbors(
90 p2p_node: &Arc<P2PNode>,
91 self_id: &PeerId,
92 scope: usize,
93) -> Vec<PeerId> {
94 let self_xor: XorName = *self_id.as_bytes();
95 let closest = p2p_node
96 .dht_manager()
97 .find_closest_nodes_local(&self_xor, scope)
98 .await;
99 closest.iter().map(|n| n.peer_id).collect()
100}
101
102pub fn select_sync_batch(
107 state: &mut NeighborSyncState,
108 peer_count: usize,
109 cooldown: Duration,
110) -> Vec<PeerId> {
111 let mut batch = Vec::new();
112 let now = Instant::now();
113
114 while batch.len() < peer_count && state.cursor < state.order.len() {
115 let peer = state.order[state.cursor];
116
117 if let Some(last_sync) = state.last_sync_times.get(&peer) {
121 if now.duration_since(*last_sync) < cooldown {
122 state.order.remove(state.cursor);
123 continue;
124 }
125 }
126
127 batch.push(peer);
128 state.cursor += 1;
129 }
130
131 batch
132}
133
134pub async fn sync_with_peer(
139 peer: &PeerId,
140 p2p_node: &Arc<P2PNode>,
141 storage: &Arc<LmdbStorage>,
142 paid_list: &Arc<PaidList>,
143 config: &ReplicationConfig,
144 is_bootstrapping: bool,
145) -> Option<NeighborSyncResponse> {
146 let replica_hints =
148 build_replica_hints_for_peer(peer, storage, p2p_node, config.close_group_size).await;
149 let paid_hints =
150 build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
151 .await;
152
153 let request = NeighborSyncRequest {
154 replica_hints,
155 paid_hints,
156 bootstrapping: is_bootstrapping,
157 };
158 let request_id = rand::thread_rng().gen::<u64>();
159 let msg = ReplicationMessage {
160 request_id,
161 body: ReplicationMessageBody::NeighborSyncRequest(request),
162 };
163
164 let encoded = match msg.encode() {
165 Ok(data) => data,
166 Err(e) => {
167 warn!("Failed to encode sync request for {peer}: {e}");
168 return None;
169 }
170 };
171
172 let response = match p2p_node
173 .send_request(
174 peer,
175 REPLICATION_PROTOCOL_ID,
176 encoded,
177 config.verification_request_timeout,
178 )
179 .await
180 {
181 Ok(resp) => resp,
182 Err(e) => {
183 debug!("Sync with {peer} failed: {e}");
184 return None;
185 }
186 };
187
188 match ReplicationMessage::decode(&response.data) {
189 Ok(decoded) => {
190 if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
191 Some(resp)
192 } else {
193 warn!("Unexpected response type from {peer} during sync");
194 None
195 }
196 }
197 Err(e) => {
198 warn!("Failed to decode sync response from {peer}: {e}");
199 None
200 }
201 }
202}
203
204pub fn handle_sync_failure(
212 state: &mut NeighborSyncState,
213 failed_peer: &PeerId,
214 cooldown: Duration,
215) -> Option<PeerId> {
216 if let Some(pos) = state.order.iter().position(|p| p == failed_peer) {
218 state.order.remove(pos);
219 if pos < state.cursor {
221 state.cursor = state.cursor.saturating_sub(1);
222 }
223 }
224
225 let now = Instant::now();
228 while state.cursor < state.order.len() {
229 let candidate = state.order[state.cursor];
230
231 if let Some(last_sync) = state.last_sync_times.get(&candidate) {
232 if now.duration_since(*last_sync) < cooldown {
233 state.order.remove(state.cursor);
234 continue;
235 }
236 }
237
238 state.cursor += 1;
239 return Some(candidate);
240 }
241
242 None
243}
244
/// Marks `peer` as having just completed a successful sync, starting its
/// cooldown window from the current instant (overwrites any earlier entry).
pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
    state.last_sync_times.insert(*peer, Instant::now());
}
249
250pub async fn handle_sync_request(
258 sender: &PeerId,
259 _request: &NeighborSyncRequest,
260 p2p_node: &Arc<P2PNode>,
261 storage: &Arc<LmdbStorage>,
262 paid_list: &Arc<PaidList>,
263 config: &ReplicationConfig,
264 is_bootstrapping: bool,
265) -> (NeighborSyncResponse, bool) {
266 let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
267
268 let replica_hints =
270 build_replica_hints_for_peer(sender, storage, p2p_node, config.close_group_size).await;
271 let paid_hints = build_paid_hints_for_peer(
272 sender,
273 paid_list,
274 p2p_node,
275 config.paid_list_close_group_size,
276 )
277 .await;
278
279 let response = NeighborSyncResponse {
280 replica_hints,
281 paid_hints,
282 bootstrapping: is_bootstrapping,
283 rejected_keys: Vec::new(),
284 };
285
286 (response, sender_in_rt)
288}
289
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::replication::types::PeerSyncRecord;
    use std::collections::HashMap;

    /// Builds a deterministic PeerId whose first byte is `b` and the rest zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    // Batch is capped at `peer_count` and taken in order from the cursor.
    #[test]
    fn select_sync_batch_returns_up_to_peer_count() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 3;

        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));

        assert_eq!(batch.len(), batch_size);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
        assert_eq!(batch[2], peer_id_from_byte(3));
        assert_eq!(state.cursor, 3);
    }

    // Cooldown peers are pruned from the order, not merely skipped.
    #[test]
    fn select_sync_batch_skips_cooldown_peers() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Peers 1 and 3 just synced, so they are inside the cooldown window.
        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(3), Instant::now());

        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 2, cooldown);

        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(2));
        assert_eq!(batch[1], peer_id_from_byte(4));

        // Cooldown peers were removed from the cycle snapshot entirely.
        assert!(!state.order.contains(&peer_id_from_byte(1)));
        assert!(!state.order.contains(&peer_id_from_byte(3)));
    }

    // A sync older than the cooldown window does not block re-selection.
    #[test]
    fn select_sync_batch_expired_cooldown_not_skipped() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        // Peer 1 synced 2s ago; checked_sub guards platforms where Instant
        // cannot go that far back (falls back to "now", still deterministic
        // here because the cooldown below is 1s).
        state.last_sync_times.insert(
            peer_id_from_byte(1),
            Instant::now()
                .checked_sub(Duration::from_secs(2))
                .unwrap_or_else(Instant::now),
        );

        let cooldown = Duration::from_secs(1);
        let batch = select_sync_batch(&mut state, 2, cooldown);

        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
    }

    // Empty cycle yields an empty batch and an unmoved cursor.
    #[test]
    fn select_sync_batch_empty_order() {
        let mut state = NeighborSyncState::new_cycle(vec![]);

        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));

        assert!(batch.is_empty());
        assert_eq!(state.cursor, 0);
    }

    // When every peer is cooling down, the whole order drains and the batch
    // is empty — the cycle still terminates.
    #[test]
    fn select_sync_batch_all_on_cooldown() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());

        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 4, cooldown);

        assert!(batch.is_empty());
        assert!(state.order.is_empty());
    }

    // Removing the peer *at* the cursor: cursor is unchanged because the next
    // peer shifts into its slot; a replacement is then available.
    #[test]
    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 2;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));

        assert_eq!(state.cursor, 2);
        assert!(!state.order.contains(&peer_id_from_byte(2)));

        assert!(replacement.is_some());
    }

    // Removing a peer *after* the cursor leaves cursor math intact; the next
    // unvisited peer fills the slot.
    #[test]
    fn handle_sync_failure_removes_peer_after_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));

        assert_eq!(state.cursor, 2);
        assert!(!state.order.contains(&peer_id_from_byte(3)));

        assert_eq!(replacement, Some(peer_id_from_byte(2)));
    }

    // Failing the only peer exhausts the cycle: no replacement.
    #[test]
    fn handle_sync_failure_no_replacement_when_exhausted() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));

        assert!(state.order.is_empty());
        assert!(replacement.is_none());
    }

    // An unknown failed peer removes nothing; the scan still advances and
    // returns the next eligible peer.
    #[test]
    fn handle_sync_failure_unknown_peer_is_noop() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));

        assert_eq!(state.order.len(), 2);
        assert_eq!(replacement, Some(peer_id_from_byte(2)));
        assert_eq!(state.cursor, 2);
    }

    // record_successful_sync stamps a time between "before" and "after".
    #[test]
    fn record_successful_sync_updates_last_sync_time() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);

        assert!(!state.last_sync_times.contains_key(&peer));

        let before = Instant::now();
        record_successful_sync(&mut state, &peer);
        let after = Instant::now();

        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts >= before);
        assert!(*ts <= after);
    }

    // A second successful sync replaces the earlier timestamp.
    #[test]
    fn record_successful_sync_overwrites_previous() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);

        let old_time = Instant::now()
            .checked_sub(Duration::from_secs(2))
            .unwrap_or_else(Instant::now);
        state.last_sync_times.insert(peer, old_time);

        record_successful_sync(&mut state, &peer);

        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts > old_time, "sync time should be updated");
    }

    // Scenario 35: round-robin batches skip cooldown peers and the cycle
    // completes after two rounds over an 8-peer snapshot.
    #[test]
    fn scenario_35_round_robin_with_cooldown_skip() {
        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 4;
        let cooldown = Duration::from_secs(3600);

        // Peers 2 and 4 recently synced.
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(4), Instant::now());

        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch1.len(), 4);
        assert_eq!(batch1[0], peer_id_from_byte(1));
        assert_eq!(batch1[1], peer_id_from_byte(3));
        assert_eq!(batch1[2], peer_id_from_byte(5));
        assert_eq!(batch1[3], peer_id_from_byte(6));

        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert!(!state.order.contains(&peer_id_from_byte(4)));

        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch2.len(), 2);
        assert_eq!(batch2[0], peer_id_from_byte(7));
        assert_eq!(batch2[1], peer_id_from_byte(8));

        assert!(state.is_cycle_complete());
    }

    // Cycle completion is "cursor at or past end of order", including the
    // degenerate empty-order case.
    #[test]
    fn cycle_complete_when_cursor_past_order() {
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);

        assert!(!state.is_cycle_complete());

        state.cursor = 3;
        assert!(state.is_cycle_complete());

        state.cursor = 10;
        assert!(state.is_cycle_complete());

        state.order.clear();
        state.cursor = 0;
        assert!(state.is_cycle_complete());
    }

    // Scenario 36: after a complete cycle, a combined prune pass runs with
    // hysteresis; record and paid pruning use independent keys, and a fresh
    // cycle starts with a reset cursor.
    #[test]
    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
        let config = ReplicationConfig::default();

        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
        assert!(
            state.is_cycle_complete(),
            "cycle must be complete before prune pass triggers"
        );

        assert!(
            !config.prune_hysteresis_duration.is_zero(),
            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
        );

        let record_key: [u8; 32] = [0x36; 32];
        let paid_key: [u8; 32] = [0x37; 32];

        // Keys just became prune candidates: both are inside the window.
        let record_first_seen = Instant::now();
        let paid_first_seen = Instant::now();

        let record_elapsed = record_first_seen.elapsed();
        let paid_elapsed = paid_first_seen.elapsed();
        assert!(
            record_elapsed < config.prune_hysteresis_duration,
            "record key should be retained within hysteresis window"
        );
        assert!(
            paid_elapsed < config.prune_hysteresis_duration,
            "paid key should be retained within hysteresis window"
        );

        assert_ne!(
            record_key, paid_key,
            "record and paid pruning keys must be independent"
        );

        let new_state = NeighborSyncState::new_cycle(vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ]);
        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
        assert!(
            !new_state.is_cycle_complete(),
            "new cycle should not be immediately complete"
        );
    }

    // Scenario 38: a peer joining mid-cycle is excluded until the next
    // snapshot is taken.
    #[test]
    fn scenario_38_mid_cycle_peer_join_excluded() {
        let peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
        assert_eq!(state.cursor, 1);

        // D "joins" mid-cycle — the frozen snapshot must not include it.
        let peer_d = peer_id_from_byte(0xD);
        assert!(
            !state.order.contains(&peer_d),
            "mid-cycle joiner must not appear in the current snapshot"
        );

        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert!(state.is_cycle_complete());

        let new_peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
            peer_d,
        ];
        let new_state = NeighborSyncState::new_cycle(new_peers);
        assert!(
            new_state.order.contains(&peer_d),
            "after new snapshot, joiner D should be present"
        );
    }

    // Scenario 39: an unreachable peer is dropped and its batch slot is
    // immediately refilled by the next peer in order.
    #[test]
    fn scenario_39_unreachable_peer_removed_slot_filled() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);

        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);

        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
        assert!(!state.order.contains(&peer_id_from_byte(2)));

        assert_eq!(
            replacement,
            Some(peer_id_from_byte(3)),
            "vacated slot should be filled by next peer in order"
        );

        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
        assert!(state.is_cycle_complete());
    }

    // Scenario 40: a cooldown peer is physically removed from the snapshot,
    // shrinking the order.
    #[test]
    fn scenario_40_cooldown_peer_removed_from_snapshot() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);

        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());

        let batch = select_sync_batch(&mut state, 3, cooldown);

        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert_eq!(state.order.len(), 2, "order should shrink by 1");

        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);

        assert!(state.is_cycle_complete());
    }

    // Scenario 41: even with every peer on cooldown the selection loop must
    // terminate (by draining the order) rather than spin.
    #[test]
    fn scenario_41_cycle_always_terminates() {
        let peer_count: u8 = 10;
        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);

        for i in 1..=peer_count {
            state
                .last_sync_times
                .insert(peer_id_from_byte(i), Instant::now());
        }

        let batch = select_sync_batch(&mut state, 4, cooldown);

        assert!(
            batch.is_empty(),
            "all peers on cooldown — batch must be empty"
        );
        assert!(state.order.is_empty(), "all peers should have been removed");
        assert!(
            state.is_cycle_complete(),
            "cycle must terminate when all peers are removed"
        );
    }

    // Repeated rounds walk the whole cycle exactly once, then yield nothing.
    #[test]
    fn consecutive_rounds_advance_through_full_cycle() {
        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 2;
        let no_cooldown = Duration::from_secs(0);

        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
        assert_eq!(state.cursor, 2);
        assert!(!state.is_cycle_complete());

        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
        assert_eq!(state.cursor, 4);
        assert!(!state.is_cycle_complete());

        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
        assert_eq!(state.cursor, 6);
        assert!(state.is_cycle_complete());

        // Exhausted cycle: a further round yields nothing.
        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert!(round4.is_empty());
    }

    // Scenario 37: outbound hints are always sent, but inbound hints from a
    // sender outside the local routing table are dropped and its sync history
    // is not updated. (Models the admission policy around handle_sync_request,
    // which returns the RT-membership flag to the caller.)
    #[test]
    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
        let sender = peer_id_from_byte(0x37);

        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
        let outbound_paid_hints = vec![[0x03; 32]];
        let response = NeighborSyncResponse {
            replica_hints: outbound_replica_hints.clone(),
            paid_hints: outbound_paid_hints.clone(),
            bootstrapping: false,
            rejected_keys: Vec::new(),
        };

        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
        let inbound_paid_hints = vec![[0xB0; 32]];

        let sender_in_rt = false;
        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();

        assert_eq!(
            response.replica_hints, outbound_replica_hints,
            "outbound replica hints must be sent even when sender is not in LocalRT"
        );
        assert_eq!(
            response.paid_hints, outbound_paid_hints,
            "outbound paid hints must be sent even when sender is not in LocalRT"
        );

        if !sender_in_rt {
            // Nothing is admitted from a non-RT sender.
            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();

            for key in &inbound_replica_hints {
                assert!(
                    !admitted_replica_keys.contains(key),
                    "inbound replica hints must NOT be admitted from non-RT sender"
                );
            }
            for key in &inbound_paid_hints {
                assert!(
                    !admitted_paid_keys.contains(key),
                    "inbound paid hints must NOT be admitted from non-RT sender"
                );
            }

            assert!(
                !sync_history.contains_key(&sender),
                "sync history must NOT be updated for non-LocalRT sender"
            );
        }

        // Flip the flag: an RT sender's hints are processed and recorded.
        let sender_in_rt = true;
        assert!(
            sender_in_rt,
            "when sender is in LocalRT, inbound hints are processed"
        );

        sync_history.insert(
            sender,
            PeerSyncRecord {
                last_sync: Some(Instant::now()),
                cycles_since_sync: 0,
            },
        );
        assert!(
            sync_history.contains_key(&sender),
            "sync history should be updated for LocalRT sender"
        );
        assert!(
            sync_history
                .get(&sender)
                .expect("sender in history")
                .last_sync
                .is_some(),
            "last_sync should be recorded for RT sender"
        );
    }

    // A new cycle resets the cursor but carried-over sync times still enforce
    // cooldown for previously synced peers.
    #[test]
    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);

        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        record_successful_sync(&mut state, &peer_id_from_byte(1));
        record_successful_sync(&mut state, &peer_id_from_byte(2));
        assert!(state.is_cycle_complete());

        let old_sync_times = state.last_sync_times.clone();
        assert_eq!(old_sync_times.len(), 2);

        let new_peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut new_state = NeighborSyncState::new_cycle(new_peers);
        new_state.last_sync_times = old_sync_times;

        assert_eq!(new_state.cursor, 0);
        assert!(!new_state.is_cycle_complete());

        assert_eq!(new_state.last_sync_times.len(), 2);
        assert!(new_state
            .last_sync_times
            .contains_key(&peer_id_from_byte(1)));
        assert!(new_state
            .last_sync_times
            .contains_key(&peer_id_from_byte(2)));

        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut new_state, 3, cooldown);
        assert_eq!(
            batch,
            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
            "only the new peer should be selected; old peers are on cooldown"
        );
    }
}