use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::logging::{debug, warn};
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use crate::ant_protocol::XorName;
use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
use crate::replication::protocol::{
ReplicationMessage, ReplicationMessageBody, VerificationRequest, VerificationResponse,
};
use crate::replication::types::{KeyVerificationEvidence, PaidListEvidence, PresenceEvidence};
/// Peers to query for a batch of keys, indexed both per key and per peer.
///
/// Produced by `compute_verification_targets`; read by
/// `run_verification_round`, `evaluate_key_evidence` and the response
/// bookkeeping helpers in this file.
#[derive(Debug)]
pub struct VerificationTargets {
/// For each key, the peers whose presence answers are counted toward the
/// presence quorum (the local node itself is filtered out when built).
pub quorum_targets: HashMap<XorName, Vec<PeerId>>,
/// For each key, the peers asked whether the key is on their paid list
/// (the local node itself is filtered out when built).
pub paid_targets: HashMap<XorName, Vec<PeerId>>,
/// Every peer that appears in `quorum_targets` or `paid_targets`.
pub all_peers: HashSet<PeerId>,
/// For each peer, the keys sent to it in one batched request.
/// `compute_verification_targets` sorts and de-duplicates each list.
pub peer_to_keys: HashMap<PeerId, Vec<XorName>>,
/// For each peer, the subset of its keys for which it is a paid-list target.
pub peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>>,
}
/// Builds the lookup tables describing which peers must be queried for `keys`.
///
/// For every key two peer sets are resolved from the local DHT view:
/// * quorum targets — `find_closest_nodes_local` limited to
///   `config.close_group_size`;
/// * paid-list targets — `find_closest_nodes_local_with_self` limited to
///   `config.paid_list_close_group_size`.
///
/// `self_id` is removed from both sets. The per-peer key lists in the result
/// are sorted and de-duplicated so a peer that is both a quorum and a paid
/// target for a key is asked about that key only once.
pub async fn compute_verification_targets(
    keys: &[XorName],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
    self_id: &PeerId,
) -> VerificationTargets {
    let dht = p2p_node.dht_manager();

    let mut quorum_targets = HashMap::new();
    let mut paid_targets = HashMap::new();
    let mut all_peers = HashSet::new();
    let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
    let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();

    for &key in keys {
        // Presence-quorum candidates for this key, minus ourselves.
        let quorum_peers: Vec<PeerId> = dht
            .find_closest_nodes_local(&key, config.close_group_size)
            .await
            .iter()
            .map(|node| node.peer_id)
            .filter(|candidate| candidate != self_id)
            .collect();

        // Paid-list candidates for this key, minus ourselves.
        let paid_peers: Vec<PeerId> = dht
            .find_closest_nodes_local_with_self(&key, config.paid_list_close_group_size)
            .await
            .iter()
            .map(|node| node.peer_id)
            .filter(|candidate| candidate != self_id)
            .collect();

        all_peers.extend(quorum_peers.iter().copied());
        all_peers.extend(paid_peers.iter().copied());

        // Both kinds of target receive the key in their batched request.
        for &peer in quorum_peers.iter().chain(paid_peers.iter()) {
            peer_to_keys.entry(peer).or_default().push(key);
        }
        // Only paid targets are additionally asked for a paid-list check.
        for &peer in &paid_peers {
            peer_to_paid_keys.entry(peer).or_default().insert(key);
        }

        quorum_targets.insert(key, quorum_peers);
        paid_targets.insert(key, paid_peers);
    }

    // A peer can be pushed the same key twice (quorum + paid); collapse those.
    peer_to_keys.values_mut().for_each(|key_list| {
        key_list.sort_unstable();
        key_list.dedup();
    });

    VerificationTargets {
        quorum_targets,
        paid_targets,
        all_peers,
        peer_to_keys,
        peer_to_paid_keys,
    }
}
/// Verdict produced by `evaluate_key_evidence` for a single key.
#[derive(Debug, Clone)]
pub enum KeyVerificationOutcome {
/// Enough quorum targets reported the key as present.
QuorumVerified {
/// Peers that reported the key as present (quorum targets first, then
/// any paid targets that also reported it present).
sources: Vec<PeerId>,
},
/// The presence quorum was not reached, but enough paid-list targets
/// confirmed the key.
PaidListVerified {
/// Peers that reported the key as present; may be empty.
sources: Vec<PeerId>,
},
/// Neither path can succeed any more, even if every unresolved peer were
/// to answer favourably.
QuorumFailed,
/// Neither path has succeeded yet, but unresolved peers could still tip
/// at least one of them.
QuorumInconclusive,
}
#[must_use]
pub fn evaluate_key_evidence(
key: &XorName,
evidence: &KeyVerificationEvidence,
targets: &VerificationTargets,
config: &ReplicationConfig,
) -> KeyVerificationOutcome {
let quorum_peers = targets
.quorum_targets
.get(key)
.map_or(&[][..], Vec::as_slice);
let mut presence_positive = 0usize;
let mut presence_unresolved = 0usize;
let mut present_peers = Vec::new();
for peer in quorum_peers {
match evidence.presence.get(peer) {
Some(PresenceEvidence::Present) => {
presence_positive += 1;
present_peers.push(*peer);
}
Some(PresenceEvidence::Absent) => {}
Some(PresenceEvidence::Unresolved) | None => {
presence_unresolved += 1;
}
}
}
let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
for peer in paid_peers {
if matches!(evidence.presence.get(peer), Some(PresenceEvidence::Present))
&& !present_peers.contains(peer)
{
present_peers.push(*peer);
}
}
let mut paid_confirmed = 0usize;
let mut paid_unresolved = 0usize;
for peer in paid_peers {
match evidence.paid_list.get(peer) {
Some(PaidListEvidence::Confirmed) => paid_confirmed += 1,
Some(PaidListEvidence::NotFound) => {}
Some(PaidListEvidence::Unresolved) | None => paid_unresolved += 1,
}
}
let quorum_needed = config.quorum_needed(quorum_peers.len());
let paid_group_size = paid_peers.len();
let confirm_needed = ReplicationConfig::confirm_needed(paid_group_size);
if quorum_needed > 0 && presence_positive >= quorum_needed {
return KeyVerificationOutcome::QuorumVerified {
sources: present_peers,
};
}
if paid_group_size > 0 && paid_confirmed >= confirm_needed {
return KeyVerificationOutcome::PaidListVerified {
sources: present_peers,
};
}
let paid_possible = paid_group_size > 0 && paid_confirmed + paid_unresolved >= confirm_needed;
let quorum_possible =
quorum_needed > 0 && presence_positive + presence_unresolved >= quorum_needed;
if !paid_possible && !quorum_possible {
return KeyVerificationOutcome::QuorumFailed;
}
KeyVerificationOutcome::QuorumInconclusive
}
/// Sends one batched verification request to every target peer and gathers
/// the answers into per-key evidence.
///
/// One task is spawned per peer in `targets.peer_to_keys`; each request
/// carries that peer's key list plus the indices (into that list) of the keys
/// that also need a paid-list check. Requests use
/// `config.verification_request_timeout`.
///
/// The returned map has an entry for every key in `keys`. A peer whose
/// request could not be completed — encoding failure, send failure/timeout,
/// undecodable reply, a reply of an unexpected message kind, or a task that
/// failed to join — is recorded as `Unresolved` for all of its keys, so every
/// queried peer leaves an explicit entry in the evidence.
pub async fn run_verification_round(
    keys: &[XorName],
    targets: &VerificationTargets,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> HashMap<XorName, KeyVerificationEvidence> {
    let mut evidence: HashMap<XorName, KeyVerificationEvidence> = keys
        .iter()
        .map(|&k| {
            (
                k,
                KeyVerificationEvidence {
                    presence: HashMap::new(),
                    paid_list: HashMap::new(),
                },
            )
        })
        .collect();

    // The peer id is stored next to its handle so it is still known when the
    // task itself fails to join (the task's return value is lost in that case).
    let mut handles = Vec::with_capacity(targets.peer_to_keys.len());
    for (&peer, peer_keys) in &targets.peer_to_keys {
        let paid_check_keys = targets.peer_to_paid_keys.get(&peer);

        // Positions within `peer_keys` that need a paid-list check. An index
        // that does not fit in `u32` is skipped rather than truncated.
        let paid_indices: Vec<u32> = peer_keys
            .iter()
            .enumerate()
            .filter(|(_, key)| paid_check_keys.is_some_and(|paid| paid.contains(*key)))
            .filter_map(|(i, _)| u32::try_from(i).ok())
            .collect();

        let request = VerificationRequest {
            keys: peer_keys.clone(),
            paid_list_check_indices: paid_indices,
        };
        let msg = ReplicationMessage {
            request_id: rand::random(),
            body: ReplicationMessageBody::VerificationRequest(request),
        };

        let p2p = Arc::clone(p2p_node);
        let timeout = config.verification_request_timeout;
        let peer_id = peer;
        let task = tokio::spawn(async move {
            let encoded = match msg.encode() {
                Ok(data) => data,
                Err(e) => {
                    warn!("Failed to encode verification request: {e}");
                    return None;
                }
            };
            match p2p
                .send_request(&peer_id, REPLICATION_PROTOCOL_ID, encoded, timeout)
                .await
            {
                Ok(response) => match ReplicationMessage::decode(&response.data) {
                    Ok(decoded) => Some(decoded),
                    Err(e) => {
                        warn!("Failed to decode verification response from {peer_id}: {e}");
                        None
                    }
                },
                Err(e) => {
                    debug!("Verification request to {peer_id} failed: {e}");
                    None
                }
            }
        });
        handles.push((peer, task));
    }

    for (peer, handle) in handles {
        let response = match handle.await {
            Ok(response) => response,
            Err(e) => {
                warn!("Verification task panicked: {e}");
                mark_peer_unresolved(&peer, targets, &mut evidence);
                continue;
            }
        };
        let Some(msg) = response else {
            mark_peer_unresolved(&peer, targets, &mut evidence);
            continue;
        };
        match msg.body {
            ReplicationMessageBody::VerificationResponse(resp) => {
                process_verification_response(&peer, &resp, targets, &mut evidence);
            }
            // Any other message kind is not an answer to our request; treat
            // the peer like one that did not answer at all.
            _ => {
                warn!("Peer {peer} answered a verification request with an unexpected message type");
                mark_peer_unresolved(&peer, targets, &mut evidence);
            }
        }
    }

    evidence
}
fn mark_peer_unresolved(
peer: &PeerId,
targets: &VerificationTargets,
evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
) {
if let Some(peer_keys) = targets.peer_to_keys.get(peer) {
let is_paid_peer = targets.peer_to_paid_keys.get(peer);
for key in peer_keys {
if let Some(ev) = evidence.get_mut(key) {
ev.presence.insert(*peer, PresenceEvidence::Unresolved);
if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
ev.paid_list.insert(*peer, PaidListEvidence::Unresolved);
}
}
}
}
}
/// Folds one peer's `VerificationResponse` into the per-key evidence.
///
/// * Results for keys that were not requested from `peer` are skipped.
/// * At most twice as many results as requested keys are examined; any
///   excess is logged and dropped.
/// * Each accepted result sets the peer's presence answer and, when the
///   result carries a paid flag, its paid-list answer.
/// * Afterwards, every requested key the peer did not answer is filled in as
///   `Unresolved` (paid-list only where the peer is a paid-list target).
///
/// Does nothing when `peer` has no entry in `targets.peer_to_keys`.
fn process_verification_response(
    peer: &PeerId,
    response: &VerificationResponse,
    targets: &VerificationTargets,
    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
) {
    let Some(requested) = targets.peer_to_keys.get(peer) else {
        return;
    };
    let requested_lookup: HashSet<&XorName> = requested.iter().collect();

    // Bound the work a single oversized response can cause.
    let result_cap = requested.len().saturating_mul(2);
    let received = response.results.len();
    if received > result_cap {
        warn!(
            "Peer {peer} sent {} verification results but only {} keys were requested — truncating",
            received,
            requested.len(),
        );
    }
    let accepted = &response.results[..received.min(result_cap)];

    let solicited = accepted
        .iter()
        .filter(|result| requested_lookup.contains(&result.key));
    for result in solicited {
        let Some(entry) = evidence.get_mut(&result.key) else {
            continue;
        };

        let presence = if result.present {
            PresenceEvidence::Present
        } else {
            PresenceEvidence::Absent
        };
        entry.presence.insert(*peer, presence);

        match result.paid {
            Some(true) => {
                entry.paid_list.insert(*peer, PaidListEvidence::Confirmed);
            }
            Some(false) => {
                entry.paid_list.insert(*peer, PaidListEvidence::NotFound);
            }
            None => {}
        }
    }

    // Anything requested but left unanswered counts as unresolved.
    let paid_keys = targets.peer_to_paid_keys.get(peer);
    for key in requested {
        let Some(entry) = evidence.get_mut(key) else {
            continue;
        };
        entry
            .presence
            .entry(*peer)
            .or_insert(PresenceEvidence::Unresolved);

        let paid_check_requested = paid_keys.is_some_and(|set| set.contains(key));
        if paid_check_requested {
            entry
                .paid_list
                .entry(*peer)
                .or_insert(PaidListEvidence::Unresolved);
        }
    }
}
// Unit tests for the evidence evaluation and response bookkeeping in this
// file. Nothing here touches the network: targets and evidence are built by
// hand and fed to `evaluate_key_evidence`, `process_verification_response`
// and `mark_peer_unresolved`.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::replication::protocol::KeyVerificationResult;
// Builds a distinct peer id whose first byte is `b` and the rest zero.
fn peer_id_from_byte(b: u8) -> PeerId {
let mut bytes = [0u8; 32];
bytes[0] = b;
PeerId::from_bytes(bytes)
}
// Builds a key with every byte set to `b`.
fn xor_name_from_byte(b: u8) -> XorName {
[b; 32]
}
// Builds `VerificationTargets` for one key from explicit quorum and paid
// peer lists, filling the per-peer maps the same way
// `compute_verification_targets` does (including sort + dedup).
fn single_key_targets(
key: &XorName,
quorum_peers: Vec<PeerId>,
paid_peers: Vec<PeerId>,
) -> VerificationTargets {
let mut all_peers = HashSet::new();
let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
for &p in &quorum_peers {
all_peers.insert(p);
peer_to_keys.entry(p).or_default().push(*key);
}
for &p in &paid_peers {
all_peers.insert(p);
peer_to_keys.entry(p).or_default().push(*key);
peer_to_paid_keys.entry(p).or_default().insert(*key);
}
for keys_list in peer_to_keys.values_mut() {
keys_list.sort_unstable();
keys_list.dedup();
}
VerificationTargets {
quorum_targets: std::iter::once((key.to_owned(), quorum_peers)).collect(),
paid_targets: std::iter::once((key.to_owned(), paid_peers)).collect(),
all_peers,
peer_to_keys,
peer_to_paid_keys,
}
}
// Builds a `KeyVerificationEvidence` from (peer, answer) pairs.
fn build_evidence(
presence: Vec<(PeerId, PresenceEvidence)>,
paid_list: Vec<(PeerId, PaidListEvidence)>,
) -> KeyVerificationEvidence {
KeyVerificationEvidence {
presence: presence.into_iter().collect(),
paid_list: paid_list.into_iter().collect(),
}
}
// Four of seven quorum targets report Present, no paid targets: expects
// QuorumVerified with exactly those four peers as sources.
#[test]
fn quorum_verified_with_enough_present_responses() {
let key = xor_name_from_byte(0x10);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
(quorum_peers[2], PresenceEvidence::Present),
(quorum_peers[3], PresenceEvidence::Present),
(quorum_peers[4], PresenceEvidence::Absent),
(quorum_peers[5], PresenceEvidence::Absent),
(quorum_peers[6], PresenceEvidence::Absent),
],
vec![],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 4),
"expected QuorumVerified with 4 sources, got {outcome:?}"
);
}
// All three quorum targets report Absent, but three of five paid targets
// confirm: expects PaidListVerified.
#[test]
fn paid_list_verified_with_enough_confirmations() {
let key = xor_name_from_byte(0x20);
let config = ReplicationConfig::default();
let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Absent),
(quorum_peers[1], PresenceEvidence::Absent),
(quorum_peers[2], PresenceEvidence::Absent),
],
vec![
(paid_peers[0], PaidListEvidence::Confirmed),
(paid_peers[1], PaidListEvidence::Confirmed),
(paid_peers[2], PaidListEvidence::Confirmed),
(paid_peers[3], PaidListEvidence::NotFound),
(paid_peers[4], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
"expected PaidListVerified, got {outcome:?}"
);
}
// Every quorum target reports Absent and every paid target NotFound, with
// nothing unresolved: expects QuorumFailed.
#[test]
fn quorum_failed_when_both_paths_impossible() {
let key = xor_name_from_byte(0x30);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Absent),
(quorum_peers[1], PresenceEvidence::Absent),
(quorum_peers[2], PresenceEvidence::Absent),
(quorum_peers[3], PresenceEvidence::Absent),
(quorum_peers[4], PresenceEvidence::Absent),
],
vec![
(paid_peers[0], PaidListEvidence::NotFound),
(paid_peers[1], PaidListEvidence::NotFound),
(paid_peers[2], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumFailed),
"expected QuorumFailed, got {outcome:?}"
);
}
// Neither path has succeeded, but unresolved answers remain on both:
// expects QuorumInconclusive.
#[test]
fn quorum_inconclusive_with_unresolved_peers() {
let key = xor_name_from_byte(0x40);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
(quorum_peers[2], PresenceEvidence::Absent),
(quorum_peers[3], PresenceEvidence::Unresolved),
(quorum_peers[4], PresenceEvidence::Unresolved),
],
vec![
(paid_peers[0], PaidListEvidence::Confirmed),
(paid_peers[1], PaidListEvidence::Unresolved),
(paid_peers[2], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
"expected QuorumInconclusive, got {outcome:?}"
);
}
// Only two quorum targets exist and both report Present: expects
// QuorumVerified with two sources.
#[test]
fn quorum_verified_with_undersized_quorum_targets() {
let key = xor_name_from_byte(0x50);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=2).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
],
vec![],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
"expected QuorumVerified with 2 sources, got {outcome:?}"
);
}
// A single paid target that confirms, with no quorum targets at all:
// expects PaidListVerified.
#[test]
fn paid_list_verified_with_single_paid_peer() {
let key = xor_name_from_byte(0x60);
let config = ReplicationConfig::default();
let paid_peers = vec![peer_id_from_byte(10)];
let targets = single_key_targets(&key, vec![], paid_peers.clone());
let evidence = build_evidence(vec![], vec![(paid_peers[0], PaidListEvidence::Confirmed)]);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
"expected PaidListVerified with single peer, got {outcome:?}"
);
}
// No quorum targets and no paid targets: expects QuorumFailed.
#[test]
fn quorum_fails_with_zero_targets_no_paid() {
let key = xor_name_from_byte(0x70);
let config = ReplicationConfig::default();
let targets = single_key_targets(&key, vec![], vec![]);
let evidence = build_evidence(vec![], vec![]);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumFailed),
"expected QuorumFailed with zero targets, got {outcome:?}"
);
}
// Both paths are satisfied at once; the presence quorum is checked first,
// so the outcome must be QuorumVerified.
#[test]
fn quorum_verified_beats_paid_list_when_both_satisfied() {
let key = xor_name_from_byte(0x80);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
(quorum_peers[2], PresenceEvidence::Present),
(quorum_peers[3], PresenceEvidence::Present),
(quorum_peers[4], PresenceEvidence::Present),
],
vec![
(paid_peers[0], PaidListEvidence::Confirmed),
(paid_peers[1], PaidListEvidence::Confirmed),
(paid_peers[2], PaidListEvidence::Confirmed),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumVerified { .. }),
"QuorumVerified should take precedence over PaidListVerified, got {outcome:?}"
);
}
// A response with present=true and paid=Some(true) is recorded as
// Present / Confirmed for the answering peer.
#[test]
fn process_response_populates_evidence() {
let key = xor_name_from_byte(0x90);
let peer = peer_id_from_byte(1);
let targets = single_key_targets(&key, vec![peer], vec![peer]);
let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
key,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
))
.collect();
let response = VerificationResponse {
results: vec![KeyVerificationResult {
key,
present: true,
paid: Some(true),
}],
};
process_verification_response(&peer, &response, &targets, &mut evidence);
let ev = evidence.get(&key).expect("evidence for key");
assert_eq!(
ev.presence.get(&peer),
Some(&PresenceEvidence::Present),
"presence should be Present"
);
assert_eq!(
ev.paid_list.get(&peer),
Some(&PaidListEvidence::Confirmed),
"paid_list should be Confirmed"
);
}
// An empty response leaves the requested key Unresolved for both presence
// and paid-list (the peer is a paid target here).
#[test]
fn process_response_missing_key_gets_unresolved() {
let key = xor_name_from_byte(0xA0);
let peer = peer_id_from_byte(2);
let targets = single_key_targets(&key, vec![peer], vec![peer]);
let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
key,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
))
.collect();
let response = VerificationResponse { results: vec![] };
process_verification_response(&peer, &response, &targets, &mut evidence);
let ev = evidence.get(&key).expect("evidence for key");
assert_eq!(
ev.presence.get(&peer),
Some(&PresenceEvidence::Unresolved),
"missing key in response should be Unresolved"
);
assert_eq!(
ev.paid_list.get(&peer),
Some(&PaidListEvidence::Unresolved),
"missing paid key in response should be Unresolved"
);
}
// A result for a key that was never requested from the peer is dropped,
// while the result for the requested key is still applied.
#[test]
fn process_response_ignores_unsolicited_keys() {
let key = xor_name_from_byte(0xB0);
let unsolicited_key = xor_name_from_byte(0xB1);
let peer = peer_id_from_byte(3);
let targets = single_key_targets(&key, vec![peer], vec![]);
let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
key,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
))
.collect();
let response = VerificationResponse {
results: vec![
KeyVerificationResult {
key: unsolicited_key,
present: true,
paid: None,
},
KeyVerificationResult {
key,
present: false,
paid: None,
},
],
};
process_verification_response(&peer, &response, &targets, &mut evidence);
assert!(
!evidence.contains_key(&unsolicited_key),
"unsolicited key should not be in evidence"
);
let ev = evidence.get(&key).expect("evidence for key");
assert_eq!(
ev.presence.get(&peer),
Some(&PresenceEvidence::Absent),
"solicited key should have Absent"
);
}
// `mark_peer_unresolved` sets presence to Unresolved for every key of the
// peer, and paid-list to Unresolved only for the key where the peer is a
// paid target (key_b).
#[test]
fn mark_unresolved_sets_all_keys_for_peer() {
let key_a = xor_name_from_byte(0xC0);
let key_b = xor_name_from_byte(0xC1);
let peer = peer_id_from_byte(5);
let targets = VerificationTargets {
quorum_targets: std::iter::once((key_a, vec![peer])).collect(),
paid_targets: std::iter::once((key_b, vec![peer])).collect(),
all_peers: std::iter::once(peer).collect(),
peer_to_keys: std::iter::once((peer, vec![key_a, key_b])).collect(),
peer_to_paid_keys: std::iter::once((peer, std::iter::once(key_b).collect())).collect(),
};
let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
(
key_a,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
),
(
key_b,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
),
]
.into_iter()
.collect();
mark_peer_unresolved(&peer, &targets, &mut evidence);
let ev_a = evidence.get(&key_a).expect("evidence for key_a");
assert_eq!(
ev_a.presence.get(&peer),
Some(&PresenceEvidence::Unresolved)
);
assert!(!ev_a.paid_list.contains_key(&peer));
let ev_b = evidence.get(&key_b).expect("evidence for key_b");
assert_eq!(
ev_b.presence.get(&peer),
Some(&PresenceEvidence::Unresolved)
);
assert_eq!(
ev_b.paid_list.get(&peer),
Some(&PaidListEvidence::Unresolved)
);
}
// Seven quorum targets all Absent and five paid targets all NotFound:
// expects QuorumFailed. (The "abandoned" transition named in the test is
// handled elsewhere; only the outcome is checked here.)
#[test]
fn scenario_4_quorum_fail_transitions_to_abandoned() {
let key = xor_name_from_byte(0xD0);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
quorum_peers
.iter()
.map(|p| (*p, PresenceEvidence::Absent))
.collect(),
paid_peers
.iter()
.map(|p| (*p, PaidListEvidence::NotFound))
.collect(),
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumFailed),
"all-Absent with no paid confirmations should yield QuorumFailed, got {outcome:?}"
);
}
// Every answer on both paths is Unresolved (as after timeouts): expects
// QuorumInconclusive.
#[test]
fn scenario_16_timeout_yields_inconclusive() {
let key = xor_name_from_byte(0xD1);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
quorum_peers
.iter()
.map(|p| (*p, PresenceEvidence::Unresolved))
.collect(),
paid_peers
.iter()
.map(|p| (*p, PaidListEvidence::Unresolved))
.collect(),
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
"all-unresolved should yield QuorumInconclusive, got {outcome:?}"
);
}
// Presence and paid-list evidence are evaluated together: only one of
// seven quorum targets is Present, but three of five paid targets
// confirm, so the outcome is PaidListVerified.
#[test]
fn scenario_27_single_round_collects_both_presence_and_paid() {
let key = xor_name_from_byte(0xD2);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Absent),
(quorum_peers[2], PresenceEvidence::Absent),
(quorum_peers[3], PresenceEvidence::Absent),
(quorum_peers[4], PresenceEvidence::Absent),
(quorum_peers[5], PresenceEvidence::Absent),
(quorum_peers[6], PresenceEvidence::Absent),
],
vec![
(paid_peers[0], PaidListEvidence::Confirmed),
(paid_peers[1], PaidListEvidence::Confirmed),
(paid_peers[2], PaidListEvidence::Confirmed),
(paid_peers[3], PaidListEvidence::NotFound),
(paid_peers[4], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
"paid-list majority should trigger PaidListVerified when quorum fails, got {outcome:?}"
);
}
// The quorum threshold depends on the number of targets: with three
// targets `quorum_needed` is asserted to be 2, and two Present answers
// yield QuorumVerified.
#[test]
fn scenario_28_dynamic_threshold_with_3_targets() {
let key = xor_name_from_byte(0xD3);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
assert_eq!(config.quorum_needed(3), 2, "quorum_needed(3) should be 2");
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
(quorum_peers[2], PresenceEvidence::Absent),
],
vec![],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
"2 Present in 3-target set should QuorumVerify, got {outcome:?}"
);
}
// Two-key variant of `single_key_targets`: builds `VerificationTargets`
// from explicit quorum/paid peer lists for `key_a` and `key_b`.
fn two_key_targets(
key_a: &XorName,
key_b: &XorName,
quorum_peers_a: Vec<PeerId>,
quorum_peers_b: Vec<PeerId>,
paid_peers_a: Vec<PeerId>,
paid_peers_b: Vec<PeerId>,
) -> VerificationTargets {
let mut all_peers = HashSet::new();
let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
for &p in &quorum_peers_a {
all_peers.insert(p);
peer_to_keys.entry(p).or_default().push(*key_a);
}
for &p in &quorum_peers_b {
all_peers.insert(p);
peer_to_keys.entry(p).or_default().push(*key_b);
}
for &p in &paid_peers_a {
all_peers.insert(p);
peer_to_keys.entry(p).or_default().push(*key_a);
peer_to_paid_keys.entry(p).or_default().insert(*key_a);
}
for &p in &paid_peers_b {
all_peers.insert(p);
peer_to_keys.entry(p).or_default().push(*key_b);
peer_to_paid_keys.entry(p).or_default().insert(*key_b);
}
for keys_list in peer_to_keys.values_mut() {
keys_list.sort_unstable();
keys_list.dedup();
}
let mut quorum_targets = HashMap::new();
quorum_targets.insert(*key_a, quorum_peers_a);
quorum_targets.insert(*key_b, quorum_peers_b);
let mut paid_targets = HashMap::new();
paid_targets.insert(*key_a, paid_peers_a);
paid_targets.insert(*key_b, paid_peers_b);
VerificationTargets {
quorum_targets,
paid_targets,
all_peers,
peer_to_keys,
peer_to_paid_keys,
}
}
// One response carrying results for two keys is split into independent
// per-key evidence entries.
#[test]
fn scenario_33_batched_response_per_key_evidence() {
let key_a = xor_name_from_byte(0xD4);
let key_b = xor_name_from_byte(0xD5);
let peer = peer_id_from_byte(1);
let targets = two_key_targets(
&key_a,
&key_b,
vec![peer],
vec![peer],
vec![peer],
vec![peer],
);
let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
(
key_a,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
),
(
key_b,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
),
]
.into_iter()
.collect();
let response = VerificationResponse {
results: vec![
KeyVerificationResult {
key: key_a,
present: true,
paid: Some(true),
},
KeyVerificationResult {
key: key_b,
present: false,
paid: Some(false),
},
],
};
process_verification_response(&peer, &response, &targets, &mut evidence);
let ev_a = evidence.get(&key_a).expect("evidence for key_a");
assert_eq!(ev_a.presence.get(&peer), Some(&PresenceEvidence::Present));
assert_eq!(
ev_a.paid_list.get(&peer),
Some(&PaidListEvidence::Confirmed)
);
let ev_b = evidence.get(&key_b).expect("evidence for key_b");
assert_eq!(ev_b.presence.get(&peer), Some(&PresenceEvidence::Absent));
assert_eq!(ev_b.paid_list.get(&peer), Some(&PaidListEvidence::NotFound));
}
// The peer answers only key_a; the omitted key_b (for which the peer is
// also a paid target) is filled in as Unresolved on both maps.
#[test]
fn scenario_34_partial_response_unresolved_per_key() {
let key_a = xor_name_from_byte(0xD6);
let key_b = xor_name_from_byte(0xD7);
let peer = peer_id_from_byte(2);
let targets = two_key_targets(&key_a, &key_b, vec![peer], vec![peer], vec![], vec![peer]);
let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
(
key_a,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
),
(
key_b,
KeyVerificationEvidence {
presence: HashMap::new(),
paid_list: HashMap::new(),
},
),
]
.into_iter()
.collect();
let response = VerificationResponse {
results: vec![KeyVerificationResult {
key: key_a,
present: true,
paid: None,
}],
};
process_verification_response(&peer, &response, &targets, &mut evidence);
let ev_a = evidence.get(&key_a).expect("evidence for key_a");
assert_eq!(
ev_a.presence.get(&peer),
Some(&PresenceEvidence::Present),
"key_a should have explicit Present"
);
let ev_b = evidence.get(&key_b).expect("evidence for key_b");
assert_eq!(
ev_b.presence.get(&peer),
Some(&PresenceEvidence::Unresolved),
"omitted key_b should get Unresolved presence"
);
assert_eq!(
ev_b.paid_list.get(&peer),
Some(&PaidListEvidence::Unresolved),
"omitted key_b (paid target) should get Unresolved paid_list"
);
}
// Quorum and paid target sets overlap (peers 3..=5). Four of five quorum
// targets report Present while every paid target answers NotFound:
// expects QuorumVerified with the Present peers as sources.
#[test]
fn scenario_42_quorum_pass_derives_paid_list_auth() {
let key = xor_name_from_byte(0xD8);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (3..=5).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
(quorum_peers[2], PresenceEvidence::Present), (quorum_peers[3], PresenceEvidence::Present), (quorum_peers[4], PresenceEvidence::Absent), ],
vec![
(paid_peers[0], PaidListEvidence::NotFound),
(paid_peers[1], PaidListEvidence::NotFound),
(paid_peers[2], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
match outcome {
KeyVerificationOutcome::QuorumVerified { ref sources } => {
assert!(
sources.len() >= 4,
"QuorumVerified sources should contain at least the 4 quorum-positive peers, got {}",
sources.len()
);
assert!(
sources.contains(&quorum_peers[0]),
"source peer 1 should be in sources"
);
assert!(
sources.contains(&quorum_peers[1]),
"source peer 2 should be in sources"
);
}
other => panic!("expected QuorumVerified, got {other:?}"),
}
}
// No paid target knows the key (all ten answer NotFound), but five of
// seven quorum targets report Present: expects QuorumVerified, with the
// Present peers — and none of the Absent ones — listed as sources.
#[test]
fn scenario_44_cold_start_recovery_via_replica_majority() {
let key = xor_name_from_byte(0xD9);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=19).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let paid_evidence: Vec<(PeerId, PaidListEvidence)> = paid_peers
.iter()
.map(|p| (*p, PaidListEvidence::NotFound))
.collect();
let presence_evidence = vec![
(quorum_peers[0], PresenceEvidence::Present),
(quorum_peers[1], PresenceEvidence::Present),
(quorum_peers[2], PresenceEvidence::Present),
(quorum_peers[3], PresenceEvidence::Present),
(quorum_peers[4], PresenceEvidence::Present),
(quorum_peers[5], PresenceEvidence::Absent),
(quorum_peers[6], PresenceEvidence::Absent),
];
let evidence = build_evidence(presence_evidence, paid_evidence);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
match outcome {
KeyVerificationOutcome::QuorumVerified { ref sources } => {
assert!(
sources.len() >= 4,
"QuorumVerified should have >= 4 sources (the presence-positive peers), got {}",
sources.len()
);
for (i, peer) in quorum_peers.iter().enumerate().take(5) {
assert!(
sources.contains(peer),
"quorum_peer[{i}] responded Present and should be a fetch source"
);
}
assert!(
!sources.contains(&quorum_peers[5]),
"absent peer should not be a fetch source"
);
assert!(
!sources.contains(&quorum_peers[6]),
"absent peer should not be a fetch source"
);
}
other => panic!(
"Cold-start recovery should succeed via replica majority \
(QuorumVerified), got {other:?}"
),
}
}
// Every quorum target reports Absent, yet three of five paid targets
// confirm: the paid-list path alone yields PaidListVerified.
#[test]
fn scenario_20_paid_list_local_hit_bypasses_presence_quorum() {
let key = xor_name_from_byte(0xE0);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
quorum_peers
.iter()
.map(|p| (*p, PresenceEvidence::Absent))
.collect(),
vec![
(paid_peers[0], PaidListEvidence::Confirmed),
(paid_peers[1], PaidListEvidence::Confirmed),
(paid_peers[2], PaidListEvidence::Confirmed),
(paid_peers[3], PaidListEvidence::NotFound),
(paid_peers[4], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
"paid-list majority should bypass failed presence quorum, got {outcome:?}"
);
}
// Only two of five paid targets confirm and every quorum target reports
// Absent, with nothing unresolved: expects QuorumFailed.
#[test]
fn scenario_22_paid_list_rejection_below_threshold() {
let key = xor_name_from_byte(0xE2);
let config = ReplicationConfig::default();
let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
let evidence = build_evidence(
quorum_peers
.iter()
.map(|p| (*p, PresenceEvidence::Absent))
.collect(),
vec![
(paid_peers[0], PaidListEvidence::Confirmed),
(paid_peers[1], PaidListEvidence::Confirmed),
(paid_peers[2], PaidListEvidence::NotFound),
(paid_peers[3], PaidListEvidence::NotFound),
(paid_peers[4], PaidListEvidence::NotFound),
],
);
let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
assert!(
matches!(outcome, KeyVerificationOutcome::QuorumFailed),
"below-threshold paid confirmations with all-Absent quorum should yield QuorumFailed, got {outcome:?}"
);
}
}