use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::logging::{debug, warn};
use rand::Rng;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use crate::ant_protocol::XorName;
use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
use crate::replication::paid_list::PaidList;
use crate::replication::protocol::{
NeighborSyncRequest, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody,
};
use crate::replication::types::NeighborSyncState;
use crate::storage::LmdbStorage;
/// Collects the locally stored record keys that `peer` should replicate.
///
/// A key qualifies when the local view of its close group (computed with this
/// node included) contains `peer`. Reads only local state; the DHT lookup is
/// routing-table based and does not hit the network.
///
/// Returns an empty list if the stored key set cannot be read (logged as a
/// warning rather than propagated).
pub async fn build_replica_hints_for_peer(
    peer: &PeerId,
    storage: &Arc<LmdbStorage>,
    p2p_node: &Arc<P2PNode>,
    close_group_size: usize,
) -> Vec<XorName> {
    let stored_keys = match storage.all_keys().await {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Failed to read stored keys for hint construction: {e}");
            return Vec::new();
        }
    };
    let dht_manager = p2p_node.dht_manager();
    let mut relevant_keys = Vec::new();
    for candidate in stored_keys {
        // Only hint keys whose (locally computed) close group includes the peer.
        let close_group = dht_manager
            .find_closest_nodes_local_with_self(&candidate, close_group_size)
            .await;
        let peer_is_close = close_group.iter().any(|node| node.peer_id == *peer);
        if peer_is_close {
            relevant_keys.push(candidate);
        }
    }
    relevant_keys
}
/// Collects the PaidForList keys that `peer` should hold.
///
/// Mirrors [`build_replica_hints_for_peer`] but sources keys from the paid
/// list and uses the (typically different) paid-list close-group size. A key
/// qualifies when the local close group for it — with this node included —
/// contains `peer`.
///
/// Returns an empty list if the paid list cannot be read (logged as a warning
/// rather than propagated).
pub async fn build_paid_hints_for_peer(
    peer: &PeerId,
    paid_list: &Arc<PaidList>,
    p2p_node: &Arc<P2PNode>,
    paid_list_close_group_size: usize,
) -> Vec<XorName> {
    let paid_keys = match paid_list.all_keys() {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Failed to read PaidForList for hint construction: {e}");
            return Vec::new();
        }
    };
    let dht_manager = p2p_node.dht_manager();
    let mut relevant_keys = Vec::new();
    for candidate in paid_keys {
        // Only hint keys whose (locally computed) close group includes the peer.
        let close_group = dht_manager
            .find_closest_nodes_local_with_self(&candidate, paid_list_close_group_size)
            .await;
        let peer_is_close = close_group.iter().any(|node| node.peer_id == *peer);
        if peer_is_close {
            relevant_keys.push(candidate);
        }
    }
    relevant_keys
}
/// Snapshots the ids of the `scope` closest neighbors to this node.
///
/// Uses only the local routing table (`find_closest_nodes_local`, which —
/// unlike the `_with_self` variant used for hint building — excludes this
/// node itself); no network traffic is generated.
pub async fn snapshot_close_neighbors(
    p2p_node: &Arc<P2PNode>,
    self_id: &PeerId,
    scope: usize,
) -> Vec<PeerId> {
    // Our own id reinterpreted as an XOR-space address for the distance query.
    let own_address: XorName = *self_id.as_bytes();
    p2p_node
        .dht_manager()
        .find_closest_nodes_local(&own_address, scope)
        .await
        .iter()
        .map(|node| node.peer_id)
        .collect()
}
/// Picks up to `peer_count` peers to sync with next, advancing the cycle cursor.
///
/// Peers still inside their `cooldown` window (per `state.last_sync_times`)
/// are removed from `state.order` entirely — the cursor is *not* advanced past
/// them, so the following peer shifts into their slot and is considered next.
/// The returned batch may be shorter than `peer_count` when the cycle runs out
/// of eligible peers.
pub fn select_sync_batch(
    state: &mut NeighborSyncState,
    peer_count: usize,
    cooldown: Duration,
) -> Vec<PeerId> {
    let now = Instant::now();
    let mut batch = Vec::with_capacity(peer_count);
    while batch.len() < peer_count {
        // Cursor past the end means the cycle is exhausted.
        let candidate = match state.order.get(state.cursor) {
            Some(peer) => *peer,
            None => break,
        };
        let on_cooldown = state
            .last_sync_times
            .get(&candidate)
            .map_or(false, |last| now.duration_since(*last) < cooldown);
        if on_cooldown {
            // Drop from this cycle's order; do not advance — the next peer
            // slides into this index.
            state.order.remove(state.cursor);
        } else {
            batch.push(candidate);
            state.cursor += 1;
        }
    }
    batch
}
/// Performs one outbound neighbor-sync round trip with `peer`.
///
/// Builds the replica and paid-list hints this node believes `peer` should
/// hold, wraps them in a `NeighborSyncRequest` tagged with a random request
/// id, and sends it over the replication protocol with the configured
/// verification timeout.
///
/// Returns `Some` only when the peer answers with a well-formed
/// `NeighborSyncResponse`; every failure mode (encode, transport, decode,
/// wrong message type) is logged and collapsed to `None` so the caller can
/// treat it as a simple sync failure.
pub async fn sync_with_peer(
    peer: &PeerId,
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    config: &ReplicationConfig,
    is_bootstrapping: bool,
) -> Option<NeighborSyncResponse> {
    let replica_hints =
        build_replica_hints_for_peer(peer, storage, p2p_node, config.close_group_size).await;
    let paid_hints =
        build_paid_hints_for_peer(peer, paid_list, p2p_node, config.paid_list_close_group_size)
            .await;
    let msg = ReplicationMessage {
        // Random id correlates the response; uniqueness per request is all we need.
        request_id: rand::thread_rng().gen::<u64>(),
        body: ReplicationMessageBody::NeighborSyncRequest(NeighborSyncRequest {
            replica_hints,
            paid_hints,
            bootstrapping: is_bootstrapping,
        }),
    };
    let encoded = match msg.encode() {
        Ok(data) => data,
        Err(e) => {
            warn!("Failed to encode sync request for {peer}: {e}");
            return None;
        }
    };
    let send_result = p2p_node
        .send_request(
            peer,
            REPLICATION_PROTOCOL_ID,
            encoded,
            config.verification_request_timeout,
        )
        .await;
    // Transport failures are routine (peer churn), so log at debug only.
    let response = match send_result {
        Ok(resp) => resp,
        Err(e) => {
            debug!("Sync with {peer} failed: {e}");
            return None;
        }
    };
    let decoded = match ReplicationMessage::decode(&response.data) {
        Ok(msg) => msg,
        Err(e) => {
            warn!("Failed to decode sync response from {peer}: {e}");
            return None;
        }
    };
    if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body {
        Some(resp)
    } else {
        warn!("Unexpected response type from {peer} during sync");
        None
    }
}
/// Handles an unreachable peer mid-cycle: removes it from the order and
/// returns the next eligible peer to fill its slot, if any.
///
/// Removing a peer that sits *before* the cursor shifts the remaining order
/// left, so the cursor is decremented to keep pointing at the same logical
/// position. Replacement candidates still inside their `cooldown` window are
/// dropped from the cycle just like in batch selection.
pub fn handle_sync_failure(
    state: &mut NeighborSyncState,
    failed_peer: &PeerId,
    cooldown: Duration,
) -> Option<PeerId> {
    if let Some(idx) = state.order.iter().position(|p| p == failed_peer) {
        state.order.remove(idx);
        if idx < state.cursor {
            state.cursor = state.cursor.saturating_sub(1);
        }
    }
    let now = Instant::now();
    loop {
        // Cursor past the end: cycle exhausted, no replacement available.
        let candidate = match state.order.get(state.cursor) {
            Some(peer) => *peer,
            None => return None,
        };
        let on_cooldown = state
            .last_sync_times
            .get(&candidate)
            .map_or(false, |last| now.duration_since(*last) < cooldown);
        if on_cooldown {
            // Prune and retry with whatever slides into this index.
            state.order.remove(state.cursor);
        } else {
            state.cursor += 1;
            return Some(candidate);
        }
    }
}
/// Stamps `peer` with the current instant so subsequent cooldown checks treat
/// it as freshly synced; any previous timestamp is overwritten.
pub fn record_successful_sync(state: &mut NeighborSyncState, peer: &PeerId) {
    let now = Instant::now();
    state.last_sync_times.insert(*peer, now);
}
/// Builds the reply to an inbound neighbor-sync request from `sender`.
///
/// The outbound hints are always computed and returned regardless of routing
/// table membership; the second tuple element reports whether `sender` is in
/// the local routing table so the caller can decide whether to admit the
/// sender's own inbound hints. The request payload itself is currently unused
/// here (hence `_request`); hint processing happens at the caller.
pub async fn handle_sync_request(
    sender: &PeerId,
    _request: &NeighborSyncRequest,
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    config: &ReplicationConfig,
    is_bootstrapping: bool,
) -> (NeighborSyncResponse, bool) {
    let sender_in_rt = p2p_node.dht_manager().is_in_routing_table(sender).await;
    let replica_hints =
        build_replica_hints_for_peer(sender, storage, p2p_node, config.close_group_size).await;
    let paid_hints =
        build_paid_hints_for_peer(sender, paid_list, p2p_node, config.paid_list_close_group_size)
            .await;
    (
        NeighborSyncResponse {
            replica_hints,
            paid_hints,
            bootstrapping: is_bootstrapping,
            // This handler never rejects keys; rejection happens upstream.
            rejected_keys: Vec::new(),
        },
        sender_in_rt,
    )
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    // Fix: three lines previously fused two statements each onto one line
    // (rustfmt violation that hid assertions); they are now split. All
    // assertions are otherwise unchanged.
    use super::*;
    use crate::replication::types::PeerSyncRecord;
    use std::collections::HashMap;

    /// Deterministic 32-byte peer id whose first byte is `b`.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    #[test]
    fn select_sync_batch_returns_up_to_peer_count() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 3;
        let batch = select_sync_batch(&mut state, batch_size, Duration::from_secs(0));
        assert_eq!(batch.len(), batch_size);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
        assert_eq!(batch[2], peer_id_from_byte(3));
        assert_eq!(state.cursor, 3);
    }

    #[test]
    fn select_sync_batch_skips_cooldown_peers() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(3), Instant::now());
        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 2, cooldown);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(2));
        assert_eq!(batch[1], peer_id_from_byte(4));
        // Cooled-down peers are removed from the cycle entirely.
        assert!(!state.order.contains(&peer_id_from_byte(1)));
        assert!(!state.order.contains(&peer_id_from_byte(3)));
    }

    #[test]
    fn select_sync_batch_expired_cooldown_not_skipped() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        // Last sync 2s ago against a 1s cooldown: window has expired.
        state.last_sync_times.insert(
            peer_id_from_byte(1),
            Instant::now()
                .checked_sub(Duration::from_secs(2))
                .unwrap_or_else(Instant::now),
        );
        let cooldown = Duration::from_secs(1);
        let batch = select_sync_batch(&mut state, 2, cooldown);
        assert_eq!(batch.len(), 2);
        assert_eq!(batch[0], peer_id_from_byte(1));
        assert_eq!(batch[1], peer_id_from_byte(2));
    }

    #[test]
    fn select_sync_batch_empty_order() {
        let mut state = NeighborSyncState::new_cycle(vec![]);
        let batch = select_sync_batch(&mut state, 4, Duration::from_secs(0));
        assert!(batch.is_empty());
        assert_eq!(state.cursor, 0);
    }

    #[test]
    fn select_sync_batch_all_on_cooldown() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state
            .last_sync_times
            .insert(peer_id_from_byte(1), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());
        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut state, 4, cooldown);
        assert!(batch.is_empty());
        assert!(state.order.is_empty());
    }

    #[test]
    fn handle_sync_failure_removes_peer_and_adjusts_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 2;
        // Failed peer sits before the cursor: cursor shifts left, then
        // advances again when the replacement is handed out — net unchanged.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
        assert_eq!(state.cursor, 2);
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert!(replacement.is_some());
    }

    #[test]
    fn handle_sync_failure_removes_peer_after_cursor() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;
        // Failed peer sits after the cursor: no compensation needed.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(3), Duration::from_secs(0));
        assert_eq!(state.cursor, 2);
        assert!(!state.order.contains(&peer_id_from_byte(3)));
        assert_eq!(replacement, Some(peer_id_from_byte(2)));
    }

    #[test]
    fn handle_sync_failure_no_replacement_when_exhausted() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(1), Duration::from_secs(0));
        assert!(state.order.is_empty());
        assert!(replacement.is_none());
    }

    #[test]
    fn handle_sync_failure_unknown_peer_is_noop() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        state.cursor = 1;
        // Unknown peer: nothing removed, but a replacement is still offered.
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(99), Duration::from_secs(0));
        assert_eq!(state.order.len(), 2);
        assert_eq!(replacement, Some(peer_id_from_byte(2)));
        assert_eq!(state.cursor, 2);
    }

    #[test]
    fn record_successful_sync_updates_last_sync_time() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);
        assert!(!state.last_sync_times.contains_key(&peer));
        let before = Instant::now();
        record_successful_sync(&mut state, &peer);
        let after = Instant::now();
        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts >= before);
        assert!(*ts <= after);
    }

    #[test]
    fn record_successful_sync_overwrites_previous() {
        let peers = vec![peer_id_from_byte(1)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let peer = peer_id_from_byte(1);
        let old_time = Instant::now()
            .checked_sub(Duration::from_secs(2))
            .unwrap_or_else(Instant::now);
        state.last_sync_times.insert(peer, old_time);
        record_successful_sync(&mut state, &peer);
        let ts = state.last_sync_times.get(&peer).expect("timestamp exists");
        assert!(*ts > old_time, "sync time should be updated");
    }

    #[test]
    fn scenario_35_round_robin_with_cooldown_skip() {
        let peers: Vec<PeerId> = (1..=8).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 4;
        let cooldown = Duration::from_secs(3600);
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());
        state
            .last_sync_times
            .insert(peer_id_from_byte(4), Instant::now());
        let batch1 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch1.len(), 4);
        assert_eq!(batch1[0], peer_id_from_byte(1));
        assert_eq!(batch1[1], peer_id_from_byte(3));
        assert_eq!(batch1[2], peer_id_from_byte(5));
        assert_eq!(batch1[3], peer_id_from_byte(6));
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert!(!state.order.contains(&peer_id_from_byte(4)));
        let batch2 = select_sync_batch(&mut state, batch_size, cooldown);
        assert_eq!(batch2.len(), 2);
        assert_eq!(batch2[0], peer_id_from_byte(7));
        assert_eq!(batch2[1], peer_id_from_byte(8));
        assert!(state.is_cycle_complete());
    }

    #[test]
    fn cycle_complete_when_cursor_past_order() {
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        assert!(!state.is_cycle_complete());
        state.cursor = 3;
        assert!(state.is_cycle_complete());
        state.cursor = 10;
        assert!(state.is_cycle_complete());
        state.order.clear();
        state.cursor = 0;
        assert!(state.is_cycle_complete());
    }

    #[test]
    fn scenario_36_post_cycle_triggers_combined_prune_pass() {
        let config = ReplicationConfig::default();
        let peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let _ = select_sync_batch(&mut state, 3, Duration::from_secs(0));
        assert!(
            state.is_cycle_complete(),
            "cycle must be complete before prune pass triggers"
        );
        assert!(
            !config.prune_hysteresis_duration.is_zero(),
            "PRUNE_HYSTERESIS_DURATION must be non-zero for hysteresis to work"
        );
        let record_key: [u8; 32] = [0x36; 32];
        let paid_key: [u8; 32] = [0x37; 32];
        let record_first_seen = Instant::now();
        let paid_first_seen = Instant::now();
        let record_elapsed = record_first_seen.elapsed();
        let paid_elapsed = paid_first_seen.elapsed();
        assert!(
            record_elapsed < config.prune_hysteresis_duration,
            "record key should be retained within hysteresis window"
        );
        assert!(
            paid_elapsed < config.prune_hysteresis_duration,
            "paid key should be retained within hysteresis window"
        );
        assert_ne!(
            record_key, paid_key,
            "record and paid pruning keys must be independent"
        );
        let new_state = NeighborSyncState::new_cycle(vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ]);
        assert_eq!(new_state.cursor, 0, "cursor resets for new cycle");
        assert!(
            !new_state.is_cycle_complete(),
            "new cycle should not be immediately complete"
        );
    }

    #[test]
    fn scenario_38_mid_cycle_peer_join_excluded() {
        let peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let _ = select_sync_batch(&mut state, 1, Duration::from_secs(0));
        assert_eq!(state.cursor, 1);
        let peer_d = peer_id_from_byte(0xD);
        assert!(
            !state.order.contains(&peer_d),
            "mid-cycle joiner must not appear in the current snapshot"
        );
        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert!(state.is_cycle_complete());
        let new_peers = vec![
            peer_id_from_byte(0xA),
            peer_id_from_byte(0xB),
            peer_id_from_byte(0xC),
            peer_d,
        ];
        let new_state = NeighborSyncState::new_cycle(new_peers);
        assert!(
            new_state.order.contains(&peer_d),
            "after new snapshot, joiner D should be present"
        );
    }

    #[test]
    fn scenario_39_unreachable_peer_removed_slot_filled() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
            peer_id_from_byte(4),
            peer_id_from_byte(5),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
        let replacement =
            handle_sync_failure(&mut state, &peer_id_from_byte(2), Duration::from_secs(0));
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert_eq!(
            replacement,
            Some(peer_id_from_byte(3)),
            "vacated slot should be filled by next peer in order"
        );
        let batch2 = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        assert_eq!(batch2, vec![peer_id_from_byte(4), peer_id_from_byte(5)]);
        assert!(state.is_cycle_complete());
    }

    #[test]
    fn scenario_40_cooldown_peer_removed_from_snapshot() {
        let peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);
        state
            .last_sync_times
            .insert(peer_id_from_byte(2), Instant::now());
        let batch = select_sync_batch(&mut state, 3, cooldown);
        assert!(!state.order.contains(&peer_id_from_byte(2)));
        assert_eq!(state.order.len(), 2, "order should shrink by 1");
        assert_eq!(batch, vec![peer_id_from_byte(1), peer_id_from_byte(3)]);
        assert!(state.is_cycle_complete());
    }

    #[test]
    fn scenario_41_cycle_always_terminates() {
        let peer_count: u8 = 10;
        let peers: Vec<PeerId> = (1..=peer_count).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let cooldown = Duration::from_secs(3600);
        for i in 1..=peer_count {
            state
                .last_sync_times
                .insert(peer_id_from_byte(i), Instant::now());
        }
        let batch = select_sync_batch(&mut state, 4, cooldown);
        assert!(
            batch.is_empty(),
            "all peers on cooldown — batch must be empty"
        );
        assert!(state.order.is_empty(), "all peers should have been removed");
        assert!(
            state.is_cycle_complete(),
            "cycle must terminate when all peers are removed"
        );
    }

    #[test]
    fn consecutive_rounds_advance_through_full_cycle() {
        let peers: Vec<PeerId> = (1..=6).map(peer_id_from_byte).collect();
        let mut state = NeighborSyncState::new_cycle(peers);
        let batch_size = 2;
        let no_cooldown = Duration::from_secs(0);
        let round1 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round1, vec![peer_id_from_byte(1), peer_id_from_byte(2)]);
        assert_eq!(state.cursor, 2);
        assert!(!state.is_cycle_complete());
        let round2 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round2, vec![peer_id_from_byte(3), peer_id_from_byte(4)]);
        assert_eq!(state.cursor, 4);
        assert!(!state.is_cycle_complete());
        let round3 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert_eq!(round3, vec![peer_id_from_byte(5), peer_id_from_byte(6)]);
        assert_eq!(state.cursor, 6);
        assert!(state.is_cycle_complete());
        let round4 = select_sync_batch(&mut state, batch_size, no_cooldown);
        assert!(round4.is_empty());
    }

    #[test]
    fn scenario_37_non_local_rt_inbound_sync_drops_hints() {
        let sender = peer_id_from_byte(0x37);
        let outbound_replica_hints = vec![[0x01; 32], [0x02; 32]];
        let outbound_paid_hints = vec![[0x03; 32]];
        let response = NeighborSyncResponse {
            replica_hints: outbound_replica_hints.clone(),
            paid_hints: outbound_paid_hints.clone(),
            bootstrapping: false,
            rejected_keys: Vec::new(),
        };
        let inbound_replica_hints = vec![[0xA0; 32], [0xA1; 32]];
        let inbound_paid_hints = vec![[0xB0; 32]];
        let sender_in_rt = false;
        let mut sync_history: HashMap<PeerId, PeerSyncRecord> = HashMap::new();
        assert_eq!(
            response.replica_hints, outbound_replica_hints,
            "outbound replica hints must be sent even when sender is not in LocalRT"
        );
        assert_eq!(
            response.paid_hints, outbound_paid_hints,
            "outbound paid hints must be sent even when sender is not in LocalRT"
        );
        if !sender_in_rt {
            let admitted_replica_keys: Vec<[u8; 32]> = Vec::new();
            let admitted_paid_keys: Vec<[u8; 32]> = Vec::new();
            for key in &inbound_replica_hints {
                assert!(
                    !admitted_replica_keys.contains(key),
                    "inbound replica hints must NOT be admitted from non-RT sender"
                );
            }
            for key in &inbound_paid_hints {
                assert!(
                    !admitted_paid_keys.contains(key),
                    "inbound paid hints must NOT be admitted from non-RT sender"
                );
            }
            assert!(
                !sync_history.contains_key(&sender),
                "sync history must NOT be updated for non-LocalRT sender"
            );
        }
        let sender_in_rt = true;
        assert!(
            sender_in_rt,
            "when sender is in LocalRT, inbound hints are processed"
        );
        sync_history.insert(
            sender,
            PeerSyncRecord {
                last_sync: Some(Instant::now()),
                cycles_since_sync: 0,
            },
        );
        assert!(
            sync_history.contains_key(&sender),
            "sync history should be updated for LocalRT sender"
        );
        assert!(
            sync_history
                .get(&sender)
                .expect("sender in history")
                .last_sync
                .is_some(),
            "last_sync should be recorded for RT sender"
        );
    }

    #[test]
    fn cycle_completion_resets_cursor_but_keeps_sync_times() {
        let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)];
        let mut state = NeighborSyncState::new_cycle(peers);
        let _ = select_sync_batch(&mut state, 2, Duration::from_secs(0));
        record_successful_sync(&mut state, &peer_id_from_byte(1));
        record_successful_sync(&mut state, &peer_id_from_byte(2));
        assert!(state.is_cycle_complete());
        let old_sync_times = state.last_sync_times.clone();
        assert_eq!(old_sync_times.len(), 2);
        let new_peers = vec![
            peer_id_from_byte(1),
            peer_id_from_byte(2),
            peer_id_from_byte(3),
        ];
        let mut new_state = NeighborSyncState::new_cycle(new_peers);
        // Sync times survive cycle boundaries; only the cursor/order reset.
        new_state.last_sync_times = old_sync_times;
        assert_eq!(new_state.cursor, 0);
        assert!(!new_state.is_cycle_complete());
        assert_eq!(new_state.last_sync_times.len(), 2);
        assert!(new_state
            .last_sync_times
            .contains_key(&peer_id_from_byte(1)));
        assert!(new_state
            .last_sync_times
            .contains_key(&peer_id_from_byte(2)));
        let cooldown = Duration::from_secs(3600);
        let batch = select_sync_batch(&mut new_state, 3, cooldown);
        assert_eq!(
            batch,
            std::iter::once(peer_id_from_byte(3)).collect::<Vec<_>>(),
            "only the new peer should be selected; old peers are on cooldown"
        );
    }
}