Skip to main content

ant_core/data/client/
quote.rs

1//! Quote and payment operations.
2//!
3//! Handles requesting storage quotes from network nodes and
4//! managing payment for data storage.
5
6use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::evm::{Amount, PaymentQuote};
10use ant_protocol::transport::{MultiAddr, PeerId};
11use ant_protocol::{
12    send_and_await_chunk_response, ChunkMessage, ChunkMessageBody, ChunkQuoteRequest,
13    ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
14};
15use futures::stream::{FuturesUnordered, StreamExt};
16use std::time::{Duration, Instant};
17use tracing::{debug, info, warn};
18
19/// Compute XOR distance between a peer's ID bytes and a target address.
20///
21/// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed with
22/// the target address. Returns a byte array suitable for lexicographic comparison.
23fn xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
24    let peer_bytes = peer_id.as_bytes();
25    let mut distance = [0u8; 32];
26    for (i, d) in distance.iter_mut().enumerate() {
27        let pb = peer_bytes.get(i).copied().unwrap_or(0);
28        *d = pb ^ target[i];
29    }
30    distance
31}
32
33impl Client {
34    /// Get storage quotes from the closest peers for a given address.
35    ///
36    /// Queries 2x `CLOSE_GROUP_SIZE` peers from the DHT for fault tolerance,
37    /// requests quotes from all of them concurrently, and returns the
38    /// `CLOSE_GROUP_SIZE` closest successful responders sorted by XOR distance.
39    ///
40    /// Returns `Error::AlreadyStored` early if `CLOSE_GROUP_MAJORITY` peers
41    /// report the chunk is already stored.
42    ///
43    /// # Errors
44    ///
45    /// Returns an error if insufficient quotes can be collected.
46    #[allow(clippy::too_many_lines)]
47    pub async fn get_store_quotes(
48        &self,
49        address: &[u8; 32],
50        data_size: u64,
51        data_type: u32,
52    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
53        let node = self.network().node();
54
55        // Over-query for fault tolerance: ask 2x peers, keep closest successful ones.
56        let over_query_count = CLOSE_GROUP_SIZE * 2;
57        debug!(
58            "Requesting quotes from up to {over_query_count} peers for address {} (size: {data_size})",
59            hex::encode(address)
60        );
61
62        let remote_peers = self
63            .network()
64            .find_closest_peers(address, over_query_count)
65            .await?;
66
67        if remote_peers.len() < CLOSE_GROUP_SIZE {
68            return Err(Error::InsufficientPeers(format!(
69                "Found {} peers, need {CLOSE_GROUP_SIZE}",
70                remote_peers.len()
71            )));
72        }
73
74        let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
75        // Overall timeout for collecting all quotes. Must accommodate
76        // connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈ 80s)
77        // plus the per-peer quote timeout. 120s is generous.
78        let overall_timeout = Duration::from_secs(120);
79
80        // Request quotes from all peers concurrently
81        let mut quote_futures = FuturesUnordered::new();
82
83        for (peer_id, peer_addrs) in &remote_peers {
84            let request_id = self.next_request_id();
85            let request = ChunkQuoteRequest {
86                address: *address,
87                data_size,
88                data_type,
89            };
90            let message = ChunkMessage {
91                request_id,
92                body: ChunkMessageBody::QuoteRequest(request),
93            };
94
95            let message_bytes = match message.encode() {
96                Ok(bytes) => bytes,
97                Err(e) => {
98                    warn!("Failed to encode quote request for {peer_id}: {e}");
99                    continue;
100                }
101            };
102
103            let peer_id_clone = *peer_id;
104            let addrs_clone = peer_addrs.clone();
105            let node_clone = node.clone();
106
107            let quote_future = async move {
108                let start = Instant::now();
109                let result = send_and_await_chunk_response(
110                    &node_clone,
111                    &peer_id_clone,
112                    message_bytes,
113                    request_id,
114                    per_peer_timeout,
115                    &addrs_clone,
116                    |body| match body {
117                        ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
118                            quote,
119                            already_stored,
120                        }) => {
121                            if already_stored {
122                                debug!("Peer {peer_id_clone} already has chunk");
123                                return Some(Err(Error::AlreadyStored));
124                            }
125                            match rmp_serde::from_slice::<PaymentQuote>(&quote) {
126                                Ok(payment_quote) => {
127                                    let price = payment_quote.price;
128                                    debug!("Received quote from {peer_id_clone}: price = {price}");
129                                    Some(Ok((payment_quote, price)))
130                                }
131                                Err(e) => Some(Err(Error::Serialization(format!(
132                                    "Failed to deserialize quote from {peer_id_clone}: {e}"
133                                )))),
134                            }
135                        }
136                        ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
137                            Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")),
138                        )),
139                        _ => None,
140                    },
141                    |e| {
142                        Error::Network(format!(
143                            "Failed to send quote request to {peer_id_clone}: {e}"
144                        ))
145                    },
146                    || Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")),
147                )
148                .await;
149
150                let success = result.is_ok();
151                let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
152                record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms)
153                    .await;
154
155                (peer_id_clone, addrs_clone, result)
156            };
157
158            quote_futures.push(quote_future);
159        }
160
161        // Collect all responses with an overall timeout to prevent indefinite stalls.
162        // Over-query means we have 2x peers, so we can tolerate failures.
163        let mut quotes = Vec::with_capacity(over_query_count);
164        let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
165        let mut failures: Vec<String> = Vec::new();
166
167        let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
168            tokio::time::timeout(overall_timeout, async {
169                while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
170                    match quote_result {
171                        Ok((quote, price)) => {
172                            quotes.push((peer_id, addrs, quote, price));
173                        }
174                        Err(Error::AlreadyStored) => {
175                            info!("Peer {peer_id} reports chunk already stored");
176                            let dist = xor_distance(&peer_id, address);
177                            already_stored_peers.push((peer_id, dist));
178                        }
179                        Err(e) => {
180                            warn!("Failed to get quote from {peer_id}: {e}");
181                            failures.push(format!("{peer_id}: {e}"));
182                        }
183                    }
184                }
185                Ok(())
186            })
187            .await;
188
189        match collect_result {
190            Err(_elapsed) => {
191                warn!(
192                    "Quote collection timed out after {overall_timeout:?} for address {}",
193                    hex::encode(address)
194                );
195                // Fall through to check if we have enough quotes despite timeout.
196                // The timeout fires when slow peers haven't responded yet, but we
197                // may already have enough successful quotes from fast peers.
198            }
199            Ok(Err(e)) => return Err(e),
200            Ok(Ok(())) => {}
201        }
202
203        // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers.
204        if !already_stored_peers.is_empty() {
205            let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
206            for (peer_id, _, _, _) in &quotes {
207                all_peers_by_distance.push((false, xor_distance(peer_id, address)));
208            }
209            for (_, dist) in &already_stored_peers {
210                all_peers_by_distance.push((true, *dist));
211            }
212            all_peers_by_distance.sort_by_key(|a| a.1);
213
214            let close_group_stored = all_peers_by_distance
215                .iter()
216                .take(CLOSE_GROUP_SIZE)
217                .filter(|(is_stored, _)| *is_stored)
218                .count();
219
220            if close_group_stored >= CLOSE_GROUP_MAJORITY {
221                debug!(
222                    "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
223                    hex::encode(address)
224                );
225                return Err(Error::AlreadyStored);
226            }
227        }
228
229        let already_stored_count = already_stored_peers.len();
230        let failure_count = failures.len();
231        let quote_count = quotes.len();
232        let total_responses = quote_count + failure_count + already_stored_count;
233
234        if quotes.len() >= CLOSE_GROUP_SIZE {
235            // Sort by XOR distance to target, keep the closest CLOSE_GROUP_SIZE.
236            quotes.sort_by(|a, b| {
237                let dist_a = xor_distance(&a.0, address);
238                let dist_b = xor_distance(&b.0, address);
239                dist_a.cmp(&dist_b)
240            });
241            quotes.truncate(CLOSE_GROUP_SIZE);
242
243            info!(
244                "Collected {} quotes for address {} ({total_responses} responses: {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed)",
245                quotes.len(),
246                hex::encode(address),
247            );
248            return Ok(quotes);
249        }
250
251        Err(Error::InsufficientPeers(format!(
252            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: {already_stored_count} already_stored, {failure_count} failed). Failures: [{}]",
253            failures.join("; ")
254        )))
255    }
256}