use crate::data::client::peer_xor_distance;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::evm::{Amount, PaymentQuote};
use ant_protocol::transport::{DHTNode, MultiAddr, P2PNode, PeerId, WitnessedCloseGroup};
use ant_protocol::{
compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
};
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};
const FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER: usize = 2;
const WITNESSED_QUORUM_NUMERATOR: usize = 2;
const WITNESSED_QUORUM_DENOMINATOR: usize = 3;
const SINGLE_NODE_WITNESSED_VIEW_COUNT: usize = 20;
const MEDIAN_QUOTE_INDEX: usize = CLOSE_GROUP_SIZE / 2;
const QUOTE_COLLECTION_TIMEOUT_SECS: u64 = 120;
const ML_DSA_PUB_KEY_LEN: usize = 1952;
fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
return false;
}
compute_address("e.pub_key) == *peer_id.as_bytes()
}
fn classify_quote_response(
peer_id: &PeerId,
quote_bytes: &[u8],
already_stored: bool,
) -> std::result::Result<(PaymentQuote, Amount), Error> {
let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
})?;
if !quote_binding_is_valid(peer_id, &payment_quote) {
let derived = compute_address(&payment_quote.pub_key);
warn!(
"Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
(peer is signing quotes with another peer's key); the storer \
would reject this proof"
);
return Err(Error::BadQuoteBinding {
peer_id: peer_id.to_string(),
detail: format!(
"BLAKE3(pub_key)={} pub_key_len={}",
hex::encode(derived),
payment_quote.pub_key.len(),
),
});
}
if already_stored {
debug!("Peer {peer_id} already has chunk");
return Err(Error::AlreadyStored);
}
let price = payment_quote.price;
debug!("Received quote from {peer_id}: price = {price}");
Ok((payment_quote, price))
}
fn drop_quotes_with_bad_bindings(
quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
) -> usize {
let before = quotes.len();
quotes.retain(|(peer_id, _, quote, _)| {
if quote_binding_is_valid(peer_id, quote) {
true
} else {
warn!(
"Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
(peer is signing quotes with another peer's key); the storer would \
reject this proof"
);
false
}
});
before - quotes.len()
}
#[allow(clippy::too_many_arguments)]
async fn request_store_quote_from_peer(
node: Arc<P2PNode>,
peer_id: PeerId,
peer_addrs: Vec<MultiAddr>,
request_id: u64,
address: [u8; 32],
data_size: u64,
data_type: u32,
per_peer_timeout: Duration,
) -> StoreQuoteRequestResult {
let request = ChunkQuoteRequest {
address,
data_size,
data_type,
};
let message = ChunkMessage {
request_id,
body: ChunkMessageBody::QuoteRequest(request),
};
let message_bytes = match message.encode() {
Ok(bytes) => bytes,
Err(e) => {
return (
peer_id,
peer_addrs,
Err(Error::Protocol(format!(
"Failed to encode quote request for {peer_id}: {e}"
))),
);
}
};
let result = send_and_await_chunk_response(
&node,
&peer_id,
message_bytes,
request_id,
per_peer_timeout,
&peer_addrs,
|body| match body {
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
quote,
already_stored,
}) => Some(classify_quote_response(&peer_id, "e, already_stored)),
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
Error::Protocol(format!("Quote error from {peer_id}: {e}")),
)),
_ => None,
},
|e| Error::Network(format!("Failed to send quote request to {peer_id}: {e}")),
|| Error::Timeout(format!("Timeout waiting for quote from {peer_id}")),
)
.await;
(peer_id, peer_addrs, result)
}
#[allow(clippy::too_many_arguments)]
fn record_store_quote_result(
peer_id: PeerId,
addrs: Vec<MultiAddr>,
quote_result: Result<(PaymentQuote, Amount)>,
address: &[u8; 32],
quotes: &mut Vec<StoreQuote>,
already_stored_peers: &mut Vec<(PeerId, [u8; 32])>,
failures: &mut Vec<String>,
bad_quote_count: &mut usize,
) {
match quote_result {
Ok((quote, price)) => {
quotes.push((peer_id, addrs, quote, price));
}
Err(Error::AlreadyStored) => {
info!("Peer {peer_id} reports chunk already stored");
let dist = peer_xor_distance(&peer_id, address);
already_stored_peers.push((peer_id, dist));
}
Err(e) => {
if matches!(&e, Error::BadQuoteBinding { .. }) {
*bad_quote_count += 1;
}
warn!("Failed to get quote from {peer_id}: {e}");
failures.push(format!("{peer_id}: {e}"));
}
}
}
fn witnessed_quote_launch_budget(
successful_quotes: usize,
in_flight: usize,
remaining_peers: usize,
) -> usize {
CLOSE_GROUP_SIZE
.saturating_sub(successful_quotes.saturating_add(in_flight))
.min(remaining_peers)
}
fn single_node_quote_query_count() -> usize {
CLOSE_GROUP_SIZE
}
fn fault_tolerant_quote_query_count() -> usize {
CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
}
fn witnessed_close_group_quorum() -> usize {
(CLOSE_GROUP_SIZE * WITNESSED_QUORUM_NUMERATOR).div_ceil(WITNESSED_QUORUM_DENOMINATOR)
}
fn witnessed_close_group_quorum_for_missing_views(missing_views: usize) -> usize {
witnessed_close_group_quorum()
.saturating_sub(missing_views)
.max(1)
}
fn missing_witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> usize {
witnessed
.initial_closest
.len()
.saturating_sub(witnessed.responder_views.len())
}
fn witnessed_close_group_quorum_for_transcript(witnessed: &WitnessedCloseGroup) -> usize {
witnessed_close_group_quorum_for_missing_views(missing_witnessed_responder_views(witnessed))
}
fn peer_list(peers: &[PeerId]) -> Vec<String> {
peers.iter().map(ToString::to_string).collect()
}
pub(crate) type StoreQuote = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
type StoreQuoteRequestResult = (PeerId, Vec<MultiAddr>, Result<(PaymentQuote, Amount)>);
type VotersByPeer = HashMap<PeerId, HashSet<PeerId>>;
type WitnessedVoteData = (HashMap<PeerId, DHTNode>, VotersByPeer, Vec<(PeerId, usize)>);
pub(crate) struct StoreQuotePlan {
pub(crate) quotes: Vec<StoreQuote>,
pub(crate) put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
}
#[derive(Debug, Clone)]
struct WitnessedQuoteCandidate {
node: DHTNode,
votes: usize,
voters: HashSet<PeerId>,
}
#[derive(Debug, Clone)]
struct WitnessedQuotePeer {
peer_id: PeerId,
addrs: Vec<MultiAddr>,
voters: HashSet<PeerId>,
}
#[derive(Debug, Clone)]
struct WitnessedQuoteSelection {
quote_peers: Vec<WitnessedQuotePeer>,
initial_put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
quorum: usize,
}
enum QuoteSelectionPolicy {
ClosestByDistance,
WitnessedMedianVoters {
voters_by_peer: VotersByPeer,
quorum: usize,
},
}
fn witnessed_initial_peers(witnessed: &WitnessedCloseGroup) -> Vec<String> {
witnessed
.initial_closest
.iter()
.map(|node| node.peer_id.to_string())
.collect()
}
fn witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> Vec<String> {
witnessed
.responder_views
.iter()
.map(|view| {
let peers = view
.closest
.iter()
.map(|node| node.peer_id)
.collect::<Vec<_>>();
format!("{}=>{:?}", view.responder, peer_list(&peers))
})
.collect()
}
fn merge_witnessed_node(nodes: &mut HashMap<PeerId, DHTNode>, node: DHTNode) {
match nodes.entry(node.peer_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().merge_from(node);
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(node);
}
}
}
fn sort_vote_counts_by_distance(vote_counts: &mut [(PeerId, usize)], address: &[u8; 32]) {
vote_counts.sort_by(|left, right| {
peer_xor_distance(&left.0, address)
.cmp(&peer_xor_distance(&right.0, address))
.then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
});
}
fn witnessed_vote_counts_and_nodes(
witnessed: &WitnessedCloseGroup,
address: &[u8; 32],
) -> WitnessedVoteData {
let mut known_nodes = HashMap::new();
for node in &witnessed.initial_closest {
merge_witnessed_node(&mut known_nodes, node.clone());
}
let mut voters_by_peer: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();
for view in &witnessed.responder_views {
let mut voted = HashSet::new();
for node in &view.closest {
merge_witnessed_node(&mut known_nodes, node.clone());
if voted.insert(node.peer_id) {
voters_by_peer
.entry(node.peer_id)
.or_default()
.insert(view.responder);
}
}
}
let mut vote_counts: Vec<(PeerId, usize)> = voters_by_peer
.iter()
.map(|(peer_id, voters)| (*peer_id, voters.len()))
.collect();
sort_vote_counts_by_distance(&mut vote_counts, address);
(known_nodes, voters_by_peer, vote_counts)
}
fn witnessed_consensus_candidates(
witnessed: &WitnessedCloseGroup,
address: &[u8; 32],
quorum: usize,
) -> Vec<WitnessedQuoteCandidate> {
let (known_nodes, voters_by_peer, vote_counts) =
witnessed_vote_counts_and_nodes(witnessed, address);
let mut candidates = vote_counts
.iter()
.filter_map(|(peer_id, votes)| {
if *votes < quorum {
return None;
}
known_nodes.get(peer_id).cloned().and_then(|node| {
voters_by_peer
.get(peer_id)
.cloned()
.map(|voters| WitnessedQuoteCandidate {
node,
votes: *votes,
voters,
})
})
})
.collect::<Vec<_>>();
candidates.sort_by(|left, right| {
peer_xor_distance(&left.node.peer_id, address)
.cmp(&peer_xor_distance(&right.node.peer_id, address))
.then_with(|| right.votes.cmp(&left.votes))
.then_with(|| {
left.node
.peer_id
.as_bytes()
.cmp(right.node.peer_id.as_bytes())
})
});
candidates
}
fn witnessed_vote_counts(witnessed: &WitnessedCloseGroup, address: &[u8; 32]) -> Vec<String> {
let (_, _, vote_counts) = witnessed_vote_counts_and_nodes(witnessed, address);
vote_counts
.iter()
.map(|(peer_id, votes)| format!("{peer_id}:{votes}"))
.collect()
}
fn witnessed_consensus(
witnessed: &WitnessedCloseGroup,
address: &[u8; 32],
quorum: usize,
) -> Vec<String> {
witnessed_consensus_candidates(witnessed, address, quorum)
.iter()
.map(|candidate| format!("{}:{}", candidate.node.peer_id, candidate.votes))
.collect()
}
fn witnessed_close_group_diagnostics(
address: &[u8; 32],
witnessed: &WitnessedCloseGroup,
quorum: usize,
) -> String {
format!(
"target={}, initial={:?}, responder_views={:?}, vote_counts={:?}, quorum={}, final={:?}",
hex::encode(address),
witnessed_initial_peers(witnessed),
witnessed_responder_views(witnessed),
witnessed_vote_counts(witnessed, address),
quorum,
witnessed_consensus(witnessed, address, quorum)
)
}
fn witnessed_quote_selection_or_error(
address: &[u8; 32],
witnessed: &WitnessedCloseGroup,
required: usize,
quorum: usize,
) -> Result<WitnessedQuoteSelection> {
let candidates = witnessed_consensus_candidates(witnessed, address, quorum);
if candidates.len() < required {
return Err(Error::InsufficientPeers(format!(
"Witnessed close group inconclusive before payment: got {}/{} quorum-recognised peers. {}",
candidates.len(),
required,
witnessed_close_group_diagnostics(address, witnessed, quorum)
)));
}
let initial_put_peers = witnessed
.initial_closest
.iter()
.take(CLOSE_GROUP_SIZE)
.map(|node| (node.peer_id, node.addresses_by_priority()))
.collect::<Vec<_>>();
if initial_put_peers.len() < CLOSE_GROUP_SIZE {
return Err(Error::InsufficientPeers(format!(
"Witnessed close group returned only {}/{} initial PUT peers before payment. {}",
initial_put_peers.len(),
CLOSE_GROUP_SIZE,
witnessed_close_group_diagnostics(address, witnessed, quorum)
)));
}
let quote_peers = candidates
.into_iter()
.map(|candidate| WitnessedQuotePeer {
peer_id: candidate.node.peer_id,
addrs: candidate.node.addresses_by_priority(),
voters: candidate.voters,
})
.collect();
Ok(WitnessedQuoteSelection {
quote_peers,
initial_put_peers,
quorum,
})
}
pub(crate) fn median_paid_quote_issuer(
quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)],
) -> Option<(PeerId, Amount)> {
if quotes.len() <= MEDIAN_QUOTE_INDEX {
return None;
}
let mut by_price: Vec<(usize, PeerId, Amount)> = quotes
.iter()
.enumerate()
.map(|(index, (peer_id, _, _, price))| (index, *peer_id, *price))
.collect();
by_price.sort_by_key(|(index, _, price)| (*price, *index));
by_price
.get(MEDIAN_QUOTE_INDEX)
.map(|(_, peer_id, price)| (*peer_id, *price))
}
fn sort_quotes_by_distance(quotes: &mut [StoreQuote], address: &[u8; 32]) {
quotes.sort_by(|left, right| {
peer_xor_distance(&left.0, address)
.cmp(&peer_xor_distance(&right.0, address))
.then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
});
}
fn median_paid_quote_issuer_for_indices(
quotes: &[StoreQuote],
indices: &[usize],
) -> Option<(PeerId, Amount)> {
if indices.len() <= MEDIAN_QUOTE_INDEX {
return None;
}
let mut by_price: Vec<(usize, PeerId, Amount)> = indices
.iter()
.enumerate()
.map(|(selected_index, quote_index)| {
let (peer_id, _, _, price) = "es[*quote_index];
(selected_index, *peer_id, *price)
})
.collect();
by_price.sort_by_key(|(selected_index, _, price)| (*price, *selected_index));
by_price
.get(MEDIAN_QUOTE_INDEX)
.map(|(_, peer_id, price)| (*peer_id, *price))
}
fn median_issuer_voter_support(
quotes: &[StoreQuote],
indices: &[usize],
voters_by_peer: &VotersByPeer,
) -> Option<(PeerId, usize)> {
let (median_peer_id, _) = median_paid_quote_issuer_for_indices(quotes, indices)?;
let voters = voters_by_peer.get(&median_peer_id)?;
Some((median_peer_id, voters.len()))
}
fn visit_quote_subsets<F>(
quote_count: usize,
subset_size: usize,
start_index: usize,
current: &mut Vec<usize>,
visit: &mut F,
) where
F: FnMut(&[usize]),
{
if current.len() == subset_size {
visit(current);
return;
}
let remaining = subset_size - current.len();
let last_start = quote_count - remaining;
for index in start_index..=last_start {
current.push(index);
visit_quote_subsets(quote_count, subset_size, index + 1, current, visit);
current.pop();
}
}
fn select_closest_quotes(mut quotes: Vec<StoreQuote>, address: &[u8; 32]) -> Vec<StoreQuote> {
sort_quotes_by_distance(&mut quotes, address);
quotes.truncate(CLOSE_GROUP_SIZE);
quotes
}
fn select_witnessed_median_voter_quotes(
mut quotes: Vec<StoreQuote>,
address: &[u8; 32],
voters_by_peer: &VotersByPeer,
required_support: usize,
) -> Option<Vec<StoreQuote>> {
if quotes.len() < CLOSE_GROUP_SIZE {
return None;
}
sort_quotes_by_distance(&mut quotes, address);
let mut best_indices: Option<(usize, Vec<usize>)> = None;
let mut current_indices = Vec::with_capacity(CLOSE_GROUP_SIZE);
visit_quote_subsets(
quotes.len(),
CLOSE_GROUP_SIZE,
0,
&mut current_indices,
&mut |indices| {
let Some((_, support)) = median_issuer_voter_support("es, indices, voters_by_peer)
else {
return;
};
if support < required_support {
return;
}
match &best_indices {
Some((best_support, best)) if *best_support > support => {}
Some((best_support, best))
if *best_support == support && best.as_slice() <= indices => {}
_ => best_indices = Some((support, indices.to_vec())),
}
},
);
best_indices.map(|(_, indices)| {
indices
.into_iter()
.map(|index| quotes[index].clone())
.collect()
})
}
fn put_peers_with_median_voters_first(
quotes: &[StoreQuote],
put_peers: &[(PeerId, Vec<MultiAddr>)],
voters_by_peer: &VotersByPeer,
required_support: usize,
) -> Option<Vec<(PeerId, Vec<MultiAddr>)>> {
let (median_peer_id, _) = median_paid_quote_issuer(quotes)?;
let voters = voters_by_peer.get(&median_peer_id)?;
let mut supporting_peers = Vec::new();
let mut fallback_peers = Vec::new();
for (peer_id, addrs) in put_peers {
let peer = (*peer_id, addrs.clone());
if voters.contains(peer_id) {
supporting_peers.push(peer);
} else {
fallback_peers.push(peer);
}
}
if supporting_peers.len() < required_support {
return None;
}
supporting_peers.extend(fallback_peers);
Some(supporting_peers)
}
impl Client {
pub async fn get_store_quotes(
&self,
address: &[u8; 32],
data_size: u64,
data_type: u32,
) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
Ok(self
.get_store_quote_plan(address, data_size, data_type)
.await?
.quotes)
}
pub(crate) async fn get_store_quote_plan(
&self,
address: &[u8; 32],
data_size: u64,
data_type: u32,
) -> Result<StoreQuotePlan> {
let witnessed_selection = self.select_witnessed_quote_selection(address).await?;
let voters_by_peer: VotersByPeer = witnessed_selection
.quote_peers
.iter()
.map(|peer| (peer.peer_id, peer.voters.clone()))
.collect();
let remote_peers = witnessed_selection
.quote_peers
.into_iter()
.map(|peer| (peer.peer_id, peer.addrs))
.collect();
let initial_put_peers = witnessed_selection.initial_put_peers;
let quorum = witnessed_selection.quorum;
let quotes = self
.collect_store_quotes_from_remote_peers(
address,
data_size,
data_type,
remote_peers,
QuoteSelectionPolicy::WitnessedMedianVoters {
voters_by_peer: voters_by_peer.clone(),
quorum,
},
)
.await?;
let put_peers = put_peers_with_median_voters_first(
"es,
&initial_put_peers,
&voters_by_peer,
quorum,
)
.ok_or_else(|| {
Error::InsufficientPeers(format!(
"Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \
voted for the paid median issuer for {}",
quotes.len(),
quorum,
hex::encode(address)
))
})?;
Ok(StoreQuotePlan { quotes, put_peers })
}
pub(crate) async fn get_store_quotes_with_fault_tolerance(
&self,
address: &[u8; 32],
data_size: u64,
data_type: u32,
) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
let peer_query_count = fault_tolerant_quote_query_count();
let remote_peers = self
.network()
.find_closest_peers(address, peer_query_count)
.await?;
self.collect_store_quotes_from_remote_peers(
address,
data_size,
data_type,
remote_peers,
QuoteSelectionPolicy::ClosestByDistance,
)
.await
}
async fn select_witnessed_quote_selection(
&self,
address: &[u8; 32],
) -> Result<WitnessedQuoteSelection> {
let required = single_node_quote_query_count();
let witnessed = self
.network()
.find_witnessed_close_group_with_view_count(
address,
required,
SINGLE_NODE_WITNESSED_VIEW_COUNT,
)
.await
.map_err(|e| {
Error::InsufficientPeers(format!(
"Witnessed close group lookup failed before payment for target {}: {e}",
hex::encode(address)
))
})?;
let base_quorum = witnessed_close_group_quorum();
let missing_views = missing_witnessed_responder_views(&witnessed);
let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
if missing_views > 0 {
warn!(
target = %hex::encode(address),
initial = witnessed.initial_closest.len(),
responder_views = witnessed.responder_views.len(),
missing_views = missing_views,
base_quorum = base_quorum,
adjusted_quorum = quorum,
"Witnessed close group transcript is missing responder views; lowering SNP witness quorum"
);
}
debug!(
target = %hex::encode(address),
quorum = quorum,
view_count = SINGLE_NODE_WITNESSED_VIEW_COUNT,
initial = ?witnessed_initial_peers(&witnessed),
responder_views = ?witnessed_responder_views(&witnessed),
vote_counts = ?witnessed_vote_counts(&witnessed, address),
final_witnessed_set = ?witnessed_consensus(&witnessed, address, quorum),
"Witnessed close group selected for SNP quote collection"
);
witnessed_quote_selection_or_error(address, &witnessed, required, quorum)
}
#[allow(clippy::too_many_lines)]
async fn collect_store_quotes_from_remote_peers(
&self,
address: &[u8; 32],
data_size: u64,
data_type: u32,
remote_peers: Vec<(PeerId, Vec<MultiAddr>)>,
quote_selection_policy: QuoteSelectionPolicy,
) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
let peer_query_count = remote_peers.len();
let node = self.network().node();
debug!(
"Requesting quotes from up to {peer_query_count} peers for address {} (size: {data_size})",
hex::encode(address)
);
if remote_peers.len() < CLOSE_GROUP_SIZE {
return Err(Error::InsufficientPeers(format!(
"Found {} peers, need {CLOSE_GROUP_SIZE}",
remote_peers.len()
)));
}
debug_assert!(peer_query_count >= CLOSE_GROUP_SIZE);
let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS);
let mut quotes = Vec::with_capacity(peer_query_count);
let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
let mut failures: Vec<String> = Vec::new();
let mut bad_quote_count = 0usize;
let staged_witnessed_collection = matches!(
"e_selection_policy,
QuoteSelectionPolicy::WitnessedMedianVoters { .. }
);
if staged_witnessed_collection {
let mut quote_futures = FuturesUnordered::new();
let mut next_peer_index = 0usize;
let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
tokio::time::timeout(overall_timeout, async {
loop {
let launch_count = witnessed_quote_launch_budget(
quotes.len(),
quote_futures.len(),
remote_peers.len().saturating_sub(next_peer_index),
);
for _ in 0..launch_count {
let (peer_id, peer_addrs) = &remote_peers[next_peer_index];
next_peer_index += 1;
quote_futures.push(request_store_quote_from_peer(
node.clone(),
*peer_id,
peer_addrs.clone(),
self.next_request_id(),
*address,
data_size,
data_type,
per_peer_timeout,
));
}
if quotes.len() >= CLOSE_GROUP_SIZE || quote_futures.is_empty() {
break;
}
let Some((peer_id, addrs, quote_result)) = quote_futures.next().await
else {
break;
};
record_store_quote_result(
peer_id,
addrs,
quote_result,
address,
&mut quotes,
&mut already_stored_peers,
&mut failures,
&mut bad_quote_count,
);
}
Ok(())
})
.await;
match collect_result {
Err(_elapsed) => {
warn!(
"Quote collection timed out after {overall_timeout:?} for address {}",
hex::encode(address)
);
}
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
}
} else {
let mut quote_futures = FuturesUnordered::new();
for (peer_id, peer_addrs) in &remote_peers {
quote_futures.push(request_store_quote_from_peer(
node.clone(),
*peer_id,
peer_addrs.clone(),
self.next_request_id(),
*address,
data_size,
data_type,
per_peer_timeout,
));
}
let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
tokio::time::timeout(overall_timeout, async {
while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
record_store_quote_result(
peer_id,
addrs,
quote_result,
address,
&mut quotes,
&mut already_stored_peers,
&mut failures,
&mut bad_quote_count,
);
}
Ok(())
})
.await;
match collect_result {
Err(_elapsed) => {
warn!(
"Quote collection timed out after {overall_timeout:?} for address {}",
hex::encode(address)
);
}
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
}
}
let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
if bad_dropped > 0 {
warn!(
"Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
for address {} — the per-peer handler should have caught these earlier \
(this indicates an upstream regression)",
hex::encode(address),
);
bad_quote_count += bad_dropped;
}
if !already_stored_peers.is_empty() {
let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
for (peer_id, _, _, _) in "es {
all_peers_by_distance.push((false, peer_xor_distance(peer_id, address)));
}
for (_, dist) in &already_stored_peers {
all_peers_by_distance.push((true, *dist));
}
all_peers_by_distance.sort_by_key(|a| a.1);
let close_group_stored = all_peers_by_distance
.iter()
.take(CLOSE_GROUP_SIZE)
.filter(|(is_stored, _)| *is_stored)
.count();
if close_group_stored >= CLOSE_GROUP_MAJORITY {
debug!(
"Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
hex::encode(address)
);
return Err(Error::AlreadyStored);
}
}
let already_stored_count = already_stored_peers.len();
let failure_count = failures.len();
let quote_count = quotes.len();
let total_responses = quote_count + failure_count + already_stored_count;
if quotes.len() >= CLOSE_GROUP_SIZE {
let selected_quotes = match quote_selection_policy {
QuoteSelectionPolicy::ClosestByDistance => select_closest_quotes(quotes, address),
QuoteSelectionPolicy::WitnessedMedianVoters {
voters_by_peer,
quorum,
} => select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer, quorum)
.ok_or_else(|| {
Error::InsufficientPeers(format!(
"Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \
median issuer is recognised by at least {} \
selected witness peers ({total_responses} responses: \
{already_stored_count} already_stored, {failure_count} failed \
including {bad_quote_count} with mismatched peer bindings). \
Failures: [{}]",
quorum,
failures.join("; ")
))
})?,
};
info!(
"Collected {} quotes for address {} ({total_responses} responses: \
{quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
{bad_quote_count} bad-binding)",
selected_quotes.len(),
hex::encode(address),
);
return Ok(selected_quotes);
}
Err(Error::InsufficientPeers(format!(
"Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
{already_stored_count} already_stored, {failure_count} failed including \
{bad_quote_count} with mismatched peer bindings). Failures: [{}]",
failures.join("; ")
)))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use ant_protocol::evm::RewardsAddress;
use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
use ant_protocol::transport::{DHTNode, MlDsa65, ResponderView, WitnessedCloseGroup};
use std::time::SystemTime;
use xor_name::XorName;
struct Keypair {
peer_id: PeerId,
pub_key_bytes: Vec<u8>,
}
fn gen_keypair() -> Keypair {
let ml_dsa = MlDsa65::new();
let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
let pub_key_bytes = pub_key.as_bytes().to_vec();
let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
Keypair {
peer_id,
pub_key_bytes,
}
}
fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
let kp = gen_keypair();
let quote = PaymentQuote {
content: XorName([0u8; 32]),
timestamp: SystemTime::UNIX_EPOCH,
price: Amount::ZERO,
rewards_address: RewardsAddress::new([0u8; 20]),
pub_key: kp.pub_key_bytes,
signature: Vec::new(),
};
(kp.peer_id, Vec::new(), quote, Amount::ZERO)
}
fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
let claimed = gen_keypair();
let signing = gen_keypair();
assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
let quote = PaymentQuote {
content: XorName([0u8; 32]),
timestamp: SystemTime::UNIX_EPOCH,
price: Amount::ZERO,
rewards_address: RewardsAddress::new([0u8; 20]),
pub_key: signing.pub_key_bytes,
signature: Vec::new(),
};
(claimed.peer_id, Vec::new(), quote, Amount::ZERO)
}
fn witnessed_test_node(seed: u8) -> DHTNode {
DHTNode {
peer_id: PeerId::from_bytes([seed; 32]),
addresses: Vec::new(),
address_types: Vec::new(),
distance: None,
reliability: 1.0,
}
}
fn witnessed_test_nodes(seeds: &[u8]) -> Vec<DHTNode> {
seeds.iter().copied().map(witnessed_test_node).collect()
}
fn witnessed_test_view(responder: u8, closest: &[u8]) -> ResponderView {
ResponderView {
responder: PeerId::from_bytes([responder; 32]),
closest: witnessed_test_nodes(closest),
}
}
fn synthetic_peer(seed: u8) -> PeerId {
PeerId::from_bytes([seed; 32])
}
fn synthetic_quote(seed: u8, price: u64) -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
let amount = Amount::from(price);
let quote = PaymentQuote {
content: XorName([0u8; 32]),
timestamp: SystemTime::UNIX_EPOCH,
price: amount,
rewards_address: RewardsAddress::new([0u8; 20]),
pub_key: Vec::new(),
signature: Vec::new(),
};
(synthetic_peer(seed), Vec::new(), quote, amount)
}
fn synthetic_voters(seeds: &[u8]) -> HashSet<PeerId> {
seeds.iter().copied().map(synthetic_peer).collect()
}
fn quote_peer_seeds(quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)]) -> Vec<u8> {
quotes
.iter()
.map(|(peer_id, _, _, _)| peer_id.as_bytes()[0])
.collect()
}
fn put_peer_seeds(peers: &[(PeerId, Vec<MultiAddr>)]) -> Vec<u8> {
peers
.iter()
.map(|(peer_id, _)| peer_id.as_bytes()[0])
.collect()
}
fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec<MultiAddr>)> {
seeds
.iter()
.copied()
.map(|seed| (synthetic_peer(seed), Vec::new()))
.collect()
}
fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
if MlDsaPublicKey::from_bytes("e.pub_key).is_err() {
return false;
}
compute_address("e.pub_key) == *peer_id.as_bytes()
}
#[test]
fn binding_accepts_real_self_consistent_keypair() {
let (peer_id, _, quote, _) = good_quote_real();
assert!(quote_binding_is_valid(&peer_id, "e));
assert!(storer_binding_would_accept(&peer_id, "e));
}
#[test]
fn binding_rejects_real_crossed_keypair() {
let (peer_id, _, quote, _) = bad_quote_real();
assert!(!quote_binding_is_valid(&peer_id, "e));
assert!(!storer_binding_would_accept(&peer_id, "e));
}
#[test]
fn binding_rejects_oversize_pubkey() {
let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
let peer_id = PeerId::from_bytes(compute_address(&oversized));
let quote = PaymentQuote {
content: XorName([0u8; 32]),
timestamp: SystemTime::UNIX_EPOCH,
price: Amount::ZERO,
rewards_address: RewardsAddress::new([0u8; 20]),
pub_key: oversized,
signature: Vec::new(),
};
assert_eq!(compute_address("e.pub_key), *peer_id.as_bytes());
assert!(
!quote_binding_is_valid(&peer_id, "e),
"predicate must reject oversize pub_key even when BLAKE3 happens to match"
);
assert!(!storer_binding_would_accept(&peer_id, "e));
}
#[test]
fn binding_rejects_undersize_pubkey() {
let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
let peer_id = PeerId::from_bytes(compute_address(&undersized));
let quote = PaymentQuote {
content: XorName([0u8; 32]),
timestamp: SystemTime::UNIX_EPOCH,
price: Amount::ZERO,
rewards_address: RewardsAddress::new([0u8; 20]),
pub_key: undersized,
signature: Vec::new(),
};
assert!(!quote_binding_is_valid(&peer_id, "e));
assert!(!storer_binding_would_accept(&peer_id, "e));
}
#[test]
fn quote_query_counts_keep_single_node_close_group_only() {
assert_eq!(single_node_quote_query_count(), CLOSE_GROUP_SIZE);
assert_eq!(SINGLE_NODE_WITNESSED_VIEW_COUNT, 20);
assert!(SINGLE_NODE_WITNESSED_VIEW_COUNT > single_node_quote_query_count());
assert_eq!(witnessed_close_group_quorum(), 5);
assert_eq!(witnessed_close_group_quorum_for_missing_views(0), 5);
assert_eq!(witnessed_close_group_quorum_for_missing_views(1), 4);
assert_eq!(witnessed_close_group_quorum_for_missing_views(2), 3);
assert_eq!(
fault_tolerant_quote_query_count(),
CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
);
assert!(fault_tolerant_quote_query_count() > single_node_quote_query_count());
}
#[test]
fn witnessed_quote_launch_budget_keeps_exact_quote_window() {
assert_eq!(
witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE * 2),
CLOSE_GROUP_SIZE,
"initial SNP quote fetch should launch the closest seven peers"
);
assert_eq!(
witnessed_quote_launch_budget(1, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
0,
"a successful quote should not launch an extra fallback"
);
assert_eq!(
witnessed_quote_launch_budget(0, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1,
"a failed in-flight quote should launch the next closest fallback"
);
assert_eq!(
witnessed_quote_launch_budget(CLOSE_GROUP_SIZE - 1, 0, 3),
1,
"only one more peer is needed for the seventh quote"
);
assert_eq!(
witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE - 1),
CLOSE_GROUP_SIZE - 1,
"launch budget is capped by remaining candidates"
);
}
#[test]
fn witnessed_candidates_sort_by_xor_distance_then_votes() {
let address = [0u8; 32];
let witnessed = WitnessedCloseGroup {
target: address,
k: CLOSE_GROUP_SIZE,
initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
responder_views: vec![
witnessed_test_view(1, &[1, 9]),
witnessed_test_view(2, &[1, 9]),
witnessed_test_view(3, &[1, 9]),
witnessed_test_view(4, &[1, 9]),
witnessed_test_view(5, &[1, 9]),
witnessed_test_view(6, &[9]),
witnessed_test_view(7, &[9]),
],
};
let candidates =
witnessed_consensus_candidates(&witnessed, &address, witnessed_close_group_quorum());
assert_eq!(
candidates
.iter()
.map(|candidate| candidate.node.peer_id.as_bytes()[0])
.collect::<Vec<_>>(),
vec![1, 9],
"XOR closeness must be the primary sort before quote collection"
);
}
#[test]
fn witnessed_quote_peers_error_is_typed_and_pre_payment_when_consensus_is_short() {
let address = [0u8; 32];
let responder_views = (1..=7)
.map(|responder| witnessed_test_view(responder, &[1, 2, 3, 4]))
.collect();
let witnessed = WitnessedCloseGroup {
target: address,
k: CLOSE_GROUP_SIZE,
initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
responder_views,
};
let err = witnessed_quote_selection_or_error(
&address,
&witnessed,
CLOSE_GROUP_SIZE,
witnessed_close_group_quorum(),
)
.expect_err("short witnessed consensus must fail before payment");
match err {
Error::InsufficientPeers(message) => {
assert!(message.contains("before payment"));
assert!(message.contains("vote_counts"));
assert!(message.contains("quorum"));
}
other => panic!("expected typed InsufficientPeers error, got {other:?}"),
}
}
#[test]
fn witnessed_quote_peers_include_quorum_fallback_candidates() {
const EXTRA_QUORUM_CANDIDATES: usize = 1;
let address = [0u8; 32];
let witnessed = WitnessedCloseGroup {
target: address,
k: CLOSE_GROUP_SIZE,
initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
responder_views: vec![
witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
witnessed_test_view(7, &[1, 3, 4, 5, 6, 7, 8]),
],
};
let selection = witnessed_quote_selection_or_error(
&address,
&witnessed,
CLOSE_GROUP_SIZE,
witnessed_close_group_quorum(),
)
.expect("fallback candidates should be retained for quote collection");
assert_eq!(
selection.quote_peers.len(),
CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES
);
assert_eq!(
selection
.quote_peers
.iter()
.map(|peer| peer.peer_id.as_bytes()[0])
.collect::<Vec<_>>(),
vec![1, 2, 3, 4, 5, 6, 7, 8]
);
assert_eq!(
put_peer_seeds(&selection.initial_put_peers),
vec![1, 2, 3, 4, 5, 6, 7]
);
}
#[test]
fn witnessed_quote_peers_lower_quorum_for_missing_responder_views() {
let address = [0u8; 32];
let witnessed = WitnessedCloseGroup {
target: address,
k: CLOSE_GROUP_SIZE,
initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
responder_views: vec![
witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
],
};
let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
assert_eq!(missing_witnessed_responder_views(&witnessed), 1);
assert_eq!(quorum, 4);
let selection =
witnessed_quote_selection_or_error(&address, &witnessed, CLOSE_GROUP_SIZE, quorum)
.expect(
"one missing responder view should lower quorum and still select candidates",
);
assert_eq!(
selection
.quote_peers
.iter()
.map(|peer| peer.peer_id.as_bytes()[0])
.collect::<Vec<_>>(),
vec![1, 2, 3, 4, 5, 6, 7, 8]
);
assert_eq!(selection.quorum, quorum);
}
#[test]
fn witnessed_quote_selection_keeps_closest_set_with_median_voter_quorum() {
const MEDIAN_ISSUER_SEED: u8 = 7;
const FAR_SUPPORTING_VOTER_SEED: u8 = 20;
const UNSUCCESSFUL_SUPPORTING_VOTER_SEED: u8 = 21;
let address = [0u8; 32];
let quotes = vec![
synthetic_quote(1, 10),
synthetic_quote(2, 20),
synthetic_quote(3, 30),
synthetic_quote(6, 50),
synthetic_quote(MEDIAN_ISSUER_SEED, 40),
synthetic_quote(8, 60),
synthetic_quote(9, 70),
synthetic_quote(FAR_SUPPORTING_VOTER_SEED, 80),
];
let mut voters_by_peer = HashMap::new();
voters_by_peer.insert(
synthetic_peer(MEDIAN_ISSUER_SEED),
synthetic_voters(&[
1,
2,
3,
MEDIAN_ISSUER_SEED,
FAR_SUPPORTING_VOTER_SEED,
UNSUCCESSFUL_SUPPORTING_VOTER_SEED,
]),
);
let quorum = witnessed_close_group_quorum();
let selected =
select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
.expect("a supported close-group quote set should be selected");
assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]);
let (median_peer_id, _) =
median_paid_quote_issuer(&selected).expect("selected quotes have a median");
assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
assert!(voters_by_peer[&median_peer_id].len() >= quorum);
}
#[test]
fn witnessed_quote_selection_uses_direct_median_witness_recognition() {
const MEDIAN_ISSUER_SEED: u8 = 7;
let address = [0u8; 32];
let quotes = vec![
synthetic_quote(1, 10),
synthetic_quote(2, 20),
synthetic_quote(3, 30),
synthetic_quote(4, 50),
synthetic_quote(MEDIAN_ISSUER_SEED, 40),
synthetic_quote(8, 60),
synthetic_quote(9, 70),
];
let mut voters_by_peer = HashMap::new();
voters_by_peer.insert(
synthetic_peer(MEDIAN_ISSUER_SEED),
synthetic_voters(&[20, 21, 22, 23, 24]),
);
let quorum = witnessed_close_group_quorum();
let selected =
select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
.expect("direct witness recognition should support the paid median issuer");
let (median_peer_id, _) =
median_paid_quote_issuer(&selected).expect("selected quotes have a median");
let selected_peers = selected
.iter()
.map(|(peer_id, _, _, _)| *peer_id)
.collect::<HashSet<_>>();
assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
assert_eq!(
voters_by_peer[&median_peer_id]
.intersection(&selected_peers)
.count(),
0,
"recognising witnesses need not also be selected quote issuers"
);
assert_eq!(voters_by_peer[&median_peer_id].len(), quorum);
}
#[test]
fn witnessed_quote_selection_rejects_median_without_witness_quorum() {
const MEDIAN_ISSUER_SEED: u8 = 7;
let address = [0u8; 32];
let quotes = vec![
synthetic_quote(1, 10),
synthetic_quote(2, 20),
synthetic_quote(3, 30),
synthetic_quote(6, 50),
synthetic_quote(MEDIAN_ISSUER_SEED, 40),
synthetic_quote(8, 60),
synthetic_quote(9, 70),
synthetic_quote(10, 80),
];
let mut voters_by_peer = HashMap::new();
voters_by_peer.insert(
synthetic_peer(MEDIAN_ISSUER_SEED),
synthetic_voters(&[1, 2, 3, 20]),
);
let selected = select_witnessed_median_voter_quotes(
quotes,
&address,
&voters_by_peer,
witnessed_close_group_quorum(),
);
assert!(
selected.is_none(),
"the selector must not return a paid quote set when fewer than the \
witnessed median voter quorum recognised the paid median issuer"
);
}
#[test]
fn put_peers_prioritise_median_voters_without_reordering_quotes() {
const MEDIAN_ISSUER_SEED: u8 = 7;
let quotes = vec![
synthetic_quote(1, 10),
synthetic_quote(2, 20),
synthetic_quote(3, 30),
synthetic_quote(4, 50),
synthetic_quote(5, 60),
synthetic_quote(6, 70),
synthetic_quote(MEDIAN_ISSUER_SEED, 40),
];
let mut voters_by_peer = HashMap::new();
voters_by_peer.insert(
synthetic_peer(MEDIAN_ISSUER_SEED),
synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]),
);
let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]);
let put_peers = put_peers_with_median_voters_first(
"es,
&put_candidates,
&voters_by_peer,
witnessed_close_group_quorum(),
)
.expect("median voters should produce an ordered PUT set");
assert_eq!(quote_peer_seeds("es), vec![1, 2, 3, 4, 5, 6, 7]);
let (median_peer_id, _) =
median_paid_quote_issuer("es).expect("selected quotes have a median");
assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
assert_eq!(put_peer_seeds(&put_peers), vec![3, 4, 5, 6, 7, 1, 2]);
}
#[test]
fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
let mut quotes = vec![
good_quote_real(),
bad_quote_real(),
good_quote_real(),
bad_quote_real(),
good_quote_real(),
];
let dropped = drop_quotes_with_bad_bindings(&mut quotes);
assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
for (peer_id, _, quote, _) in "es {
assert!(
storer_binding_would_accept(peer_id, quote),
"every retained quote must satisfy the full storer-side spec"
);
}
}
#[test]
fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
let before = quotes.len();
let dropped = drop_quotes_with_bad_bindings(&mut quotes);
assert_eq!(dropped, 0);
assert_eq!(quotes.len(), before);
for (peer_id, _, quote, _) in "es {
assert!(storer_binding_would_accept(peer_id, quote));
}
}
#[test]
fn filter_drops_all_when_every_responder_is_bad() {
let mut quotes: Vec<_> = (0..fault_tolerant_quote_query_count())
.map(|_| bad_quote_real())
.collect();
let dropped = drop_quotes_with_bad_bindings(&mut quotes);
assert_eq!(dropped, fault_tolerant_quote_query_count());
assert!(quotes.is_empty());
}
#[test]
fn filter_preserves_quote_payload_byte_for_byte() {
let (peer_id, addrs, original_quote, amount) = good_quote_real();
let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
let _ = drop_quotes_with_bad_bindings(&mut quotes);
let (kept_peer, kept_addrs, kept_quote, kept_amount) =
quotes.pop().expect("the good quote must survive filtering");
assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
assert_eq!(kept_addrs.len(), addrs.len());
assert_eq!(kept_amount, amount);
assert_eq!(kept_quote.pub_key, original_quote.pub_key);
assert_eq!(kept_quote.signature, original_quote.signature);
assert_eq!(kept_quote.content.0, original_quote.content.0);
assert_eq!(kept_quote.timestamp, original_quote.timestamp);
assert_eq!(kept_quote.price, original_quote.price);
assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
}
#[test]
fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
let over_query_count = fault_tolerant_quote_query_count();
let mut quotes: Vec<_> = (0..over_query_count - 1)
.map(|_| good_quote_real())
.collect();
quotes.insert(over_query_count / 2, bad_quote_real());
assert_eq!(quotes.len(), over_query_count);
let storer_would_reject_count = quotes
.iter()
.filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
.count();
assert_eq!(
storer_would_reject_count, 1,
"exactly one quote (the crossed-key one) must be rejected by the storer spec"
);
let dropped = drop_quotes_with_bad_bindings(&mut quotes);
assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
for (peer_id, _, quote, _) in "es {
assert!(
storer_binding_would_accept(peer_id, quote),
"every post-filter quote must be accepted by the storer spec — \
this is what the filter guarantees before any quote set is used"
);
}
assert!(
quotes.len() >= CLOSE_GROUP_SIZE,
"after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
so a fault-tolerant probe can still return a full close group"
);
}
#[test]
fn filter_leaves_short_set_when_too_many_bad_peers() {
let good_count = CLOSE_GROUP_SIZE - 1;
let bad_count = fault_tolerant_quote_query_count() - good_count;
let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
.take(bad_count)
.chain(std::iter::repeat_with(good_quote_real).take(good_count))
.collect();
let dropped = drop_quotes_with_bad_bindings(&mut quotes);
assert_eq!(dropped, bad_count);
assert!(
quotes.len() < CLOSE_GROUP_SIZE,
"this is the precondition for InsufficientPeers downstream"
);
for (peer_id, _, quote, _) in "es {
assert!(storer_binding_would_accept(peer_id, quote));
}
}
fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
rmp_serde::to_vec(quote).expect("serialize quote")
}
#[test]
fn classifier_accepts_real_self_consistent_quote() {
let (peer_id, _, quote, _) = good_quote_real();
let bytes = serialize_quote("e);
let result = classify_quote_response(&peer_id, &bytes, false);
match result {
Ok((q, price)) => {
assert_eq!(q.pub_key, quote.pub_key);
assert_eq!(price, quote.price);
}
Err(e) => panic!("expected Ok, got {e}"),
}
}
#[test]
fn classifier_rejects_crossed_keypair_with_typed_error() {
let (peer_id, _, quote, _) = bad_quote_real();
let bytes = serialize_quote("e);
let result = classify_quote_response(&peer_id, &bytes, false);
match result {
Err(Error::BadQuoteBinding {
peer_id: pid,
detail,
}) => {
assert_eq!(pid, peer_id.to_string());
assert!(
detail.contains("BLAKE3(pub_key)="),
"diagnostic detail must include the derived peer id: {detail}"
);
}
other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
}
}
#[test]
fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
let (peer_id, _, quote, _) = bad_quote_real();
let bytes = serialize_quote("e);
let result = classify_quote_response(&peer_id, &bytes, true);
assert!(
matches!(result, Err(Error::BadQuoteBinding { .. })),
"crossed-key peer must be classified BadQuoteBinding even when \
voting already_stored=true; got {result:?}"
);
}
#[test]
fn classifier_honours_already_stored_vote_from_good_binding_peer() {
let (peer_id, _, quote, _) = good_quote_real();
let bytes = serialize_quote("e);
let result = classify_quote_response(&peer_id, &bytes, true);
assert!(
matches!(result, Err(Error::AlreadyStored)),
"honest peer's already_stored vote must be honoured; got {result:?}"
);
}
#[test]
fn classifier_returns_serialization_error_on_bad_bytes() {
let (peer_id, _, _, _) = good_quote_real();
let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
let result = classify_quote_response(&peer_id, &garbage, false);
assert!(
matches!(result, Err(Error::Serialization(_))),
"garbage bytes must produce a Serialization error; got {result:?}"
);
}
#[test]
fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
.map(|_| {
let (p, _, q, _) = good_quote_real();
(p, q)
})
.collect();
for _ in 0..4 {
let (p, _, q, _) = bad_quote_real();
responders.push((p, q));
}
for (peer_id, quote) in &responders {
let bytes = serialize_quote(quote);
let storer_verdict = storer_binding_would_accept(peer_id, quote);
let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
assert_eq!(
classifier_verdict, storer_verdict,
"classifier and storer-binding-spec must agree on every responder \
(peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
peer_id
);
}
}
}