use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::logging::{debug, info, warn};
use rand::Rng;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use crate::ant_protocol::XorName;
use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
use crate::replication::paid_list::PaidList;
use crate::replication::protocol::{
NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
};
use crate::replication::types::NeighborSyncState;
use crate::storage::LmdbStorage;
const HINT_BUILD_SLOW_LOG_MS: u128 = 250;
#[derive(Debug)]
pub(crate) struct SentReplicaHint {
pub(crate) key: XorName,
pub(crate) close_peers: HashSet<PeerId>,
}
#[derive(Debug)]
pub(crate) struct NeighborSyncOutcome {
pub(crate) response: NeighborSyncResponse,
pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
}
#[derive(Debug, Default)]
pub(crate) struct PeerSyncHints {
pub(crate) sent_replica_hints: Vec<SentReplicaHint>,
paid_hints: Vec<XorName>,
}
pub async fn build_replica_hints_for_peer(
peer: &PeerId,
storage: &Arc<LmdbStorage>,
p2p_node: &Arc<P2PNode>,
close_group_size: usize,
) -> Vec<XorName> {
build_replica_hints_for_peer_with_close_groups(peer, storage, p2p_node, close_group_size)
.await
.into_iter()
.map(|hint| hint.key)
.collect()
}
pub(crate) async fn build_replica_hints_for_peer_with_close_groups(
peer: &PeerId,
storage: &Arc<LmdbStorage>,
p2p_node: &Arc<P2PNode>,
close_group_size: usize,
) -> Vec<SentReplicaHint> {
let all_keys = match storage.all_keys().await {
Ok(keys) => keys,
Err(e) => {
warn!("Failed to read stored keys for hint construction: {e}");
return Vec::new();
}
};
let dht = p2p_node.dht_manager();
let mut hints = Vec::new();
for key in all_keys {
let closest = dht
.find_closest_nodes_local_with_self(&key, close_group_size)
.await;
let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
if close_peers.contains(peer) {
hints.push(SentReplicaHint { key, close_peers });
}
}
hints
}
pub(crate) async fn build_sync_hints_for_peers(
peers: &[PeerId],
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
p2p_node: &Arc<P2PNode>,
close_group_size: usize,
paid_list_close_group_size: usize,
) -> HashMap<PeerId, PeerSyncHints> {
let started = Instant::now();
let target_peers = peers.iter().copied().collect::<HashSet<_>>();
let mut hints_by_peer = peers
.iter()
.copied()
.map(|peer| (peer, PeerSyncHints::default()))
.collect::<HashMap<_, _>>();
if peers.is_empty() {
return hints_by_peer;
}
let all_keys = match storage.all_keys().await {
Ok(keys) => keys,
Err(e) => {
warn!("Failed to read stored keys for batch hint construction: {e}");
Vec::new()
}
};
let stored_key_count = all_keys.len();
let dht = p2p_node.dht_manager();
for key in all_keys {
let closest = dht
.find_closest_nodes_local_with_self(&key, close_group_size)
.await;
let close_peers = closest.iter().map(|n| n.peer_id).collect::<HashSet<_>>();
for peer in close_peers.intersection(&target_peers) {
if let Some(peer_hints) = hints_by_peer.get_mut(peer) {
peer_hints.sent_replica_hints.push(SentReplicaHint {
key,
close_peers: close_peers.clone(),
});
}
}
}
let all_paid_keys = match paid_list.all_keys() {
Ok(keys) => keys,
Err(e) => {
warn!("Failed to read PaidForList for batch hint construction: {e}");
Vec::new()
}
};
let paid_key_count = all_paid_keys.len();
for key in all_paid_keys {
let closest = dht
.find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
.await;
for node in closest {
if target_peers.contains(&node.peer_id) {
if let Some(peer_hints) = hints_by_peer.get_mut(&node.peer_id) {
peer_hints.paid_hints.push(key);
}
}
}
}
let replica_hint_count = hints_by_peer
.values()
.map(|hints| hints.sent_replica_hints.len())
.sum::<usize>();
let paid_hint_count = hints_by_peer
.values()
.map(|hints| hints.paid_hints.len())
.sum::<usize>();
let elapsed_ms = started.elapsed().as_millis();
if elapsed_ms >= HINT_BUILD_SLOW_LOG_MS {
info!(
target: "ant_node::replication::neighbor_sync",
"Slow neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
peers.len(),
stored_key_count,
paid_key_count,
replica_hint_count,
paid_hint_count,
);
} else {
debug!(
target: "ant_node::replication::neighbor_sync",
"Neighbor-sync hint build: peers={}, stored_keys={}, paid_keys={}, replica_hints={}, paid_hints={}, elapsed_ms={elapsed_ms}",
peers.len(),
stored_key_count,
paid_key_count,
replica_hint_count,
paid_hint_count,
);
}
hints_by_peer
}
pub async fn build_paid_hints_for_peer(
peer: &PeerId,
paid_list: &Arc<PaidList>,
p2p_node: &Arc<P2PNode>,
paid_list_close_group_size: usize,
) -> Vec<XorName> {
let all_paid_keys = match paid_list.all_keys() {
Ok(keys) => keys,
Err(e) => {
warn!("Failed to read PaidForList for hint construction: {e}");
return Vec::new();
}
};
let dht = p2p_node.dht_manager();
let mut hints = Vec::new();
for key in all_paid_keys {
let closest = dht
.find_closest_nodes_local_with_self(&key, paid_list_close_group_size)
.await;
if closest.iter().any(|n| n.peer_id == *peer) {
hints.push(key);
}
}
hints
}
pub async fn snapshot_close_neighbors(
p2p_node: &Arc<P2PNode>,
self_id: &PeerId,
scope: usize,
) -> Vec<PeerId> {
let self_xor: XorName = *self_id.as_bytes();
let closest = p2p_node
.dht_manager()
.find_closest_nodes_local(&self_xor, scope)
.await;
closest.iter().map(|n| n.peer_id).collect()
}
pub fn select_sync_batch(
state: &mut NeighborSyncState,
peer_count: usize,
cooldown: Duration,
) -> Vec<PeerId> {
let mut batch = Vec::new();
let now = Instant::now();
while batch.len() < peer_count {
let Some(peer) = select_next_sync_peer(state, now, cooldown) else {
break;
};
batch.push(peer);
}
batch
}
fn select_next_sync_peer(
state: &mut NeighborSyncState,
now: Instant,
cooldown: Duration,
) -> Option<PeerId> {
while let Some(peer) = state.priority_order.pop_front() {
if peer_on_cooldown(state, &peer, now, cooldown) {
state.remove_peer(&peer);
continue;
}
state.remove_peer(&peer);
return Some(peer);
}
while state.cursor < state.order.len() {
let peer = state.order[state.cursor];
if peer_on_cooldown(state, &peer, now, cooldown) {
state.order.remove(state.cursor);
continue;
}
state.cursor += 1;
return Some(peer);
}
None
}
fn peer_on_cooldown(
state: &NeighborSyncState,
peer: &PeerId,
now: Instant,
cooldown: Duration,
) -> bool {
state
.last_sync_times
.get(peer)
.is_some_and(|last_sync| now.duration_since(*last_sync) < cooldown)
}
pub async fn sync_with_peer(
peer: &PeerId,
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
config: &ReplicationConfig,
is_bootstrapping: bool,
) -> Option<NeighborSyncResponse> {
sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping)
.await
.map(|outcome| outcome.response)
}
pub(crate) async fn sync_with_peer_with_outcome(
peer: &PeerId,
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
config: &ReplicationConfig,
is_bootstrapping: bool,
) -> Option<NeighborSyncOutcome> {
let mut hints_by_peer = build_sync_hints_for_peers(
std::slice::from_ref(peer),
storage,
paid_list,
p2p_node,
config.close_group_size,
config.paid_list_close_group_size,
)
.await;
let hints = hints_by_peer.remove(peer).unwrap_or_default();
sync_with_peer_with_hints(peer, p2p_node, config, is_bootstrapping, hints).await
}
pub(crate) async fn sync_with_peer_with_hints(
peer: &PeerId,
p2p_node: &Arc<P2PNode>,
config: &ReplicationConfig,
is_bootstrapping: bool,
hints: PeerSyncHints,
) -> Option<NeighborSyncOutcome> {
let replica_hints = hints
.sent_replica_hints
.iter()
.map(|hint| hint.key)
.collect::<Vec<_>>();
let sent_replica_hints = hints.sent_replica_hints;
let request = NeighborSyncRequest {
replica_hints,
paid_hints: hints.paid_hints,
bootstrapping: is_bootstrapping,
};
let request_id = rand::thread_rng().gen::<u64>();
let msg = ReplicationMessage {
request_id,
body: ReplicationMessageBody::NeighborSyncRequest(request),
};
let encoded = match msg.encode() {
Ok(data) => data,
Err(e) => {
warn!("Failed to encode sync request for {peer}: {e}");
return None;
}
};
let response = match p2p_node
.send_request(
peer,
REPLICATION_PROTOCOL_ID,
encoded,
config.verification_request_timeout,
)
.await
{
Ok(resp) => resp,
Err(e) => {
debug!("Sync with {peer} failed: {e}");
return None;
}
};
match ReplicationMessage::decode(&response.data) {
Ok(decoded) => {
if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
Some(NeighborSyncOutcome {
response: resp,
sent_replica_hints,
})
} else {
warn!("Unexpected response type from {peer} during sync");
None
}
}
Err(e) => {
warn!("Failed to decode sync response from {peer}: {e}");
None
}
}
}
pub fn handle_sync_failure(
state: &mut NeighborSyncState,
failed_peer: &PeerId,
cooldown: Duration,
) -> Option<PeerId> {
state.remove_peer(failed_peer);
let now = Instant::now();
select_next_sync_peer(state, now, cooldown)
}
pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
state.last_sync_times.insert(*peer, Instant::now());
}
pub async fn handle_sync_request(
sender: &PeerId,
request: &NeighborSyncRequest,
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
config: &ReplicationConfig,
is_bootstrapping: bool,
) -> (NeighborSyncResponse, bool) {
let (response, _, sender_in_rt) = handle_sync_request_with_proofs(
sender,
request,
p2p_node,
storage,
paid_list,
config,
is_bootstrapping,
)
.await;
(response, sender_in_rt)
}
pub(crate) async fn handle_sync_request_with_proofs(
sender: &PeerId,
_request: &NeighborSyncRequest,
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
paid_list: &Arc<PaidList>,
config: &ReplicationConfig,
is_bootstrapping: bool,
) -> (NeighborSyncResponse, Vec<SentReplicaHint>, bool) {
let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
let sent_replica_hints = build_replica_hints_for_peer_with_close_groups(
sender,
storage,
p2p_node,
config.close_group_size,
)
.await;
let replica_hints = sent_replica_hints
.iter()
.map(|hint| hint.key)
.collect::<Vec<_>>();
let paid_hints = build_paid_hints_for_peer(
sender,
paid_list,
p2p_node,
config.paid_list_close_group_size,
)
.await;
let response = NeighborSyncResponse {
replica_hints,
paid_hints,
bootstrapping: is_bootstrapping,
rejected_keys: Vec::new(),
};
(response, sent_replica_hints, sender_in_rt)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::replication::types::PeerSyncRecord;
use std::collections::HashMap;
fn peer_id_from_byte(b: u8) -> PeerId {
let mut bytes = [0u8; 32];
bytes[0] = b;
PeerId::from_bytes(bytes)
}
#[test]
fn select_sync_batch_returns_up_to_peer_count() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
peer_id_from_byte(4),
peer_id_from_byte(5),
];
let mut state = NeighborSyncState::new_cycle(peers);
let batch_size = 3;
let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
assert_eq!(batch.len(), batch_size);
assert_eq!(batch[0], peer_id_from_byte(1));
assert_eq!(batch[1], peer_id_from_byte(2));
assert_eq!(batch[2], peer_id_from_byte(3));
assert_eq!(state.cursor, 3);
}
#[test]
fn select_sync_batch_skips_cooldown_peers() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
peer_id_from_byte(4),
];
let mut state = NeighborSyncState::new_cycle(peers);
state
.last_sync_times
.insert(peer_id_from_byte(1), Instant::now());
state
.last_sync_times
.insert(peer_id_from_byte(3), Instant::now());
let cooldown = Duration::from_secs(3600); let batch = select_sync_batch(&mut state, 2, cooldown);
assert_eq!(batch.len(), 2);
assert_eq!(batch[0], peer_id_from_byte(2));
assert_eq!(batch[1], peer_id_from_byte(4));
assert!(!state.order.contains(&peer_id_from_byte(1)));
assert!(!state.order.contains(&peer_id_from_byte(3)));
}
#[test]
fn select_sync_batch_expired_cooldown_not_skipped() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
state.last_sync_times.insert(
peer_id_from_byte(1),
Instant::now()
.checked_sub(Duration::from_secs(2))
.unwrap_or_else(Instant::now),
);
let cooldown = Duration::from_secs(1);
let batch = select_sync_batch(&mut state, 2, cooldown);
assert_eq!(batch.len(), 2);
assert_eq!(batch[0], peer_id_from_byte(1));
assert_eq!(batch[1], peer_id_from_byte(2));
}
#[test]
fn select_sync_batch_empty_order() {
let mut state = NeighborSyncState::new_cycle(vec![]);
let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
assert!(batch.is_empty());
assert_eq!(state.cursor, 0);
}
#[test]
fn select_sync_batch_all_on_cooldown() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
state
.last_sync_times
.insert(peer_id_from_byte(1), Instant::now());
state
.last_sync_times
.insert(peer_id_from_byte(2), Instant::now());
let cooldown = Duration::from_secs(3600);
let batch = select_sync_batch(&mut state, 4, cooldown);
assert!(batch.is_empty());
assert!(state.order.is_empty());
}
#[test]
fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
peer_id_from_byte(4),
];
let mut state = NeighborSyncState::new_cycle(peers);
state.cursor = 2;
let replacement =
handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(2)));
assert!(replacement.is_some());
}
#[test]
fn handle_sync_failure_removes_peer_after_cursor() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
peer_id_from_byte(4),
];
let mut state = NeighborSyncState::new_cycle(peers);
state.cursor = 1;
let replacement =
handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
assert_eq!(state.cursor, 2); assert!(!state.order.contains(&peer_id_from_byte(3)));
assert_eq!(replacement, Some(peer_id_from_byte(2)));
}
#[test]
fn handle_sync_failure_no_replacement_when_exhausted() {
let peers = vec![peer_id_from_byte(1)];
let mut state = NeighborSyncState::new_cycle(peers);
state.cursor = 1;
let replacement =
handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
assert!(state.order.is_empty());
assert!(replacement.is_none());
}
#[test]
fn handle_sync_failure_unknown_peer_is_noop() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
state.cursor = 1;
let replacement =
handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
assert_eq!(state.order.len(), 2);
assert_eq!(replacement, Some(peer_id_from_byte(2)));
assert_eq!(state.cursor, 2);
}
#[test]
fn record_successful_sync_updates_last_sync_time() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
let peer = peer_id_from_byte(1);
assert!(!state.last_sync_times.contains_key(&peer));
let before = Instant::now();
record_successful_sync(&mut state, &peer);
let after = Instant::now();
let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
assert!(*ts >= before);
assert!(*ts <= after);
}
#[test]
fn record_successful_sync_overwrites_previous() {
let peers = vec![peer_id_from_byte(1)];
let mut state = NeighborSyncState::new_cycle(peers);
let peer = peer_id_from_byte(1);
let old_time = Instant::now()
.checked_sub(Duration::from_secs(2))
.unwrap_or_else(Instant::now);
state.last_sync_times.insert(peer, old_time);
record_successful_sync(&mut state, &peer);
let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
assert!(*ts > old_time, "sync time should be updated");
}
#[test]
fn scenario_35_round_robin_with_cooldown_skip() {
let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
let mut state = NeighborSyncState::new_cycle(peers);
let batch_size = 4;
let cooldown = Duration::from_secs(3600);
state
.last_sync_times
.insert(peer_id_from_byte(2), Instant::now());
state
.last_sync_times
.insert(peer_id_from_byte(4), Instant::now());
let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
assert_eq!(batch1.len(), 4);
assert_eq!(batch1[0], peer_id_from_byte(1));
assert_eq!(batch1[1], peer_id_from_byte(3));
assert_eq!(batch1[2], peer_id_from_byte(5));
assert_eq!(batch1[3], peer_id_from_byte(6));
assert!(!state.order.contains(&peer_id_from_byte(2)));
assert!(!state.order.contains(&peer_id_from_byte(4)));
let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
assert_eq!(batch2.len(), 2);
assert_eq!(batch2[0], peer_id_from_byte(7));
assert_eq!(batch2[1], peer_id_from_byte(8));
assert!(state.is_cycle_complete());
}
#[test]
fn cycle_complete_when_cursor_past_order() {
let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
let mut state = NeighborSyncState::new_cycle(peers);
assert!(!state.is_cycle_complete());
state.cursor = 3;
assert!(state.is_cycle_complete());
state.cursor = 10;
assert!(state.is_cycle_complete());
state.order.clear();
state.cursor = 0;
assert!(state.is_cycle_complete());
}
#[test]
fn scenario_36_post_cycle_triggers_combined_prune_pass() {
let config = ReplicationConfig::default();
let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
let mut state = NeighborSyncState::new_cycle(peers);
let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
assert!(
state.is_cycle_complete(),
"cycle must be complete before prune pass triggers"
);
assert!(
!config.prune_hysteresis_duration.is_zero(),
"PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
);
let record_key: [u8; 32] = [0x36; 32];
let paid_key: [u8; 32] = [0x37; 32];
let record_first_seen = Instant::now();
let paid_first_seen = Instant::now();
let record_elapsed = record_first_seen.elapsed();
let paid_elapsed = paid_first_seen.elapsed();
assert!(
record_elapsed < config.prune_hysteresis_duration,
"record key should be retained within hysteresis window"
);
assert!(
paid_elapsed < config.prune_hysteresis_duration,
"paid key should be retained within hysteresis window"
);
assert_ne!(
record_key, paid_key,
"record and paid pruning keys must be independent"
);
let new_state = NeighborSyncState::new_cycle(vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
]);
assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
assert!(
!new_state.is_cycle_complete(),
"new cycle should not be immediately complete"
);
}
#[test]
fn scenario_38_mid_cycle_peer_join_prioritized() {
let peers = vec![
peer_id_from_byte(0xA),
peer_id_from_byte(0xB),
peer_id_from_byte(0xC),
];
let mut state = NeighborSyncState::new_cycle(peers);
let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
assert_eq!(state.cursor, 1);
let peer_d = peer_id_from_byte(0xD);
assert_eq!(state.queue_priority_peers([peer_d]), 1);
assert!(!state.order.contains(&peer_d));
assert!(
!state.is_cycle_complete(),
"pending priority sync keeps the cycle active"
);
let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
assert_eq!(batch, vec![peer_d, peer_id_from_byte(0xB)]);
}
#[test]
fn scenario_39_unreachable_peer_removed_slot_filled() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
peer_id_from_byte(4),
peer_id_from_byte(5),
];
let mut state = NeighborSyncState::new_cycle(peers);
let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
let replacement =
handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
assert!(!state.order.contains(&peer_id_from_byte(2)));
assert_eq!(
replacement,
Some(peer_id_from_byte(3)),
"vacated slot should be filled by next peer in order"
);
let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
assert!(state.is_cycle_complete());
}
#[test]
fn scenario_40_cooldown_peer_removed_from_snapshot() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
];
let mut state = NeighborSyncState::new_cycle(peers);
let cooldown = Duration::from_secs(3600);
state
.last_sync_times
.insert(peer_id_from_byte(2), Instant::now());
let batch = select_sync_batch(&mut state, 3, cooldown);
assert!(!state.order.contains(&peer_id_from_byte(2)));
assert_eq!(state.order.len(), 2, "order should shrink by 1");
assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
assert!(state.is_cycle_complete());
}
#[test]
fn priority_peer_in_snapshot_is_not_selected_twice() {
let peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
];
let mut state = NeighborSyncState::new_cycle(peers);
assert_eq!(state.queue_priority_peers([peer_id_from_byte(2)]), 1);
let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
assert_eq!(batch, vec![peer_id_from_byte(2), peer_id_from_byte(1)]);
assert_eq!(
state.order,
vec![peer_id_from_byte(1), peer_id_from_byte(3)]
);
assert_eq!(state.cursor, 1);
}
#[test]
fn priority_peer_on_cooldown_is_skipped_and_removed_from_snapshot() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
let cooldown = Duration::from_secs(1);
let priority_peer = peer_id_from_byte(2);
state.last_sync_times.insert(priority_peer, Instant::now());
assert_eq!(state.queue_priority_peers([priority_peer]), 1);
let batch = select_sync_batch(&mut state, 2, cooldown);
assert_eq!(batch, vec![peer_id_from_byte(1)]);
assert!(state.priority_order.is_empty());
assert!(!state.order.contains(&priority_peer));
}
#[test]
fn failure_replacement_prefers_remaining_priority_peer() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
let first_priority_peer = peer_id_from_byte(3);
let second_priority_peer = peer_id_from_byte(4);
assert_eq!(
state.queue_priority_peers([first_priority_peer, second_priority_peer]),
2
);
let batch = select_sync_batch(&mut state, 1, Duration::from_secs(0));
assert_eq!(batch, vec![first_priority_peer]);
let replacement =
handle_sync_failure(&mut state, &first_priority_peer, Duration::from_secs(0));
assert_eq!(replacement, Some(second_priority_peer));
assert!(state.priority_order.is_empty());
assert_eq!(state.cursor, 0);
}
#[test]
fn scenario_41_cycle_always_terminates() {
let peer_count: u8 = 10;
let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
let mut state = NeighborSyncState::new_cycle(peers);
let cooldown = Duration::from_secs(3600);
for i in 1..=peer_count {
state
.last_sync_times
.insert(peer_id_from_byte(i), Instant::now());
}
let batch = select_sync_batch(&mut state, 4, cooldown);
assert!(
batch.is_empty(),
"all peers on cooldown — batch must be empty"
);
assert!(state.order.is_empty(), "all peers should have been removed");
assert!(
state.is_cycle_complete(),
"cycle must terminate when all peers are removed"
);
}
#[test]
fn consecutive_rounds_advance_through_full_cycle() {
let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
let mut state = NeighborSyncState::new_cycle(peers);
let batch_size = 2;
let no_cooldown = Duration::from_secs(0);
let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
assert_eq!(state.cursor, 2);
assert!(!state.is_cycle_complete());
let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
assert_eq!(state.cursor, 4);
assert!(!state.is_cycle_complete());
let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
assert_eq!(state.cursor, 6);
assert!(state.is_cycle_complete());
let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
assert!(round4.is_empty());
}
#[test]
fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
let sender = peer_id_from_byte(0x37);
let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
let outbound_paid_hints = vec![[0x03; 32]];
let response = NeighborSyncResponse {
replica_hints: outbound_replica_hints.clone(),
paid_hints: outbound_paid_hints.clone(),
bootstrapping: false,
rejected_keys: Vec::new(),
};
let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
let inbound_paid_hints = vec![[0xB0; 32]];
let sender_in_rt = false;
let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
assert_eq!(
response.replica_hints, outbound_replica_hints,
"outbound replica hints must be sent even when sender is not in LocalRT"
);
assert_eq!(
response.paid_hints, outbound_paid_hints,
"outbound paid hints must be sent even when sender is not in LocalRT"
);
if !sender_in_rt {
let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
for key in &inbound_replica_hints {
assert!(
!admitted_replica_keys.contains(key),
"inbound replica hints must NOT be admitted from non-RT sender"
);
}
for key in &inbound_paid_hints {
assert!(
!admitted_paid_keys.contains(key),
"inbound paid hints must NOT be admitted from non-RT sender"
);
}
assert!(
!sync_history.contains_key(&sender),
"sync history must NOT be updated for non-LocalRT sender"
);
}
let sender_in_rt = true;
assert!(
sender_in_rt,
"when sender is in LocalRT, inbound hints are processed"
);
sync_history.insert(
sender,
PeerSyncRecord {
last_sync: Some(Instant::now()),
cycles_since_sync: 0,
},
);
assert!(
sync_history.contains_key(&sender),
"sync history should be updated for LocalRT sender"
);
assert!(
sync_history
.get(&sender)
.expect("sender in history")
.last_sync
.is_some(),
"last_sync should be recorded for RT sender"
);
}
#[test]
fn cycle_completion_resets_cursor_but_keeps_sync_times() {
let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
let mut state = NeighborSyncState::new_cycle(peers);
let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
record_successful_sync(&mut state, &peer_id_from_byte(1));
record_successful_sync(&mut state, &peer_id_from_byte(2));
assert!(state.is_cycle_complete());
let old_sync_times = state.last_sync_times.clone();
assert_eq!(old_sync_times.len(), 2);
let new_peers = vec![
peer_id_from_byte(1),
peer_id_from_byte(2),
peer_id_from_byte(3),
];
let mut new_state = NeighborSyncState::new_cycle(new_peers);
new_state.last_sync_times = old_sync_times;
assert_eq!(new_state.cursor, 0);
assert!(!new_state.is_cycle_complete());
assert_eq!(new_state.last_sync_times.len(), 2);
assert!(new_state
.last_sync_times
.contains_key(&peer_id_from_byte(1)));
assert!(new_state
.last_sync_times
.contains_key(&peer_id_from_byte(2)));
let cooldown = Duration::from_secs(3600);
let batch = select_sync_batch(&mut new_state, 3, cooldown);
assert_eq!(
batch,
std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
"only the new peer should be selected; old peers are on cooldown"
);
}
}