1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, info, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14
15use crate::ant_protocol::XorName;
16use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
17use crate::replication::paid_list::PaidList;
18use crate::replication::protocol::{
19 NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
20};
21use crate::replication::types::NeighborSyncState;
22use crate::storage::LmdbStorage;
23
24const HINT_BUILD_SLOW_LOG_MS: u128 = 250;
26
27#[derive(Debug)]
30pub(crate) struct SentReplicaHint {
31 pub(crate) key: XorName,
33 pub(crate) close_peers: HashSet<PeerId>,
35}
36
37#[derive(Debug)]
39pub(crate) struct NeighborSyncOutcome {
40 pub(crate) response: NeighborSyncResponse,
42 pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
44}
45
46#[derive(Debug, Default)]
48pub(crate) struct PeerSyncHints {
49 pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
52 paid_hints: Vec<XorName>,
54}
55
56pub async fn build_replica_hints_for_peer(
66 peer: &PeerId,
67 storage: &Arc<LmdbStorage>,
68 p2p_node: &Arc<P2PNode>,
69 close_group_size: usize,
70) -> Vec<XorName> {
71 build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
72 .await
73 .into_iter()
74 .map(|hint| hint.key)
75 .collect()
76}
77
78pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
79 peer: &PeerId,
80 storage: &Arc<LmdbStorage>,
81 p2p_node: &Arc<P2PNode>,
82 close_group_size: usize,
83) -> Vec<SentReplicaHint> {
84 let all_keys = match storage.all_keys().await {
85 Ok(keys) => keys,
86 Err(e) => {
87 warn!("Failed to read stored keys for hint construction: {e}");
88 return Vec::new();
89 }
90 };
91
92 let dht = p2p_node.dht_manager();
93 let mut hints = Vec::new();
94 for key in all_keys {
95 let closest = dht
96 .find_closest_nodes_local_with_self(&key, close_group_size)
97 .await;
98 let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
99 if close_peers.contains(peer) {
100 hints.push(SentReplicaHint { key, close_peers });
101 }
102 }
103 hints
104}
105
106pub(crate) async fn build_sync_hints_for_peers(
109 peers: &[PeerId],
110 storage: &Arc<LmdbStorage>,
111 paid_list: &Arc<PaidList>,
112 p2p_node: &Arc<P2PNode>,
113 close_group_size: usize,
114 paid_list_close_group_size: usize,
115) -> HashMap<PeerId, PeerSyncHints> {
116 let started = Instant::now();
117 let target_peers = peers.iter().copied().collect::<HashSet<_>>();
118 let mut hints_by_peer = peers
119 .iter()
120 .copied()
121 .map(|peer| (peer, PeerSyncHints::default()))
122 .collect::<HashMap<_, _>>();
123
124 if peers.is_empty() {
125 return hints_by_peer;
126 }
127
128 let all_keys = match storage.all_keys().await {
129 Ok(keys) => keys,
130 Err(e) => {
131 warn!("Failed to read stored keys for batch hint construction: {e}");
132 Vec::new()
133 }
134 };
135 let stored_key_count = all_keys.len();
136
137 let dht = p2p_node.dht_manager();
138 for key in all_keys {
139 let closest = dht
140 .find_closest_nodes_local_with_self(&key, close_group_size)
141 .await;
142 let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
143 for peer in close_peers.intersection(&target_peers) {
144 if let Some(peer_hints) = hints_by_peer.get_mut(peer) {
145 peer_hints.sent_replica_hints.push(SentReplicaHint {
146 key,
147 close_peers: close_peers.clone(),
148 });
149 }
150 }
151 }
152
153 let all_paid_keys = match paid_list.all_keys() {
154 Ok(keys) => keys,
155 Err(e) => {
156 warn!("Failed to read PaidForList for batch hint construction: {e}");
157 Vec::new()
158 }
159 };
160 let paid_key_count = all_paid_keys.len();
161
162 for key in all_paid_keys {
163 let closest = dht
164 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
165 .await;
166 for node in closest {
167 if target_peers.contains(&node.peer_id) {
168 if let Some(peer_hints) = hints_by_peer.get_mut(&node.peer_id) {
169 peer_hints.paid_hints.push(key);
170 }
171 }
172 }
173 }
174
175 let replica_hint_count = hints_by_peer
176 .values()
177 .map(|hints| hints.sent_replica_hints.len())
178 .sum::<usize>();
179 let paid_hint_count = hints_by_peer
180 .values()
181 .map(|hints| hints.paid_hints.len())
182 .sum::<usize>();
183 let elapsed_ms = started.elapsed().as_millis();
184
185 if elapsed_ms >= HINT_BUILD_SLOW_LOG_MS {
186 info!(
187 target: "ant_node::replication::neighbor_sync",
188 "Slow neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
189 peers.len(),
190 stored_key_count,
191 paid_key_count,
192 replica_hint_count,
193 paid_hint_count,
194 );
195 } else {
196 debug!(
197 target: "ant_node::replication::neighbor_sync",
198 "Neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
199 peers.len(),
200 stored_key_count,
201 paid_key_count,
202 replica_hint_count,
203 paid_hint_count,
204 );
205 }
206
207 hints_by_peer
208}
209
210pub async fn build_paid_hints_for_peer(
215 peer: &PeerId,
216 paid_list: &Arc<PaidList>,
217 p2p_node: &Arc<P2PNode>,
218 paid_list_close_group_size: usize,
219) -> Vec<XorName> {
220 let all_paid_keys = match paid_list.all_keys() {
221 Ok(keys) => keys,
222 Err(e) => {
223 warn!("Failed to read PaidForList for hint construction: {e}");
224 return Vec::new();
225 }
226 };
227
228 let dht = p2p_node.dht_manager();
229 let mut hints = Vec::new();
230 for key in all_paid_keys {
231 let closest = dht
232 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
233 .await;
234 if closest.iter().any(|n| n.peer_id == *peer) {
235 hints.push(key);
236 }
237 }
238 hints
239}
240
241pub async fn snapshot_close_neighbors(
246 p2p_node: &Arc<P2PNode>,
247 self_id: &PeerId,
248 scope: usize,
249) -> Vec<PeerId> {
250 let self_xor: XorName = *self_id.as_bytes();
251 let closest = p2p_node
252 .dht_manager()
253 .find_closest_nodes_local(&self_xor, scope)
254 .await;
255 closest.iter().map(|n| n.peer_id).collect()
256}
257
258pub fn select_sync_batch(
264 state: &mut NeighborSyncState,
265 peer_count: usize,
266 cooldown: Duration,
267) -> Vec<PeerId> {
268 let mut batch = Vec::new();
269 let now = Instant::now();
270
271 while batch.len() < peer_count {
272 let Some(peer) = select_next_sync_peer(state, now, cooldown) else {
273 break;
274 };
275 batch.push(peer);
276 }
277
278 batch
279}
280
281fn select_next_sync_peer(
282 state: &mut NeighborSyncState,
283 now: Instant,
284 cooldown: Duration,
285) -> Option<PeerId> {
286 while let Some(peer) = state.priority_order.pop_front() {
287 if peer_on_cooldown(state, &peer, now, cooldown) {
288 state.remove_peer(&peer);
289 continue;
290 }
291
292 state.remove_peer(&peer);
293 return Some(peer);
294 }
295
296 while state.cursor < state.order.len() {
297 let peer = state.order[state.cursor];
298
299 if peer_on_cooldown(state, &peer, now, cooldown) {
303 state.order.remove(state.cursor);
304 continue;
305 }
306
307 state.cursor += 1;
308 return Some(peer);
309 }
310
311 None
312}
313
314fn peer_on_cooldown(
315 state: &NeighborSyncState,
316 peer: &PeerId,
317 now: Instant,
318 cooldown: Duration,
319) -> bool {
320 state
321 .last_sync_times
322 .get(peer)
323 .is_some_and(|last_sync| now.duration_since(*last_sync) < cooldown)
324}
325
326pub async fn sync_with_peer(
331 peer: &PeerId,
332 p2p_node: &Arc<P2PNode>,
333 storage: &Arc<LmdbStorage>,
334 paid_list: &Arc<PaidList>,
335 config: &ReplicationConfig,
336 is_bootstrapping: bool,
337) -> Option<NeighborSyncResponse> {
338 sync_with_peer_with_outcome(
339 peer,
340 p2p_node,
341 storage,
342 paid_list,
343 config,
344 is_bootstrapping,
345 None,
346 )
347 .await
348 .map(|outcome| outcome.response)
349}
350
351#[allow(clippy::too_many_arguments)]
355pub(crate) async fn sync_with_peer_with_outcome(
356 peer: &PeerId,
357 p2p_node: &Arc<P2PNode>,
358 storage: &Arc<LmdbStorage>,
359 paid_list: &Arc<PaidList>,
360 config: &ReplicationConfig,
361 is_bootstrapping: bool,
362 commitment: Option<crate::replication::commitment::StorageCommitment>,
363) -> Option<NeighborSyncOutcome> {
364 let mut hints_by_peer = build_sync_hints_for_peers(
366 std::slice::from_ref(peer),
367 storage,
368 paid_list,
369 p2p_node,
370 config.close_group_size,
371 config.paid_list_close_group_size,
372 )
373 .await;
374 let hints = hints_by_peer.remove(peer).unwrap_or_default();
375 sync_with_peer_with_hints(peer, p2p_node, config, is_bootstrapping, hints, commitment).await
376}
377
378#[allow(clippy::too_many_arguments)]
379pub(crate) async fn sync_with_peer_with_hints(
380 peer: &PeerId,
381 p2p_node: &Arc<P2PNode>,
382 config: &ReplicationConfig,
383 is_bootstrapping: bool,
384 hints: PeerSyncHints,
385 commitment: Option<crate::replication::commitment::StorageCommitment>,
389) -> Option<NeighborSyncOutcome> {
390 let replica_hints = hints
391 .sent_replica_hints
392 .iter()
393 .map(|hint| hint.key)
394 .collect::<Vec<_>>();
395 let sent_replica_hints = hints.sent_replica_hints;
396
397 let request = NeighborSyncRequest {
398 replica_hints,
399 paid_hints: hints.paid_hints,
400 bootstrapping: is_bootstrapping,
401 commitment,
402 };
403 let request_id = rand::thread_rng().gen::<u64>();
404 let msg = ReplicationMessage {
405 request_id,
406 body: ReplicationMessageBody::NeighborSyncRequest(request),
407 };
408
409 let encoded = match msg.encode() {
410 Ok(data) => data,
411 Err(e) => {
412 warn!("Failed to encode sync request for {peer}: {e}");
413 return None;
414 }
415 };
416
417 let response = match p2p_node
418 .send_request(
419 peer,
420 REPLICATION_PROTOCOL_ID,
421 encoded,
422 config.verification_request_timeout,
423 )
424 .await
425 {
426 Ok(resp) => resp,
427 Err(e) => {
428 debug!("Sync with {peer} failed: {e}");
429 return None;
430 }
431 };
432
433 match ReplicationMessage::decode(&response.data) {
434 Ok(decoded) => {
435 if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
436 Some(NeighborSyncOutcome {
437 response: resp,
438 sent_replica_hints,
439 })
440 } else {
441 warn!("Unexpected response type from {peer} during sync");
442 None
443 }
444 }
445 Err(e) => {
446 warn!("Failed to decode sync response from {peer}: {e}");
447 None
448 }
449 }
450}
451
452pub fn handle_sync_failure(
459 state: &mut NeighborSyncState,
460 failed_peer: &PeerId,
461 cooldown: Duration,
462) -> Option<PeerId> {
463 state.remove_peer(failed_peer);
465
466 let now = Instant::now();
469 select_next_sync_peer(state, now, cooldown)
470}
471
472pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
474 state.last_sync_times.insert(*peer, Instant::now());
475}
476
477pub async fn handle_sync_request(
485 sender: &PeerId,
486 request: &NeighborSyncRequest,
487 p2p_node: &Arc<P2PNode>,
488 storage: &Arc<LmdbStorage>,
489 paid_list: &Arc<PaidList>,
490 config: &ReplicationConfig,
491 is_bootstrapping: bool,
492) -> (NeighborSyncResponse, bool) {
493 let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
494 sender,
495 request,
496 p2p_node,
497 storage,
498 paid_list,
499 config,
500 is_bootstrapping,
501 None,
502 )
503 .await;
504 (response, sender_in_rt)
505}
506
507#[allow(clippy::too_many_arguments)]
508pub(crate) async fn handle_sync_request_with_proofs(
509 sender: &PeerId,
510 _request: &NeighborSyncRequest,
511 p2p_node: &Arc<P2PNode>,
512 storage: &Arc<LmdbStorage>,
513 paid_list: &Arc<PaidList>,
514 config: &ReplicationConfig,
515 is_bootstrapping: bool,
516 my_commitment: Option<crate::replication::commitment::StorageCommitment>,
517) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
518 let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
519
520 let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
522 sender,
523 storage,
524 p2p_node,
525 config.close_group_size,
526 )
527 .await;
528 let replica_hints = sent_replica_hints
529 .iter()
530 .map(|hint| hint.key)
531 .collect::<Vec<_>>();
532 let paid_hints = build_paid_hints_for_peer(
533 sender,
534 paid_list,
535 p2p_node,
536 config.paid_list_close_group_size,
537 )
538 .await;
539
540 let response = NeighborSyncResponse {
541 replica_hints,
542 paid_hints,
543 bootstrapping: is_bootstrapping,
544 rejected_keys: Vec::new(),
545 commitment: my_commitment,
546 };
547
548 (response, sent_replica_hints, sender_in_rt)
550}
551
552#[cfg(test)]
557#[allow(clippy::unwrap_used, clippy::expect_used)]
558mod tests {
559 use super::*;
560 use crate::replication::types::PeerSyncRecord;
561 use std::collections::HashMap;
562
563 fn peer_id_from_byte(b: u8) -> PeerId {
565 let mut bytes = [0u8; 32];
566 bytes[0] = b;
567 PeerId::from_bytes(bytes)
568 }
569
570 #[test]
573 fn select_sync_batch_returns_up_to_peer_count() {
574 let peers = vec![
575 peer_id_from_byte(1),
576 peer_id_from_byte(2),
577 peer_id_from_byte(3),
578 peer_id_from_byte(4),
579 peer_id_from_byte(5),
580 ];
581 let mut state = NeighborSyncState::new_cycle(peers);
582 let batch_size = 3;
583
584 let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
585
586 assert_eq!(batch.len(), batch_size);
587 assert_eq!(batch[0], peer_id_from_byte(1));
588 assert_eq!(batch[1], peer_id_from_byte(2));
589 assert_eq!(batch[2], peer_id_from_byte(3));
590 assert_eq!(state.cursor, 3);
591 }
592
593 #[test]
594 fn select_sync_batch_skips_cooldown_peers() {
595 let peers = vec![
596 peer_id_from_byte(1),
597 peer_id_from_byte(2),
598 peer_id_from_byte(3),
599 peer_id_from_byte(4),
600 ];
601 let mut state = NeighborSyncState::new_cycle(peers);
602
603 state
605 .last_sync_times
606 .insert(peer_id_from_byte(1), Instant::now());
607 state
608 .last_sync_times
609 .insert(peer_id_from_byte(3), Instant::now());
610
611 let cooldown = Duration::from_secs(3600); let batch = select_sync_batch(&mut state, 2, cooldown);
613
614 assert_eq!(batch.len(), 2);
616 assert_eq!(batch[0], peer_id_from_byte(2));
617 assert_eq!(batch[1], peer_id_from_byte(4));
618
619 assert!(!state.order.contains(&peer_id_from_byte(1)));
621 assert!(!state.order.contains(&peer_id_from_byte(3)));
622 }
623
624 #[test]
625 fn select_sync_batch_expired_cooldown_not_skipped() {
626 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
627 let mut state = NeighborSyncState::new_cycle(peers);
628
629 state.last_sync_times.insert(
634 peer_id_from_byte(1),
635 Instant::now()
636 .checked_sub(Duration::from_secs(2))
637 .unwrap_or_else(Instant::now),
638 );
639
640 let cooldown = Duration::from_secs(1);
641 let batch = select_sync_batch(&mut state, 2, cooldown);
642
643 assert_eq!(batch.len(), 2);
645 assert_eq!(batch[0], peer_id_from_byte(1));
646 assert_eq!(batch[1], peer_id_from_byte(2));
647 }
648
649 #[test]
650 fn select_sync_batch_empty_order() {
651 let mut state = NeighborSyncState::new_cycle(vec![]);
652
653 let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
654
655 assert!(batch.is_empty());
656 assert_eq!(state.cursor, 0);
657 }
658
659 #[test]
660 fn select_sync_batch_all_on_cooldown() {
661 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
662 let mut state = NeighborSyncState::new_cycle(peers);
663
664 state
665 .last_sync_times
666 .insert(peer_id_from_byte(1), Instant::now());
667 state
668 .last_sync_times
669 .insert(peer_id_from_byte(2), Instant::now());
670
671 let cooldown = Duration::from_secs(3600);
672 let batch = select_sync_batch(&mut state, 4, cooldown);
673
674 assert!(batch.is_empty());
675 assert!(state.order.is_empty());
676 }
677
678 #[test]
681 fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
682 let peers = vec![
683 peer_id_from_byte(1),
684 peer_id_from_byte(2),
685 peer_id_from_byte(3),
686 peer_id_from_byte(4),
687 ];
688 let mut state = NeighborSyncState::new_cycle(peers);
689 state.cursor = 2;
691
692 let replacement =
694 handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
695
696 assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(2)));
699
700 assert!(replacement.is_some());
704 }
705
706 #[test]
707 fn handle_sync_failure_removes_peer_after_cursor() {
708 let peers = vec![
709 peer_id_from_byte(1),
710 peer_id_from_byte(2),
711 peer_id_from_byte(3),
712 peer_id_from_byte(4),
713 ];
714 let mut state = NeighborSyncState::new_cycle(peers);
715 state.cursor = 1;
716
717 let replacement =
719 handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
720
721 assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(3)));
724
725 assert_eq!(replacement, Some(peer_id_from_byte(2)));
727 }
728
729 #[test]
730 fn handle_sync_failure_no_replacement_when_exhausted() {
731 let peers = vec![peer_id_from_byte(1)];
732 let mut state = NeighborSyncState::new_cycle(peers);
733 state.cursor = 1; let replacement =
736 handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
737
738 assert!(state.order.is_empty());
739 assert!(replacement.is_none());
740 }
741
742 #[test]
743 fn handle_sync_failure_unknown_peer_is_noop() {
744 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
745 let mut state = NeighborSyncState::new_cycle(peers);
746 state.cursor = 1;
747
748 let replacement =
749 handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
750
751 assert_eq!(state.order.len(), 2);
753 assert_eq!(replacement, Some(peer_id_from_byte(2)));
755 assert_eq!(state.cursor, 2);
756 }
757
758 #[test]
761 fn record_successful_sync_updates_last_sync_time() {
762 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
763 let mut state = NeighborSyncState::new_cycle(peers);
764 let peer = peer_id_from_byte(1);
765
766 assert!(!state.last_sync_times.contains_key(&peer));
767
768 let before = Instant::now();
769 record_successful_sync(&mut state, &peer);
770 let after = Instant::now();
771
772 let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
773 assert!(*ts >= before);
774 assert!(*ts <= after);
775 }
776
777 #[test]
778 fn record_successful_sync_overwrites_previous() {
779 let peers = vec![peer_id_from_byte(1)];
780 let mut state = NeighborSyncState::new_cycle(peers);
781 let peer = peer_id_from_byte(1);
782
783 let old_time = Instant::now()
786 .checked_sub(Duration::from_secs(2))
787 .unwrap_or_else(Instant::now);
788 state.last_sync_times.insert(peer, old_time);
789
790 record_successful_sync(&mut state, &peer);
791
792 let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
793 assert!(*ts > old_time, "sync time should be updated");
794 }
795
796 #[test]
799 fn scenario_35_round_robin_with_cooldown_skip() {
800 let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
807 let mut state = NeighborSyncState::new_cycle(peers);
808 let batch_size = 4;
809 let cooldown = Duration::from_secs(3600);
810
811 state
813 .last_sync_times
814 .insert(peer_id_from_byte(2), Instant::now());
815 state
816 .last_sync_times
817 .insert(peer_id_from_byte(4), Instant::now());
818
819 let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
822 assert_eq!(batch1.len(), 4);
823 assert_eq!(batch1[0], peer_id_from_byte(1));
824 assert_eq!(batch1[1], peer_id_from_byte(3));
825 assert_eq!(batch1[2], peer_id_from_byte(5));
826 assert_eq!(batch1[3], peer_id_from_byte(6));
827
828 assert!(!state.order.contains(&peer_id_from_byte(2)));
830 assert!(!state.order.contains(&peer_id_from_byte(4)));
831
832 let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
834 assert_eq!(batch2.len(), 2);
835 assert_eq!(batch2[0], peer_id_from_byte(7));
836 assert_eq!(batch2[1], peer_id_from_byte(8));
837
838 assert!(state.is_cycle_complete());
840 }
841
842 #[test]
843 fn cycle_complete_when_cursor_past_order() {
844 let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
846 let mut state = NeighborSyncState::new_cycle(peers);
847
848 assert!(!state.is_cycle_complete());
850
851 state.cursor = 3;
853 assert!(state.is_cycle_complete());
854
855 state.cursor = 10;
857 assert!(state.is_cycle_complete());
858
859 state.order.clear();
861 state.cursor = 0;
862 assert!(state.is_cycle_complete());
863 }
864
865 #[test]
882 fn scenario_36_post_cycle_triggers_combined_prune_pass() {
883 let config = ReplicationConfig::default();
884
885 let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
887 let mut state = NeighborSyncState::new_cycle(peers);
888 let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
889 assert!(
890 state.is_cycle_complete(),
891 "cycle must be complete before prune pass triggers"
892 );
893
894 assert!(
896 !config.prune_hysteresis_duration.is_zero(),
897 "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
898 );
899
900 let record_key: [u8; 32] = [0x36; 32];
909 let paid_key: [u8; 32] = [0x37; 32];
910
911 let record_first_seen = Instant::now();
913 let paid_first_seen = Instant::now();
914
915 let record_elapsed = record_first_seen.elapsed();
917 let paid_elapsed = paid_first_seen.elapsed();
918 assert!(
919 record_elapsed < config.prune_hysteresis_duration,
920 "record key should be retained within hysteresis window"
921 );
922 assert!(
923 paid_elapsed < config.prune_hysteresis_duration,
924 "paid key should be retained within hysteresis window"
925 );
926
927 assert_ne!(
930 record_key, paid_key,
931 "record and paid pruning keys must be independent"
932 );
933
934 let new_state = NeighborSyncState::new_cycle(vec![
936 peer_id_from_byte(1),
937 peer_id_from_byte(2),
938 peer_id_from_byte(3),
939 ]);
940 assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
941 assert!(
942 !new_state.is_cycle_complete(),
943 "new cycle should not be immediately complete"
944 );
945 }
946
947 #[test]
948 fn scenario_38_mid_cycle_peer_join_prioritized() {
949 let peers = vec![
952 peer_id_from_byte(0xA),
953 peer_id_from_byte(0xB),
954 peer_id_from_byte(0xC),
955 ];
956 let mut state = NeighborSyncState::new_cycle(peers);
957
958 let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
960 assert_eq!(state.cursor, 1);
961
962 let peer_d = peer_id_from_byte(0xD);
965 assert_eq!(state.queue_priority_peers([peer_d]), 1);
966 assert!(!state.order.contains(&peer_d));
967 assert!(
968 !state.is_cycle_complete(),
969 "pending priority sync keeps the cycle active"
970 );
971
972 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
973 assert_eq!(batch, vec![peer_d, peer_id_from_byte(0xB)]);
974 }
975
976 #[test]
977 fn scenario_39_unreachable_peer_removed_slot_filled() {
978 let peers = vec![
981 peer_id_from_byte(1),
982 peer_id_from_byte(2),
983 peer_id_from_byte(3),
984 peer_id_from_byte(4),
985 peer_id_from_byte(5),
986 ];
987 let mut state = NeighborSyncState::new_cycle(peers);
988
989 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
991 assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
992
993 let replacement =
995 handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
996 assert!(!state.order.contains(&peer_id_from_byte(2)));
997
998 assert_eq!(
1000 replacement,
1001 Some(peer_id_from_byte(3)),
1002 "vacated slot should be filled by next peer in order"
1003 );
1004
1005 let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1007 assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
1008 assert!(state.is_cycle_complete());
1009 }
1010
1011 #[test]
1012 fn scenario_40_cooldown_peer_removed_from_snapshot() {
1013 let peers = vec![
1017 peer_id_from_byte(1),
1018 peer_id_from_byte(2),
1019 peer_id_from_byte(3),
1020 ];
1021 let mut state = NeighborSyncState::new_cycle(peers);
1022 let cooldown = Duration::from_secs(3600);
1023
1024 state
1026 .last_sync_times
1027 .insert(peer_id_from_byte(2), Instant::now());
1028
1029 let batch = select_sync_batch(&mut state, 3, cooldown);
1030
1031 assert!(!state.order.contains(&peer_id_from_byte(2)));
1033 assert_eq!(state.order.len(), 2, "order should shrink by 1");
1034
1035 assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
1037
1038 assert!(state.is_cycle_complete());
1040 }
1041
1042 #[test]
1043 fn priority_peer_in_snapshot_is_not_selected_twice() {
1044 let peers = vec![
1045 peer_id_from_byte(1),
1046 peer_id_from_byte(2),
1047 peer_id_from_byte(3),
1048 ];
1049 let mut state = NeighborSyncState::new_cycle(peers);
1050 assert_eq!(state.queue_priority_peers([peer_id_from_byte(2)]), 1);
1051
1052 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1053
1054 assert_eq!(batch, vec![peer_id_from_byte(2), peer_id_from_byte(1)]);
1055 assert_eq!(
1056 state.order,
1057 vec![peer_id_from_byte(1), peer_id_from_byte(3)]
1058 );
1059 assert_eq!(state.cursor, 1);
1060 }
1061
1062 #[test]
1063 fn priority_peer_on_cooldown_is_skipped_and_removed_from_snapshot() {
1064 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1065 let mut state = NeighborSyncState::new_cycle(peers);
1066 let cooldown = Duration::from_secs(1);
1067 let priority_peer = peer_id_from_byte(2);
1068 state.last_sync_times.insert(priority_peer, Instant::now());
1069 assert_eq!(state.queue_priority_peers([priority_peer]), 1);
1070
1071 let batch = select_sync_batch(&mut state, 2, cooldown);
1072
1073 assert_eq!(batch, vec![peer_id_from_byte(1)]);
1074 assert!(state.priority_order.is_empty());
1075 assert!(!state.order.contains(&priority_peer));
1076 }
1077
1078 #[test]
1079 fn failure_replacement_prefers_remaining_priority_peer() {
1080 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1081 let mut state = NeighborSyncState::new_cycle(peers);
1082 let first_priority_peer = peer_id_from_byte(3);
1083 let second_priority_peer = peer_id_from_byte(4);
1084 assert_eq!(
1085 state.queue_priority_peers([first_priority_peer, second_priority_peer]),
1086 2
1087 );
1088
1089 let batch = select_sync_batch(&mut state, 1, Duration::from_secs(0));
1090 assert_eq!(batch, vec![first_priority_peer]);
1091
1092 let replacement =
1093 handle_sync_failure(&mut state, &first_priority_peer, Duration::from_secs(0));
1094
1095 assert_eq!(replacement, Some(second_priority_peer));
1096 assert!(state.priority_order.is_empty());
1097 assert_eq!(state.cursor, 0);
1098 }
1099
1100 #[test]
1101 fn scenario_41_cycle_always_terminates() {
1102 let peer_count: u8 = 10;
1106 let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
1107 let mut state = NeighborSyncState::new_cycle(peers);
1108 let cooldown = Duration::from_secs(3600);
1109
1110 for i in 1..=peer_count {
1112 state
1113 .last_sync_times
1114 .insert(peer_id_from_byte(i), Instant::now());
1115 }
1116
1117 let batch = select_sync_batch(&mut state, 4, cooldown);
1118
1119 assert!(
1120 batch.is_empty(),
1121 "all peers on cooldown — batch must be empty"
1122 );
1123 assert!(state.order.is_empty(), "all peers should have been removed");
1124 assert!(
1125 state.is_cycle_complete(),
1126 "cycle must terminate when all peers are removed"
1127 );
1128 }
1129
1130 #[test]
1131 fn consecutive_rounds_advance_through_full_cycle() {
1132 let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
1136 let mut state = NeighborSyncState::new_cycle(peers);
1137 let batch_size = 2;
1138 let no_cooldown = Duration::from_secs(0);
1139
1140 let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
1141 assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
1142 assert_eq!(state.cursor, 2);
1143 assert!(!state.is_cycle_complete());
1144
1145 let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
1146 assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
1147 assert_eq!(state.cursor, 4);
1148 assert!(!state.is_cycle_complete());
1149
1150 let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
1151 assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
1152 assert_eq!(state.cursor, 6);
1153 assert!(state.is_cycle_complete());
1154
1155 let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
1157 assert!(round4.is_empty());
1158 }
1159
1160 #[test]
1181 fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
1182 let sender = peer_id_from_byte(0x37);
1183
1184 let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
1187 let outbound_paid_hints = vec![[0x03; 32]];
1188 let response = NeighborSyncResponse {
1189 replica_hints: outbound_replica_hints.clone(),
1190 paid_hints: outbound_paid_hints.clone(),
1191 bootstrapping: false,
1192 rejected_keys: Vec::new(),
1193 commitment: None,
1194 };
1195
1196 let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
1198 let inbound_paid_hints = vec![[0xB0; 32]];
1199
1200 let sender_in_rt = false;
1202 let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
1203
1204 assert_eq!(
1206 response.replica_hints, outbound_replica_hints,
1207 "outbound replica hints must be sent even when sender is not in LocalRT"
1208 );
1209 assert_eq!(
1210 response.paid_hints, outbound_paid_hints,
1211 "outbound paid hints must be sent even when sender is not in LocalRT"
1212 );
1213
1214 if !sender_in_rt {
1216 let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
1219 let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
1220
1221 for key in &inbound_replica_hints {
1222 assert!(
1223 !admitted_replica_keys.contains(key),
1224 "inbound replica hints must NOT be admitted from non-RT sender"
1225 );
1226 }
1227 for key in &inbound_paid_hints {
1228 assert!(
1229 !admitted_paid_keys.contains(key),
1230 "inbound paid hints must NOT be admitted from non-RT sender"
1231 );
1232 }
1233
1234 assert!(
1236 !sync_history.contains_key(&sender),
1237 "sync history must NOT be updated for non-LocalRT sender"
1238 );
1239 }
1240
1241 let sender_in_rt = true;
1243 assert!(
1244 sender_in_rt,
1245 "when sender is in LocalRT, inbound hints are processed"
1246 );
1247
1248 sync_history.insert(
1250 sender,
1251 PeerSyncRecord {
1252 last_sync: Some(Instant::now()),
1253 cycles_since_sync: 0,
1254 },
1255 );
1256 assert!(
1257 sync_history.contains_key(&sender),
1258 "sync history should be updated for LocalRT sender"
1259 );
1260 assert!(
1261 sync_history
1262 .get(&sender)
1263 .expect("sender in history")
1264 .last_sync
1265 .is_some(),
1266 "last_sync should be recorded for RT sender"
1267 );
1268 }
1269
1270 #[test]
1271 fn cycle_completion_resets_cursor_but_keeps_sync_times() {
1272 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1275 let mut state = NeighborSyncState::new_cycle(peers);
1276
1277 let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1279 record_successful_sync(&mut state, &peer_id_from_byte(1));
1280 record_successful_sync(&mut state, &peer_id_from_byte(2));
1281 assert!(state.is_cycle_complete());
1282
1283 let old_sync_times = state.last_sync_times.clone();
1285 assert_eq!(old_sync_times.len(), 2);
1286
1287 let new_peers = vec![
1290 peer_id_from_byte(1),
1291 peer_id_from_byte(2),
1292 peer_id_from_byte(3),
1293 ];
1294 let mut new_state = NeighborSyncState::new_cycle(new_peers);
1295 new_state.last_sync_times = old_sync_times;
1296
1297 assert_eq!(new_state.cursor, 0);
1299 assert!(!new_state.is_cycle_complete());
1300
1301 assert_eq!(new_state.last_sync_times.len(), 2);
1303 assert!(new_state
1304 .last_sync_times
1305 .contains_key(&peer_id_from_byte(1)));
1306 assert!(new_state
1307 .last_sync_times
1308 .contains_key(&peer_id_from_byte(2)));
1309
1310 let cooldown = Duration::from_secs(3600);
1313 let batch = select_sync_batch(&mut new_state, 3, cooldown);
1314 assert_eq!(
1315 batch,
1316 std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1317 "only the new peer should be selected; old peers are on cooldown"
1318 );
1319 }
1320}