1use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::evm::{Amount, PaymentQuote};
10use ant_protocol::transport::{MultiAddr, PeerId};
11use ant_protocol::{
12 send_and_await_chunk_response, ChunkMessage, ChunkMessageBody, ChunkQuoteRequest,
13 ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
14};
15use futures::stream::{FuturesUnordered, StreamExt};
16use std::time::{Duration, Instant};
17use tracing::{debug, info, warn};
18
19fn xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
24 let peer_bytes = peer_id.as_bytes();
25 let mut distance = [0u8; 32];
26 for (i, d) in distance.iter_mut().enumerate() {
27 let pb = peer_bytes.get(i).copied().unwrap_or(0);
28 *d = pb ^ target[i];
29 }
30 distance
31}
32
33impl Client {
34 #[allow(clippy::too_many_lines)]
47 pub async fn get_store_quotes(
48 &self,
49 address: &[u8; 32],
50 data_size: u64,
51 data_type: u32,
52 ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
53 let node = self.network().node();
54
55 let over_query_count = CLOSE_GROUP_SIZE * 2;
57 debug!(
58 "Requesting quotes from up to {over_query_count} peers for address {} (size: {data_size})",
59 hex::encode(address)
60 );
61
62 let remote_peers = self
63 .network()
64 .find_closest_peers(address, over_query_count)
65 .await?;
66
67 if remote_peers.len() < CLOSE_GROUP_SIZE {
68 return Err(Error::InsufficientPeers(format!(
69 "Found {} peers, need {CLOSE_GROUP_SIZE}",
70 remote_peers.len()
71 )));
72 }
73
74 let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
75 let overall_timeout = Duration::from_secs(120);
79
80 let mut quote_futures = FuturesUnordered::new();
82
83 for (peer_id, peer_addrs) in &remote_peers {
84 let request_id = self.next_request_id();
85 let request = ChunkQuoteRequest {
86 address: *address,
87 data_size,
88 data_type,
89 };
90 let message = ChunkMessage {
91 request_id,
92 body: ChunkMessageBody::QuoteRequest(request),
93 };
94
95 let message_bytes = match message.encode() {
96 Ok(bytes) => bytes,
97 Err(e) => {
98 warn!("Failed to encode quote request for {peer_id}: {e}");
99 continue;
100 }
101 };
102
103 let peer_id_clone = *peer_id;
104 let addrs_clone = peer_addrs.clone();
105 let node_clone = node.clone();
106
107 let quote_future = async move {
108 let start = Instant::now();
109 let result = send_and_await_chunk_response(
110 &node_clone,
111 &peer_id_clone,
112 message_bytes,
113 request_id,
114 per_peer_timeout,
115 &addrs_clone,
116 |body| match body {
117 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
118 quote,
119 already_stored,
120 }) => {
121 if already_stored {
122 debug!("Peer {peer_id_clone} already has chunk");
123 return Some(Err(Error::AlreadyStored));
124 }
125 match rmp_serde::from_slice::<PaymentQuote>("e) {
126 Ok(payment_quote) => {
127 let price = payment_quote.price;
128 debug!("Received quote from {peer_id_clone}: price = {price}");
129 Some(Ok((payment_quote, price)))
130 }
131 Err(e) => Some(Err(Error::Serialization(format!(
132 "Failed to deserialize quote from {peer_id_clone}: {e}"
133 )))),
134 }
135 }
136 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
137 Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")),
138 )),
139 _ => None,
140 },
141 |e| {
142 Error::Network(format!(
143 "Failed to send quote request to {peer_id_clone}: {e}"
144 ))
145 },
146 || Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")),
147 )
148 .await;
149
150 let success = result.is_ok();
151 let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
152 record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms)
153 .await;
154
155 (peer_id_clone, addrs_clone, result)
156 };
157
158 quote_futures.push(quote_future);
159 }
160
161 let mut quotes = Vec::with_capacity(over_query_count);
164 let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
165 let mut failures: Vec<String> = Vec::new();
166
167 let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
168 tokio::time::timeout(overall_timeout, async {
169 while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
170 match quote_result {
171 Ok((quote, price)) => {
172 quotes.push((peer_id, addrs, quote, price));
173 }
174 Err(Error::AlreadyStored) => {
175 info!("Peer {peer_id} reports chunk already stored");
176 let dist = xor_distance(&peer_id, address);
177 already_stored_peers.push((peer_id, dist));
178 }
179 Err(e) => {
180 warn!("Failed to get quote from {peer_id}: {e}");
181 failures.push(format!("{peer_id}: {e}"));
182 }
183 }
184 }
185 Ok(())
186 })
187 .await;
188
189 match collect_result {
190 Err(_elapsed) => {
191 warn!(
192 "Quote collection timed out after {overall_timeout:?} for address {}",
193 hex::encode(address)
194 );
195 }
199 Ok(Err(e)) => return Err(e),
200 Ok(Ok(())) => {}
201 }
202
203 if !already_stored_peers.is_empty() {
205 let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
206 for (peer_id, _, _, _) in "es {
207 all_peers_by_distance.push((false, xor_distance(peer_id, address)));
208 }
209 for (_, dist) in &already_stored_peers {
210 all_peers_by_distance.push((true, *dist));
211 }
212 all_peers_by_distance.sort_by_key(|a| a.1);
213
214 let close_group_stored = all_peers_by_distance
215 .iter()
216 .take(CLOSE_GROUP_SIZE)
217 .filter(|(is_stored, _)| *is_stored)
218 .count();
219
220 if close_group_stored >= CLOSE_GROUP_MAJORITY {
221 debug!(
222 "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
223 hex::encode(address)
224 );
225 return Err(Error::AlreadyStored);
226 }
227 }
228
229 let already_stored_count = already_stored_peers.len();
230 let failure_count = failures.len();
231 let quote_count = quotes.len();
232 let total_responses = quote_count + failure_count + already_stored_count;
233
234 if quotes.len() >= CLOSE_GROUP_SIZE {
235 quotes.sort_by(|a, b| {
237 let dist_a = xor_distance(&a.0, address);
238 let dist_b = xor_distance(&b.0, address);
239 dist_a.cmp(&dist_b)
240 });
241 quotes.truncate(CLOSE_GROUP_SIZE);
242
243 info!(
244 "Collected {} quotes for address {} ({total_responses} responses: {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed)",
245 quotes.len(),
246 hex::encode(address),
247 );
248 return Ok(quotes);
249 }
250
251 Err(Error::InsufficientPeers(format!(
252 "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: {already_stored_count} already_stored, {failure_count} failed). Failures: [{}]",
253 failures.join("; ")
254 )))
255 }
256}