1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::logging::{debug, info, warn};
11use rand::Rng;
12use saorsa_core::identity::PeerId;
13use saorsa_core::P2PNode;
14
15use crate::ant_protocol::XorName;
16use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
17use crate::replication::paid_list::PaidList;
18use crate::replication::protocol::{
19 NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
20};
21use crate::replication::types::NeighborSyncState;
22use crate::storage::LmdbStorage;
23
24const HINT_BUILD_SLOW_LOG_MS: u128 = 250;
26
27#[derive(Debug)]
30pub(crate) struct SentReplicaHint {
31 pub(crate) key: XorName,
33 pub(crate) close_peers: HashSet<PeerId>,
35}
36
37#[derive(Debug)]
39pub(crate) struct NeighborSyncOutcome {
40 pub(crate) response: NeighborSyncResponse,
42 pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
44}
45
46#[derive(Debug, Default)]
48pub(crate) struct PeerSyncHints {
49 pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
52 paid_hints: Vec<XorName>,
54}
55
56pub async fn build_replica_hints_for_peer(
66 peer: &PeerId,
67 storage: &Arc<LmdbStorage>,
68 p2p_node: &Arc<P2PNode>,
69 close_group_size: usize,
70) -> Vec<XorName> {
71 build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
72 .await
73 .into_iter()
74 .map(|hint| hint.key)
75 .collect()
76}
77
78pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
79 peer: &PeerId,
80 storage: &Arc<LmdbStorage>,
81 p2p_node: &Arc<P2PNode>,
82 close_group_size: usize,
83) -> Vec<SentReplicaHint> {
84 let all_keys = match storage.all_keys().await {
85 Ok(keys) => keys,
86 Err(e) => {
87 warn!("Failed to read stored keys for hint construction: {e}");
88 return Vec::new();
89 }
90 };
91
92 let dht = p2p_node.dht_manager();
93 let mut hints = Vec::new();
94 for key in all_keys {
95 let closest = dht
96 .find_closest_nodes_local_with_self(&key, close_group_size)
97 .await;
98 let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
99 if close_peers.contains(peer) {
100 hints.push(SentReplicaHint { key, close_peers });
101 }
102 }
103 hints
104}
105
106pub(crate) async fn build_sync_hints_for_peers(
109 peers: &[PeerId],
110 storage: &Arc<LmdbStorage>,
111 paid_list: &Arc<PaidList>,
112 p2p_node: &Arc<P2PNode>,
113 close_group_size: usize,
114 paid_list_close_group_size: usize,
115) -> HashMap<PeerId, PeerSyncHints> {
116 let started = Instant::now();
117 let target_peers = peers.iter().copied().collect::<HashSet<_>>();
118 let mut hints_by_peer = peers
119 .iter()
120 .copied()
121 .map(|peer| (peer, PeerSyncHints::default()))
122 .collect::<HashMap<_, _>>();
123
124 if peers.is_empty() {
125 return hints_by_peer;
126 }
127
128 let all_keys = match storage.all_keys().await {
129 Ok(keys) => keys,
130 Err(e) => {
131 warn!("Failed to read stored keys for batch hint construction: {e}");
132 Vec::new()
133 }
134 };
135 let stored_key_count = all_keys.len();
136
137 let dht = p2p_node.dht_manager();
138 for key in all_keys {
139 let closest = dht
140 .find_closest_nodes_local_with_self(&key, close_group_size)
141 .await;
142 let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
143 for peer in close_peers.intersection(&target_peers) {
144 if let Some(peer_hints) = hints_by_peer.get_mut(peer) {
145 peer_hints.sent_replica_hints.push(SentReplicaHint {
146 key,
147 close_peers: close_peers.clone(),
148 });
149 }
150 }
151 }
152
153 let all_paid_keys = match paid_list.all_keys() {
154 Ok(keys) => keys,
155 Err(e) => {
156 warn!("Failed to read PaidForList for batch hint construction: {e}");
157 Vec::new()
158 }
159 };
160 let paid_key_count = all_paid_keys.len();
161
162 for key in all_paid_keys {
163 let closest = dht
164 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
165 .await;
166 for node in closest {
167 if target_peers.contains(&node.peer_id) {
168 if let Some(peer_hints) = hints_by_peer.get_mut(&node.peer_id) {
169 peer_hints.paid_hints.push(key);
170 }
171 }
172 }
173 }
174
175 let replica_hint_count = hints_by_peer
176 .values()
177 .map(|hints| hints.sent_replica_hints.len())
178 .sum::<usize>();
179 let paid_hint_count = hints_by_peer
180 .values()
181 .map(|hints| hints.paid_hints.len())
182 .sum::<usize>();
183 let elapsed_ms = started.elapsed().as_millis();
184
185 if elapsed_ms >= HINT_BUILD_SLOW_LOG_MS {
186 info!(
187 target: "ant_node::replication::neighbor_sync",
188 "Slow neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
189 peers.len(),
190 stored_key_count,
191 paid_key_count,
192 replica_hint_count,
193 paid_hint_count,
194 );
195 } else {
196 debug!(
197 target: "ant_node::replication::neighbor_sync",
198 "Neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
199 peers.len(),
200 stored_key_count,
201 paid_key_count,
202 replica_hint_count,
203 paid_hint_count,
204 );
205 }
206
207 hints_by_peer
208}
209
210pub async fn build_paid_hints_for_peer(
215 peer: &PeerId,
216 paid_list: &Arc<PaidList>,
217 p2p_node: &Arc<P2PNode>,
218 paid_list_close_group_size: usize,
219) -> Vec<XorName> {
220 let all_paid_keys = match paid_list.all_keys() {
221 Ok(keys) => keys,
222 Err(e) => {
223 warn!("Failed to read PaidForList for hint construction: {e}");
224 return Vec::new();
225 }
226 };
227
228 let dht = p2p_node.dht_manager();
229 let mut hints = Vec::new();
230 for key in all_paid_keys {
231 let closest = dht
232 .find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
233 .await;
234 if closest.iter().any(|n| n.peer_id == *peer) {
235 hints.push(key);
236 }
237 }
238 hints
239}
240
241pub async fn snapshot_close_neighbors(
246 p2p_node: &Arc<P2PNode>,
247 self_id: &PeerId,
248 scope: usize,
249) -> Vec<PeerId> {
250 let self_xor: XorName = *self_id.as_bytes();
251 let closest = p2p_node
252 .dht_manager()
253 .find_closest_nodes_local(&self_xor, scope)
254 .await;
255 closest.iter().map(|n| n.peer_id).collect()
256}
257
258pub fn select_sync_batch(
264 state: &mut NeighborSyncState,
265 peer_count: usize,
266 cooldown: Duration,
267) -> Vec<PeerId> {
268 let mut batch = Vec::new();
269 let now = Instant::now();
270
271 while batch.len() < peer_count {
272 let Some(peer) = select_next_sync_peer(state, now, cooldown) else {
273 break;
274 };
275 batch.push(peer);
276 }
277
278 batch
279}
280
281fn select_next_sync_peer(
282 state: &mut NeighborSyncState,
283 now: Instant,
284 cooldown: Duration,
285) -> Option<PeerId> {
286 while let Some(peer) = state.priority_order.pop_front() {
287 if peer_on_cooldown(state, &peer, now, cooldown) {
288 state.remove_peer(&peer);
289 continue;
290 }
291
292 state.remove_peer(&peer);
293 return Some(peer);
294 }
295
296 while state.cursor < state.order.len() {
297 let peer = state.order[state.cursor];
298
299 if peer_on_cooldown(state, &peer, now, cooldown) {
303 state.order.remove(state.cursor);
304 continue;
305 }
306
307 state.cursor += 1;
308 return Some(peer);
309 }
310
311 None
312}
313
314fn peer_on_cooldown(
315 state: &NeighborSyncState,
316 peer: &PeerId,
317 now: Instant,
318 cooldown: Duration,
319) -> bool {
320 state
321 .last_sync_times
322 .get(peer)
323 .is_some_and(|last_sync| now.duration_since(*last_sync) < cooldown)
324}
325
326pub async fn sync_with_peer(
331 peer: &PeerId,
332 p2p_node: &Arc<P2PNode>,
333 storage: &Arc<LmdbStorage>,
334 paid_list: &Arc<PaidList>,
335 config: &ReplicationConfig,
336 is_bootstrapping: bool,
337) -> Option<NeighborSyncResponse> {
338 sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping)
339 .await
340 .map(|outcome| outcome.response)
341}
342
343pub(crate) async fn sync_with_peer_with_outcome(
344 peer: &PeerId,
345 p2p_node: &Arc<P2PNode>,
346 storage: &Arc<LmdbStorage>,
347 paid_list: &Arc<PaidList>,
348 config: &ReplicationConfig,
349 is_bootstrapping: bool,
350) -> Option<NeighborSyncOutcome> {
351 let mut hints_by_peer = build_sync_hints_for_peers(
353 std::slice::from_ref(peer),
354 storage,
355 paid_list,
356 p2p_node,
357 config.close_group_size,
358 config.paid_list_close_group_size,
359 )
360 .await;
361 let hints = hints_by_peer.remove(peer).unwrap_or_default();
362 sync_with_peer_with_hints(peer, p2p_node, config, is_bootstrapping, hints).await
363}
364
365pub(crate) async fn sync_with_peer_with_hints(
366 peer: &PeerId,
367 p2p_node: &Arc<P2PNode>,
368 config: &ReplicationConfig,
369 is_bootstrapping: bool,
370 hints: PeerSyncHints,
371) -> Option<NeighborSyncOutcome> {
372 let replica_hints = hints
373 .sent_replica_hints
374 .iter()
375 .map(|hint| hint.key)
376 .collect::<Vec<_>>();
377 let sent_replica_hints = hints.sent_replica_hints;
378
379 let request = NeighborSyncRequest {
380 replica_hints,
381 paid_hints: hints.paid_hints,
382 bootstrapping: is_bootstrapping,
383 };
384 let request_id = rand::thread_rng().gen::<u64>();
385 let msg = ReplicationMessage {
386 request_id,
387 body: ReplicationMessageBody::NeighborSyncRequest(request),
388 };
389
390 let encoded = match msg.encode() {
391 Ok(data) => data,
392 Err(e) => {
393 warn!("Failed to encode sync request for {peer}: {e}");
394 return None;
395 }
396 };
397
398 let response = match p2p_node
399 .send_request(
400 peer,
401 REPLICATION_PROTOCOL_ID,
402 encoded,
403 config.verification_request_timeout,
404 )
405 .await
406 {
407 Ok(resp) => resp,
408 Err(e) => {
409 debug!("Sync with {peer} failed: {e}");
410 return None;
411 }
412 };
413
414 match ReplicationMessage::decode(&response.data) {
415 Ok(decoded) => {
416 if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
417 Some(NeighborSyncOutcome {
418 response: resp,
419 sent_replica_hints,
420 })
421 } else {
422 warn!("Unexpected response type from {peer} during sync");
423 None
424 }
425 }
426 Err(e) => {
427 warn!("Failed to decode sync response from {peer}: {e}");
428 None
429 }
430 }
431}
432
433pub fn handle_sync_failure(
440 state: &mut NeighborSyncState,
441 failed_peer: &PeerId,
442 cooldown: Duration,
443) -> Option<PeerId> {
444 state.remove_peer(failed_peer);
446
447 let now = Instant::now();
450 select_next_sync_peer(state, now, cooldown)
451}
452
453pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
455 state.last_sync_times.insert(*peer, Instant::now());
456}
457
458pub async fn handle_sync_request(
466 sender: &PeerId,
467 request: &NeighborSyncRequest,
468 p2p_node: &Arc<P2PNode>,
469 storage: &Arc<LmdbStorage>,
470 paid_list: &Arc<PaidList>,
471 config: &ReplicationConfig,
472 is_bootstrapping: bool,
473) -> (NeighborSyncResponse, bool) {
474 let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
475 sender,
476 request,
477 p2p_node,
478 storage,
479 paid_list,
480 config,
481 is_bootstrapping,
482 )
483 .await;
484 (response, sender_in_rt)
485}
486
487pub(crate) async fn handle_sync_request_with_proofs(
488 sender: &PeerId,
489 _request: &NeighborSyncRequest,
490 p2p_node: &Arc<P2PNode>,
491 storage: &Arc<LmdbStorage>,
492 paid_list: &Arc<PaidList>,
493 config: &ReplicationConfig,
494 is_bootstrapping: bool,
495) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
496 let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
497
498 let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
500 sender,
501 storage,
502 p2p_node,
503 config.close_group_size,
504 )
505 .await;
506 let replica_hints = sent_replica_hints
507 .iter()
508 .map(|hint| hint.key)
509 .collect::<Vec<_>>();
510 let paid_hints = build_paid_hints_for_peer(
511 sender,
512 paid_list,
513 p2p_node,
514 config.paid_list_close_group_size,
515 )
516 .await;
517
518 let response = NeighborSyncResponse {
519 replica_hints,
520 paid_hints,
521 bootstrapping: is_bootstrapping,
522 rejected_keys: Vec::new(),
523 };
524
525 (response, sent_replica_hints, sender_in_rt)
527}
528
529#[cfg(test)]
534#[allow(clippy::unwrap_used, clippy::expect_used)]
535mod tests {
536 use super::*;
537 use crate::replication::types::PeerSyncRecord;
538 use std::collections::HashMap;
539
540 fn peer_id_from_byte(b: u8) -> PeerId {
542 let mut bytes = [0u8; 32];
543 bytes[0] = b;
544 PeerId::from_bytes(bytes)
545 }
546
547 #[test]
550 fn select_sync_batch_returns_up_to_peer_count() {
551 let peers = vec![
552 peer_id_from_byte(1),
553 peer_id_from_byte(2),
554 peer_id_from_byte(3),
555 peer_id_from_byte(4),
556 peer_id_from_byte(5),
557 ];
558 let mut state = NeighborSyncState::new_cycle(peers);
559 let batch_size = 3;
560
561 let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
562
563 assert_eq!(batch.len(), batch_size);
564 assert_eq!(batch[0], peer_id_from_byte(1));
565 assert_eq!(batch[1], peer_id_from_byte(2));
566 assert_eq!(batch[2], peer_id_from_byte(3));
567 assert_eq!(state.cursor, 3);
568 }
569
570 #[test]
571 fn select_sync_batch_skips_cooldown_peers() {
572 let peers = vec![
573 peer_id_from_byte(1),
574 peer_id_from_byte(2),
575 peer_id_from_byte(3),
576 peer_id_from_byte(4),
577 ];
578 let mut state = NeighborSyncState::new_cycle(peers);
579
580 state
582 .last_sync_times
583 .insert(peer_id_from_byte(1), Instant::now());
584 state
585 .last_sync_times
586 .insert(peer_id_from_byte(3), Instant::now());
587
588 let cooldown = Duration::from_secs(3600); let batch = select_sync_batch(&mut state, 2, cooldown);
590
591 assert_eq!(batch.len(), 2);
593 assert_eq!(batch[0], peer_id_from_byte(2));
594 assert_eq!(batch[1], peer_id_from_byte(4));
595
596 assert!(!state.order.contains(&peer_id_from_byte(1)));
598 assert!(!state.order.contains(&peer_id_from_byte(3)));
599 }
600
601 #[test]
602 fn select_sync_batch_expired_cooldown_not_skipped() {
603 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
604 let mut state = NeighborSyncState::new_cycle(peers);
605
606 state.last_sync_times.insert(
611 peer_id_from_byte(1),
612 Instant::now()
613 .checked_sub(Duration::from_secs(2))
614 .unwrap_or_else(Instant::now),
615 );
616
617 let cooldown = Duration::from_secs(1);
618 let batch = select_sync_batch(&mut state, 2, cooldown);
619
620 assert_eq!(batch.len(), 2);
622 assert_eq!(batch[0], peer_id_from_byte(1));
623 assert_eq!(batch[1], peer_id_from_byte(2));
624 }
625
626 #[test]
627 fn select_sync_batch_empty_order() {
628 let mut state = NeighborSyncState::new_cycle(vec![]);
629
630 let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
631
632 assert!(batch.is_empty());
633 assert_eq!(state.cursor, 0);
634 }
635
636 #[test]
637 fn select_sync_batch_all_on_cooldown() {
638 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
639 let mut state = NeighborSyncState::new_cycle(peers);
640
641 state
642 .last_sync_times
643 .insert(peer_id_from_byte(1), Instant::now());
644 state
645 .last_sync_times
646 .insert(peer_id_from_byte(2), Instant::now());
647
648 let cooldown = Duration::from_secs(3600);
649 let batch = select_sync_batch(&mut state, 4, cooldown);
650
651 assert!(batch.is_empty());
652 assert!(state.order.is_empty());
653 }
654
655 #[test]
658 fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
659 let peers = vec![
660 peer_id_from_byte(1),
661 peer_id_from_byte(2),
662 peer_id_from_byte(3),
663 peer_id_from_byte(4),
664 ];
665 let mut state = NeighborSyncState::new_cycle(peers);
666 state.cursor = 2;
668
669 let replacement =
671 handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
672
673 assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(2)));
676
677 assert!(replacement.is_some());
681 }
682
683 #[test]
684 fn handle_sync_failure_removes_peer_after_cursor() {
685 let peers = vec![
686 peer_id_from_byte(1),
687 peer_id_from_byte(2),
688 peer_id_from_byte(3),
689 peer_id_from_byte(4),
690 ];
691 let mut state = NeighborSyncState::new_cycle(peers);
692 state.cursor = 1;
693
694 let replacement =
696 handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
697
698 assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(3)));
701
702 assert_eq!(replacement, Some(peer_id_from_byte(2)));
704 }
705
706 #[test]
707 fn handle_sync_failure_no_replacement_when_exhausted() {
708 let peers = vec![peer_id_from_byte(1)];
709 let mut state = NeighborSyncState::new_cycle(peers);
710 state.cursor = 1; let replacement =
713 handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
714
715 assert!(state.order.is_empty());
716 assert!(replacement.is_none());
717 }
718
719 #[test]
720 fn handle_sync_failure_unknown_peer_is_noop() {
721 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
722 let mut state = NeighborSyncState::new_cycle(peers);
723 state.cursor = 1;
724
725 let replacement =
726 handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
727
728 assert_eq!(state.order.len(), 2);
730 assert_eq!(replacement, Some(peer_id_from_byte(2)));
732 assert_eq!(state.cursor, 2);
733 }
734
735 #[test]
738 fn record_successful_sync_updates_last_sync_time() {
739 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
740 let mut state = NeighborSyncState::new_cycle(peers);
741 let peer = peer_id_from_byte(1);
742
743 assert!(!state.last_sync_times.contains_key(&peer));
744
745 let before = Instant::now();
746 record_successful_sync(&mut state, &peer);
747 let after = Instant::now();
748
749 let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
750 assert!(*ts >= before);
751 assert!(*ts <= after);
752 }
753
754 #[test]
755 fn record_successful_sync_overwrites_previous() {
756 let peers = vec![peer_id_from_byte(1)];
757 let mut state = NeighborSyncState::new_cycle(peers);
758 let peer = peer_id_from_byte(1);
759
760 let old_time = Instant::now()
763 .checked_sub(Duration::from_secs(2))
764 .unwrap_or_else(Instant::now);
765 state.last_sync_times.insert(peer, old_time);
766
767 record_successful_sync(&mut state, &peer);
768
769 let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
770 assert!(*ts > old_time, "sync time should be updated");
771 }
772
773 #[test]
776 fn scenario_35_round_robin_with_cooldown_skip() {
777 let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
784 let mut state = NeighborSyncState::new_cycle(peers);
785 let batch_size = 4;
786 let cooldown = Duration::from_secs(3600);
787
788 state
790 .last_sync_times
791 .insert(peer_id_from_byte(2), Instant::now());
792 state
793 .last_sync_times
794 .insert(peer_id_from_byte(4), Instant::now());
795
796 let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
799 assert_eq!(batch1.len(), 4);
800 assert_eq!(batch1[0], peer_id_from_byte(1));
801 assert_eq!(batch1[1], peer_id_from_byte(3));
802 assert_eq!(batch1[2], peer_id_from_byte(5));
803 assert_eq!(batch1[3], peer_id_from_byte(6));
804
805 assert!(!state.order.contains(&peer_id_from_byte(2)));
807 assert!(!state.order.contains(&peer_id_from_byte(4)));
808
809 let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
811 assert_eq!(batch2.len(), 2);
812 assert_eq!(batch2[0], peer_id_from_byte(7));
813 assert_eq!(batch2[1], peer_id_from_byte(8));
814
815 assert!(state.is_cycle_complete());
817 }
818
819 #[test]
820 fn cycle_complete_when_cursor_past_order() {
821 let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
823 let mut state = NeighborSyncState::new_cycle(peers);
824
825 assert!(!state.is_cycle_complete());
827
828 state.cursor = 3;
830 assert!(state.is_cycle_complete());
831
832 state.cursor = 10;
834 assert!(state.is_cycle_complete());
835
836 state.order.clear();
838 state.cursor = 0;
839 assert!(state.is_cycle_complete());
840 }
841
842 #[test]
859 fn scenario_36_post_cycle_triggers_combined_prune_pass() {
860 let config = ReplicationConfig::default();
861
862 let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
864 let mut state = NeighborSyncState::new_cycle(peers);
865 let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
866 assert!(
867 state.is_cycle_complete(),
868 "cycle must be complete before prune pass triggers"
869 );
870
871 assert!(
873 !config.prune_hysteresis_duration.is_zero(),
874 "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
875 );
876
877 let record_key: [u8; 32] = [0x36; 32];
886 let paid_key: [u8; 32] = [0x37; 32];
887
888 let record_first_seen = Instant::now();
890 let paid_first_seen = Instant::now();
891
892 let record_elapsed = record_first_seen.elapsed();
894 let paid_elapsed = paid_first_seen.elapsed();
895 assert!(
896 record_elapsed < config.prune_hysteresis_duration,
897 "record key should be retained within hysteresis window"
898 );
899 assert!(
900 paid_elapsed < config.prune_hysteresis_duration,
901 "paid key should be retained within hysteresis window"
902 );
903
904 assert_ne!(
907 record_key, paid_key,
908 "record and paid pruning keys must be independent"
909 );
910
911 let new_state = NeighborSyncState::new_cycle(vec![
913 peer_id_from_byte(1),
914 peer_id_from_byte(2),
915 peer_id_from_byte(3),
916 ]);
917 assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
918 assert!(
919 !new_state.is_cycle_complete(),
920 "new cycle should not be immediately complete"
921 );
922 }
923
924 #[test]
925 fn scenario_38_mid_cycle_peer_join_prioritized() {
926 let peers = vec![
929 peer_id_from_byte(0xA),
930 peer_id_from_byte(0xB),
931 peer_id_from_byte(0xC),
932 ];
933 let mut state = NeighborSyncState::new_cycle(peers);
934
935 let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
937 assert_eq!(state.cursor, 1);
938
939 let peer_d = peer_id_from_byte(0xD);
942 assert_eq!(state.queue_priority_peers([peer_d]), 1);
943 assert!(!state.order.contains(&peer_d));
944 assert!(
945 !state.is_cycle_complete(),
946 "pending priority sync keeps the cycle active"
947 );
948
949 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
950 assert_eq!(batch, vec![peer_d, peer_id_from_byte(0xB)]);
951 }
952
953 #[test]
954 fn scenario_39_unreachable_peer_removed_slot_filled() {
955 let peers = vec![
958 peer_id_from_byte(1),
959 peer_id_from_byte(2),
960 peer_id_from_byte(3),
961 peer_id_from_byte(4),
962 peer_id_from_byte(5),
963 ];
964 let mut state = NeighborSyncState::new_cycle(peers);
965
966 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
968 assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
969
970 let replacement =
972 handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
973 assert!(!state.order.contains(&peer_id_from_byte(2)));
974
975 assert_eq!(
977 replacement,
978 Some(peer_id_from_byte(3)),
979 "vacated slot should be filled by next peer in order"
980 );
981
982 let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
984 assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
985 assert!(state.is_cycle_complete());
986 }
987
988 #[test]
989 fn scenario_40_cooldown_peer_removed_from_snapshot() {
990 let peers = vec![
994 peer_id_from_byte(1),
995 peer_id_from_byte(2),
996 peer_id_from_byte(3),
997 ];
998 let mut state = NeighborSyncState::new_cycle(peers);
999 let cooldown = Duration::from_secs(3600);
1000
1001 state
1003 .last_sync_times
1004 .insert(peer_id_from_byte(2), Instant::now());
1005
1006 let batch = select_sync_batch(&mut state, 3, cooldown);
1007
1008 assert!(!state.order.contains(&peer_id_from_byte(2)));
1010 assert_eq!(state.order.len(), 2, "order should shrink by 1");
1011
1012 assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
1014
1015 assert!(state.is_cycle_complete());
1017 }
1018
1019 #[test]
1020 fn priority_peer_in_snapshot_is_not_selected_twice() {
1021 let peers = vec![
1022 peer_id_from_byte(1),
1023 peer_id_from_byte(2),
1024 peer_id_from_byte(3),
1025 ];
1026 let mut state = NeighborSyncState::new_cycle(peers);
1027 assert_eq!(state.queue_priority_peers([peer_id_from_byte(2)]), 1);
1028
1029 let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1030
1031 assert_eq!(batch, vec![peer_id_from_byte(2), peer_id_from_byte(1)]);
1032 assert_eq!(
1033 state.order,
1034 vec![peer_id_from_byte(1), peer_id_from_byte(3)]
1035 );
1036 assert_eq!(state.cursor, 1);
1037 }
1038
1039 #[test]
1040 fn priority_peer_on_cooldown_is_skipped_and_removed_from_snapshot() {
1041 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1042 let mut state = NeighborSyncState::new_cycle(peers);
1043 let cooldown = Duration::from_secs(1);
1044 let priority_peer = peer_id_from_byte(2);
1045 state.last_sync_times.insert(priority_peer, Instant::now());
1046 assert_eq!(state.queue_priority_peers([priority_peer]), 1);
1047
1048 let batch = select_sync_batch(&mut state, 2, cooldown);
1049
1050 assert_eq!(batch, vec![peer_id_from_byte(1)]);
1051 assert!(state.priority_order.is_empty());
1052 assert!(!state.order.contains(&priority_peer));
1053 }
1054
1055 #[test]
1056 fn failure_replacement_prefers_remaining_priority_peer() {
1057 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1058 let mut state = NeighborSyncState::new_cycle(peers);
1059 let first_priority_peer = peer_id_from_byte(3);
1060 let second_priority_peer = peer_id_from_byte(4);
1061 assert_eq!(
1062 state.queue_priority_peers([first_priority_peer, second_priority_peer]),
1063 2
1064 );
1065
1066 let batch = select_sync_batch(&mut state, 1, Duration::from_secs(0));
1067 assert_eq!(batch, vec![first_priority_peer]);
1068
1069 let replacement =
1070 handle_sync_failure(&mut state, &first_priority_peer, Duration::from_secs(0));
1071
1072 assert_eq!(replacement, Some(second_priority_peer));
1073 assert!(state.priority_order.is_empty());
1074 assert_eq!(state.cursor, 0);
1075 }
1076
1077 #[test]
1078 fn scenario_41_cycle_always_terminates() {
1079 let peer_count: u8 = 10;
1083 let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
1084 let mut state = NeighborSyncState::new_cycle(peers);
1085 let cooldown = Duration::from_secs(3600);
1086
1087 for i in 1..=peer_count {
1089 state
1090 .last_sync_times
1091 .insert(peer_id_from_byte(i), Instant::now());
1092 }
1093
1094 let batch = select_sync_batch(&mut state, 4, cooldown);
1095
1096 assert!(
1097 batch.is_empty(),
1098 "all peers on cooldown — batch must be empty"
1099 );
1100 assert!(state.order.is_empty(), "all peers should have been removed");
1101 assert!(
1102 state.is_cycle_complete(),
1103 "cycle must terminate when all peers are removed"
1104 );
1105 }
1106
1107 #[test]
1108 fn consecutive_rounds_advance_through_full_cycle() {
1109 let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
1113 let mut state = NeighborSyncState::new_cycle(peers);
1114 let batch_size = 2;
1115 let no_cooldown = Duration::from_secs(0);
1116
1117 let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
1118 assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
1119 assert_eq!(state.cursor, 2);
1120 assert!(!state.is_cycle_complete());
1121
1122 let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
1123 assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
1124 assert_eq!(state.cursor, 4);
1125 assert!(!state.is_cycle_complete());
1126
1127 let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
1128 assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
1129 assert_eq!(state.cursor, 6);
1130 assert!(state.is_cycle_complete());
1131
1132 let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
1134 assert!(round4.is_empty());
1135 }
1136
1137 #[test]
1158 fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
1159 let sender = peer_id_from_byte(0x37);
1160
1161 let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
1164 let outbound_paid_hints = vec![[0x03; 32]];
1165 let response = NeighborSyncResponse {
1166 replica_hints: outbound_replica_hints.clone(),
1167 paid_hints: outbound_paid_hints.clone(),
1168 bootstrapping: false,
1169 rejected_keys: Vec::new(),
1170 };
1171
1172 let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
1174 let inbound_paid_hints = vec![[0xB0; 32]];
1175
1176 let sender_in_rt = false;
1178 let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
1179
1180 assert_eq!(
1182 response.replica_hints, outbound_replica_hints,
1183 "outbound replica hints must be sent even when sender is not in LocalRT"
1184 );
1185 assert_eq!(
1186 response.paid_hints, outbound_paid_hints,
1187 "outbound paid hints must be sent even when sender is not in LocalRT"
1188 );
1189
1190 if !sender_in_rt {
1192 let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
1195 let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
1196
1197 for key in &inbound_replica_hints {
1198 assert!(
1199 !admitted_replica_keys.contains(key),
1200 "inbound replica hints must NOT be admitted from non-RT sender"
1201 );
1202 }
1203 for key in &inbound_paid_hints {
1204 assert!(
1205 !admitted_paid_keys.contains(key),
1206 "inbound paid hints must NOT be admitted from non-RT sender"
1207 );
1208 }
1209
1210 assert!(
1212 !sync_history.contains_key(&sender),
1213 "sync history must NOT be updated for non-LocalRT sender"
1214 );
1215 }
1216
1217 let sender_in_rt = true;
1219 assert!(
1220 sender_in_rt,
1221 "when sender is in LocalRT, inbound hints are processed"
1222 );
1223
1224 sync_history.insert(
1226 sender,
1227 PeerSyncRecord {
1228 last_sync: Some(Instant::now()),
1229 cycles_since_sync: 0,
1230 },
1231 );
1232 assert!(
1233 sync_history.contains_key(&sender),
1234 "sync history should be updated for LocalRT sender"
1235 );
1236 assert!(
1237 sync_history
1238 .get(&sender)
1239 .expect("sender in history")
1240 .last_sync
1241 .is_some(),
1242 "last_sync should be recorded for RT sender"
1243 );
1244 }
1245
1246 #[test]
1247 fn cycle_completion_resets_cursor_but_keeps_sync_times() {
1248 let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
1251 let mut state = NeighborSyncState::new_cycle(peers);
1252
1253 let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
1255 record_successful_sync(&mut state, &peer_id_from_byte(1));
1256 record_successful_sync(&mut state, &peer_id_from_byte(2));
1257 assert!(state.is_cycle_complete());
1258
1259 let old_sync_times = state.last_sync_times.clone();
1261 assert_eq!(old_sync_times.len(), 2);
1262
1263 let new_peers = vec![
1266 peer_id_from_byte(1),
1267 peer_id_from_byte(2),
1268 peer_id_from_byte(3),
1269 ];
1270 let mut new_state = NeighborSyncState::new_cycle(new_peers);
1271 new_state.last_sync_times = old_sync_times;
1272
1273 assert_eq!(new_state.cursor, 0);
1275 assert!(!new_state.is_cycle_complete());
1276
1277 assert_eq!(new_state.last_sync_times.len(), 2);
1279 assert!(new_state
1280 .last_sync_times
1281 .contains_key(&peer_id_from_byte(1)));
1282 assert!(new_state
1283 .last_sync_times
1284 .contains_key(&peer_id_from_byte(2)));
1285
1286 let cooldown = Duration::from_secs(3600);
1289 let batch = select_sync_batch(&mut new_state, 3, cooldown);
1290 assert_eq!(
1291 batch,
1292 std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
1293 "only the new peer should be selected; old peers are on cooldown"
1294 );
1295 }
1296}