use std::collections::HashSet;
use std::sync::Arc;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use crate::ant_protocol::XorName;
use crate::replication::config::ReplicationConfig;
use crate::replication::paid_list::PaidList;
use crate::storage::LmdbStorage;
/// Outcome of sorting incoming replication hints into admitted and rejected keys.
///
/// As populated by `admit_hints`, every distinct hinted key lands in exactly
/// one of the three lists.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AdmissionResult {
    /// Keys from the replica hints that were admitted: already stored locally,
    /// already pending, or this node was found in the key's close group.
    pub replica_keys: Vec<XorName>,
    /// Keys from the paid hints that were admitted: already in the paid list,
    /// or this node was found in the key's paid-list close group.
    pub paid_only_keys: Vec<XorName>,
    /// Keys from either hint set that failed admission.
    pub rejected_keys: Vec<XorName>,
}
/// Reports whether this node is among the nodes closest to `key`.
///
/// Asks the DHT manager of `p2p_node` for up to `close_group_size` nodes via
/// `find_closest_nodes_local_with_self` and returns `true` when any returned
/// node carries `self_id` as its peer id.
///
/// NOTE(review): judging by the method name, the lookup is answered from the
/// local routing table and includes this node as a candidate — confirm
/// against `saorsa_core`.
pub async fn is_responsible(
    self_id: &PeerId,
    key: &XorName,
    p2p_node: &Arc<P2PNode>,
    close_group_size: usize,
) -> bool {
    let close_group = p2p_node
        .dht_manager()
        .find_closest_nodes_local_with_self(key, close_group_size)
        .await;

    close_group
        .iter()
        .map(|node| &node.peer_id)
        .any(|peer| peer == self_id)
}
/// Reports whether this node belongs to the paid-list close group of `key`.
///
/// Performs the same lookup as a plain responsibility check, but sized by
/// `paid_list_close_group_size`: the DHT manager of `p2p_node` is asked for
/// that many closest nodes via `find_closest_nodes_local_with_self`, and the
/// result is `true` when one of them has `self_id` as its peer id.
pub async fn is_in_paid_close_group(
    self_id: &PeerId,
    key: &XorName,
    p2p_node: &Arc<P2PNode>,
    paid_list_close_group_size: usize,
) -> bool {
    let paid_close_group = p2p_node
        .dht_manager()
        .find_closest_nodes_local_with_self(key, paid_list_close_group_size)
        .await;

    paid_close_group
        .iter()
        .any(|member| *self_id == member.peer_id)
}
/// Sorts incoming replica and paid-list hints into admitted and rejected keys.
///
/// Replica hints are handled first, then paid hints. A key is judged only the
/// first time it is seen across both slices, so a key present in both is
/// treated purely as a replica hint.
///
/// * A replica hint goes to `replica_keys` when `storage` reports it exists,
///   when it is in `pending_keys`, or when [`is_responsible`] returns `true`
///   for `config.close_group_size`.
/// * A paid hint goes to `paid_only_keys` when `paid_list` already contains
///   it, or when [`is_in_paid_close_group`] returns `true` for
///   `config.paid_list_close_group_size`.
/// * Every other key goes to `rejected_keys`.
///
/// Errors from `storage.exists` and `paid_list.contains` are swallowed and
/// treated as "not present", so such keys fall through to the close-group
/// lookup instead of aborting admission.
#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
pub async fn admit_hints(
    self_id: &PeerId,
    replica_hints: &[XorName],
    paid_hints: &[XorName],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
    storage: &Arc<LmdbStorage>,
    paid_list: &Arc<PaidList>,
    pending_keys: &HashSet<XorName>,
) -> AdmissionResult {
    let mut outcome = AdmissionResult {
        replica_keys: Vec::new(),
        paid_only_keys: Vec::new(),
        rejected_keys: Vec::new(),
    };
    // Shared by both passes: a key is judged once, in whichever set it appears first.
    let mut processed = HashSet::new();

    for &key in replica_hints {
        if processed.insert(key) {
            // The cheap local checks come first; the routing-table lookup only
            // runs when neither of them admits the key.
            let stored_locally = storage.exists(&key).unwrap_or(false);
            let admitted = stored_locally
                || pending_keys.contains(&key)
                || is_responsible(self_id, &key, p2p_node, config.close_group_size).await;

            let bucket = if admitted {
                &mut outcome.replica_keys
            } else {
                &mut outcome.rejected_keys
            };
            bucket.push(key);
        }
    }

    for &key in paid_hints {
        if processed.insert(key) {
            let known_paid = paid_list.contains(&key).unwrap_or(false);
            let admitted = known_paid
                || is_in_paid_close_group(
                    self_id,
                    &key,
                    p2p_node,
                    config.paid_list_close_group_size,
                )
                .await;

            let bucket = if admitted {
                &mut outcome.paid_only_keys
            } else {
                &mut outcome.rejected_keys
            };
            bucket.push(key);
        }
    }

    outcome
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    //! These tests do not call `admit_hints`; they replay its dedup and
    //! precedence steps on plain collections, with hard-coded stand-ins for
    //! the storage, paid-list and routing-table answers.

    use super::*;
    use crate::client::xor_distance;
    use crate::replication::config::ReplicationConfig;

    /// Builds a `PeerId` from 32 bytes whose first byte is `b` and the rest zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut raw = [0u8; 32];
        raw[0] = b;
        PeerId::from_bytes(raw)
    }

    /// Builds an `XorName` with every byte set to `b`.
    fn xor_name_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    #[test]
    fn cross_set_precedence_replica_wins() {
        let key = xor_name_from_byte(0xAA);

        let mut replica_set: HashSet<XorName> = HashSet::new();
        replica_set.insert(key);

        assert!(
            replica_set.contains(&key),
            "paid-hint key present in replica set should be skipped"
        );
    }

    #[test]
    fn deduplication_within_replica_hints() {
        let key_a = xor_name_from_byte(0x01);
        let key_b = xor_name_from_byte(0x02);
        let hints = [key_a, key_b, key_a, key_a, key_b];

        // `insert` returns false for repeats, so only first occurrences survive.
        let mut seen: HashSet<XorName> = HashSet::new();
        let unique: Vec<XorName> = hints
            .iter()
            .copied()
            .filter(|candidate| seen.insert(*candidate))
            .collect();

        assert_eq!(unique.len(), 2);
        assert_eq!(unique[0], key_a);
        assert_eq!(unique[1], key_b);
    }

    #[test]
    fn deduplication_across_sets() {
        let key = xor_name_from_byte(0xFF);
        let replica_hints = [key];
        let paid_hints = [key];

        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
        // The replica pass marks every one of its keys as seen.
        let mut seen: HashSet<XorName> = replica_hints.iter().copied().collect();

        let paid_admitted: Vec<XorName> = paid_hints
            .iter()
            .copied()
            .filter(|candidate| seen.insert(*candidate) && !replica_set.contains(candidate))
            .collect();

        assert!(
            paid_admitted.is_empty(),
            "paid-hint should be suppressed when key is also a replica hint"
        );
    }

    #[test]
    fn admission_result_empty_inputs() {
        let AdmissionResult {
            replica_keys,
            paid_only_keys,
            rejected_keys,
        } = AdmissionResult {
            replica_keys: Vec::new(),
            paid_only_keys: Vec::new(),
            rejected_keys: Vec::new(),
        };

        assert!(replica_keys.is_empty());
        assert!(paid_only_keys.is_empty());
        assert!(rejected_keys.is_empty());
    }

    #[test]
    fn out_of_range_keys_rejected_by_distance() {
        let _self_id = peer_id_from_byte(0x00);
        let far_key = xor_name_from_byte(0xFF);
        let _config = ReplicationConfig::default();

        let origin: XorName = [0u8; 32];

        let far = xor_distance(&origin, &far_key);
        assert_eq!(far[0], 0xFF, "distance first byte should be 0xFF");

        let near_key = xor_name_from_byte(0x01);
        let near = xor_distance(&origin, &near_key);
        assert_eq!(
            near[0], 0x01,
            "close distance first byte should be 0x01"
        );

        assert!(
            far > near,
            "far key should have greater distance than close key"
        );
    }

    #[test]
    fn config_close_group_sizes_are_valid() {
        let defaults = ReplicationConfig::default();
        let replica_group = defaults.close_group_size;
        let paid_group = defaults.paid_list_close_group_size;

        assert!(replica_group > 0, "close_group_size must be positive");
        assert!(
            paid_group > 0,
            "paid_list_close_group_size must be positive"
        );
        assert!(
            paid_group >= replica_group,
            "paid_list_close_group_size should be >= close_group_size"
        );
    }

    #[test]
    fn scenario_5_sender_does_not_grant_key_relevance() {
        let key_pending = xor_name_from_byte(0xB0);
        let key_not_pending = xor_name_from_byte(0xB1);
        let key_paid_existing = xor_name_from_byte(0xB2);
        let _sender = peer_id_from_byte(0x01);

        let pending: HashSet<XorName> = [key_pending].iter().copied().collect();
        let paid_set: HashSet<XorName> = [key_paid_existing].iter().copied().collect();

        let replica_hints = [key_pending, key_not_pending];
        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();

        let mut seen: HashSet<XorName> = HashSet::new();
        let mut admitted_replica = Vec::new();
        let mut rejected = Vec::new();

        // Replica pass: the pending fast path admits; the responsibility
        // lookup is simulated as always answering "not responsible".
        for key in replica_hints.iter().copied() {
            if !seen.insert(key) {
                continue;
            }
            let simulated_responsible = false;
            if pending.contains(&key) || simulated_responsible {
                admitted_replica.push(key);
            } else {
                rejected.push(key);
            }
        }

        // Paid pass: `key_pending` was already handled as a replica hint above.
        let paid_hints = [key_paid_existing, key_pending];
        let mut admitted_paid = Vec::new();
        for key in paid_hints.iter().copied() {
            if !seen.insert(key) || replica_set.contains(&key) {
                continue;
            }
            if paid_set.contains(&key) {
                admitted_paid.push(key);
            } else {
                rejected.push(key);
            }
        }

        assert_eq!(
            admitted_replica,
            vec![key_pending],
            "only the pending key should be admitted as replica"
        );
        assert_eq!(
            rejected,
            vec![key_not_pending],
            "non-pending, non-responsible key must be rejected"
        );
        assert_eq!(
            admitted_paid,
            vec![key_paid_existing],
            "existing paid-list key should be admitted via fast path"
        );
        assert!(
            !admitted_paid.contains(&key_pending),
            "key in both hint sets must be processed as replica only"
        );
    }

    #[test]
    fn scenario_7_out_of_range_key_rejected() {
        let self_xor: XorName = [0u8; 32];
        let far_key = xor_name_from_byte(0xFF);
        let close_key = xor_name_from_byte(0x01);

        let far_dist = xor_distance(&self_xor, &far_key);
        let close_dist = xor_distance(&self_xor, &close_key);
        assert_eq!(far_dist[0], 0xFF, "far_key distance should be maximal");
        assert_eq!(close_dist[0], 0x01, "close_key distance should be small");
        assert!(far_dist > close_dist, "far key is further than close key");

        let pending: HashSet<XorName> = HashSet::new();
        let replica_hints = [far_key, close_key];
        let mut seen: HashSet<XorName> = HashSet::new();

        // Responsibility is simulated: a key counts as in range when the
        // first byte of its XOR distance from `self_xor` is below 0x80.
        let (admitted, rejected): (Vec<XorName>, Vec<XorName>) = replica_hints
            .iter()
            .copied()
            .filter(|candidate| seen.insert(*candidate))
            .partition(|candidate| {
                pending.contains(candidate) || xor_distance(&self_xor, candidate)[0] < 0x80
            });

        assert_eq!(
            admitted,
            vec![close_key],
            "only close key should be admitted"
        );
        assert_eq!(
            rejected,
            vec![far_key],
            "far key should be rejected regardless of quorum — it never enters verification"
        );

        let paid_hints = [far_key];
        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
        let paid_admitted: Vec<XorName> = paid_hints
            .iter()
            .copied()
            .filter(|candidate| seen.insert(*candidate) && !replica_set.contains(candidate))
            .collect();

        assert!(
            paid_admitted.is_empty(),
            "far key already processed as replica (and rejected) should not re-enter via paid hints"
        );
    }
}