use crate::data::client::peer_cache::record_peer_outcome;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::evm::{Amount, PaymentQuote};
use ant_protocol::transport::{MultiAddr, PeerId};
use ant_protocol::{
send_and_await_chunk_response, ChunkMessage, ChunkMessageBody, ChunkQuoteRequest,
ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
};
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
fn xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
let peer_bytes = peer_id.as_bytes();
let mut distance = [0u8; 32];
for (i, d) in distance.iter_mut().enumerate() {
let pb = peer_bytes.get(i).copied().unwrap_or(0);
*d = pb ^ target[i];
}
distance
}
impl Client {
#[allow(clippy::too_many_lines)]
pub async fn get_store_quotes(
&self,
address: &[u8; 32],
data_size: u64,
data_type: u32,
) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
let node = self.network().node();
let over_query_count = CLOSE_GROUP_SIZE * 2;
debug!(
"Requesting quotes from up to {over_query_count} peers for address {} (size: {data_size})",
hex::encode(address)
);
let remote_peers = self
.network()
.find_closest_peers(address, over_query_count)
.await?;
if remote_peers.len() < CLOSE_GROUP_SIZE {
return Err(Error::InsufficientPeers(format!(
"Found {} peers, need {CLOSE_GROUP_SIZE}",
remote_peers.len()
)));
}
let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
let overall_timeout = Duration::from_secs(120);
let mut quote_futures = FuturesUnordered::new();
for (peer_id, peer_addrs) in &remote_peers {
let request_id = self.next_request_id();
let request = ChunkQuoteRequest {
address: *address,
data_size,
data_type,
};
let message = ChunkMessage {
request_id,
body: ChunkMessageBody::QuoteRequest(request),
};
let message_bytes = match message.encode() {
Ok(bytes) => bytes,
Err(e) => {
warn!("Failed to encode quote request for {peer_id}: {e}");
continue;
}
};
let peer_id_clone = *peer_id;
let addrs_clone = peer_addrs.clone();
let node_clone = node.clone();
let quote_future = async move {
let start = Instant::now();
let result = send_and_await_chunk_response(
&node_clone,
&peer_id_clone,
message_bytes,
request_id,
per_peer_timeout,
&addrs_clone,
|body| match body {
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
quote,
already_stored,
}) => {
if already_stored {
debug!("Peer {peer_id_clone} already has chunk");
return Some(Err(Error::AlreadyStored));
}
match rmp_serde::from_slice::<PaymentQuote>("e) {
Ok(payment_quote) => {
let price = payment_quote.price;
debug!("Received quote from {peer_id_clone}: price = {price}");
Some(Ok((payment_quote, price)))
}
Err(e) => Some(Err(Error::Serialization(format!(
"Failed to deserialize quote from {peer_id_clone}: {e}"
)))),
}
}
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")),
)),
_ => None,
},
|e| {
Error::Network(format!(
"Failed to send quote request to {peer_id_clone}: {e}"
))
},
|| Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")),
)
.await;
let success = result.is_ok();
let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms)
.await;
(peer_id_clone, addrs_clone, result)
};
quote_futures.push(quote_future);
}
let mut quotes = Vec::with_capacity(over_query_count);
let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
let mut failures: Vec<String> = Vec::new();
let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
tokio::time::timeout(overall_timeout, async {
while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
match quote_result {
Ok((quote, price)) => {
quotes.push((peer_id, addrs, quote, price));
}
Err(Error::AlreadyStored) => {
info!("Peer {peer_id} reports chunk already stored");
let dist = xor_distance(&peer_id, address);
already_stored_peers.push((peer_id, dist));
}
Err(e) => {
warn!("Failed to get quote from {peer_id}: {e}");
failures.push(format!("{peer_id}: {e}"));
}
}
}
Ok(())
})
.await;
match collect_result {
Err(_elapsed) => {
warn!(
"Quote collection timed out after {overall_timeout:?} for address {}",
hex::encode(address)
);
}
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
}
if !already_stored_peers.is_empty() {
let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
for (peer_id, _, _, _) in "es {
all_peers_by_distance.push((false, xor_distance(peer_id, address)));
}
for (_, dist) in &already_stored_peers {
all_peers_by_distance.push((true, *dist));
}
all_peers_by_distance.sort_by_key(|a| a.1);
let close_group_stored = all_peers_by_distance
.iter()
.take(CLOSE_GROUP_SIZE)
.filter(|(is_stored, _)| *is_stored)
.count();
if close_group_stored >= CLOSE_GROUP_MAJORITY {
debug!(
"Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
hex::encode(address)
);
return Err(Error::AlreadyStored);
}
}
let already_stored_count = already_stored_peers.len();
let failure_count = failures.len();
let quote_count = quotes.len();
let total_responses = quote_count + failure_count + already_stored_count;
if quotes.len() >= CLOSE_GROUP_SIZE {
quotes.sort_by(|a, b| {
let dist_a = xor_distance(&a.0, address);
let dist_b = xor_distance(&b.0, address);
dist_a.cmp(&dist_b)
});
quotes.truncate(CLOSE_GROUP_SIZE);
info!(
"Collected {} quotes for address {} ({total_responses} responses: {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed)",
quotes.len(),
hex::encode(address),
);
return Ok(quotes);
}
Err(Error::InsufficientPeers(format!(
"Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: {already_stored_count} already_stored, {failure_count} failed). Failures: [{}]",
failures.join("; ")
)))
}
}