Skip to main content

ant_core/data/client/
chunk.rs

1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_cache::record_peer_outcome;
9use crate::data::client::peer_xor_distance;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{QuoteHash, TxHash};
13use ant_protocol::transport::{MultiAddr, PeerId};
14use ant_protocol::{
15    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
16    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
17    ProofType, XorName, CLOSE_GROUP_MAJORITY,
18};
19use bytes::Bytes;
20use futures::stream::{self, FuturesUnordered, StreamExt};
21use std::collections::HashMap;
22use std::future::Future;
23use std::time::{Duration, Instant};
24use tracing::{debug, info, warn};
25
26/// Data type identifier for chunks (used in quote requests).
27const CHUNK_DATA_TYPE: u32 = 0;
28
29/// Result of one sweep over a chunk's close group.
30///
31/// Either we got the chunk from some peer, or every peer in the group
32/// returned NotFound, timed out, or hit a transport / protocol error.
33/// The counts feed the retry decision (`is_authoritative_not_found`):
34/// only a *unanimous* NotFound from a *well-sampled* close group counts
35/// as authoritative data absence — anything else (a non-unanimous
36/// result, or a thin/under-sampled DHT walk) leaves room for the actual
37/// storer to be in the timeout / network-error / protocol-error bucket
38/// or outside the sampled view, and is worth a retry against a freshly
39/// re-walked close group.
40struct CloseGroupOutcome {
41    chunk: Option<DataChunk>,
42    queried: usize,
43    not_found: usize,
44    timeout: usize,
45    network_err: usize,
46    /// Counts peers that responded with a remote `Error` (e.g.
47    /// "Chunk verification failed") or any other protocol-level error
48    /// that classifies as `Error::Protocol`. Treated the same as
49    /// `timeout` / `network_err` for retry decisions: one peer's bad
50    /// response must not abort the whole close-group sweep — the
51    /// remaining peers might still have a clean copy.
52    protocol_err: usize,
53}
54
55/// `true` if the close-group sweep is strong enough evidence to
56/// conclude the chunk is genuinely absent, so retrying is pointless.
57///
58/// Two conditions, both required:
59///
60/// 1. *Unanimous*: every peer we managed to query responded with an
61///    authoritative NotFound (`not_found == queried`). An earlier
62///    version used a majority quorum (`not_found >= close_group_size /
63///    2 + 1`), but production traffic disproved that: storage
64///    replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
65///    peers, so up to 3 peers legitimately don't store any given chunk
66///    and a `not_found=4 timeout=3` result is "3 storers we couldn't
67///    reach" plus "4 non-storers," not data loss.
68///
69/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
70///    queried. `close_group_peers` (via `find_closest_peers`) accepts
71///    any non-empty DHT result, so a thin/under-sampled walk can return
72///    1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
73///    authoritative — the real replica majority may sit entirely
74///    outside that narrow view. Requiring a majority-sized sample means
75///    a thin lookup falls through to the retry (which re-walks the DHT)
76///    instead of being declared a final absence.
77fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
78    queried >= CLOSE_GROUP_MAJORITY && not_found == queried
79}
80
81/// Store-response timeout for non-merkle chunk PUTs.
82const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
83
84/// Extra waves allowed after the computed diagnostic peer-sweep deadline.
85const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
86
87/// Result of fetching one chunk address from one close-group peer.
88pub struct ChunkPeerGetResult {
89    /// Peer queried for the chunk.
90    pub peer_id: PeerId,
91    /// Known network addresses used for the peer.
92    pub peer_addrs: Vec<MultiAddr>,
93    /// XOR distance from `peer_id` to the chunk address.
94    pub xor_distance: [u8; 32],
95    /// Per-peer fetch result.
96    pub chunk_result: Result<Option<DataChunk>>,
97}
98
99#[derive(Clone)]
100struct ChunkPeerGetTarget {
101    index: usize,
102    peer_id: PeerId,
103    peer_addrs: Vec<MultiAddr>,
104    xor_distance: [u8; 32],
105}
106
107fn chunk_peer_get_targets(
108    peers: Vec<(PeerId, Vec<MultiAddr>)>,
109    address: &XorName,
110) -> Vec<ChunkPeerGetTarget> {
111    peers
112        .into_iter()
113        .enumerate()
114        .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
115            index,
116            peer_id,
117            peer_addrs,
118            xor_distance: peer_xor_distance(&peer_id, address),
119        })
120        .collect()
121}
122
123fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
124    results.sort_by_key(|result| result.xor_distance);
125}
126
127fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
128    peer_count.min(close_group_size.max(1))
129}
130
131fn diagnostic_peer_get_overall_timeout(
132    per_peer_timeout: Duration,
133    target_count: usize,
134    concurrency_limit: usize,
135) -> Duration {
136    let concurrency_limit = concurrency_limit.max(1);
137    let peer_get_waves = target_count.div_ceil(concurrency_limit);
138    let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
139    let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
140
141    per_peer_timeout.saturating_mul(timeout_waves)
142}
143
144fn timed_out_chunk_peer_get_result(
145    target: &ChunkPeerGetTarget,
146    address: &XorName,
147    timeout: Duration,
148) -> ChunkPeerGetResult {
149    let addr_hex = hex::encode(address);
150    let timeout_secs = timeout.as_secs();
151    ChunkPeerGetResult {
152        peer_id: target.peer_id,
153        peer_addrs: target.peer_addrs.clone(),
154        xor_distance: target.xor_distance,
155        chunk_result: Err(Error::Timeout(format!(
156            "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
157            target.peer_id
158        ))),
159    }
160}
161
162fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
163    match detect_proof_type(proof) {
164        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
165        _ => STORE_RESPONSE_TIMEOUT,
166    }
167}
168
169impl Client {
170    /// Run `chunk_get` and feed one byte-aware observation per call to
171    /// the adaptive fetch limiter. Use this from any consumer that
172    /// drives chunk-fetch concurrency from `controller().fetch.current()`
173    /// — the controller's window relies on every call along the hot
174    /// path producing an observation.
175    ///
176    /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
177    /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
178    /// sustained run of close-group exhaustions correctly drives the
179    /// cap down rather than silently inflating it.
180    pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
181        let started = Instant::now();
182        let result = self.chunk_get(address).await;
183        let latency = started.elapsed();
184        let bytes = result
185            .as_ref()
186            .ok()
187            .and_then(Option::as_ref)
188            .map_or(0, |chunk| chunk.content.len() as u64);
189        self.controller()
190            .fetch
191            .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
192        result
193    }
194}
195
196/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
197///
198/// This is the result-aware classifier used by the file-download paths.
199/// It differs from `classify_error` in one critical way: an `Ok(None)`
200/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
201/// the time `chunk_get` returns `Ok(None)` it has already exhausted
202/// the close group across its first attempt + retry sweep, so
203/// `Ok(None)` is the controller's load-shedding signal — a sustained
204/// run of them on a saturated home link is exactly the case where the
205/// cap should shrink.
206///
207/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
208/// internal peer attempts the chunk_get had to make. The controller
209/// does not need to see internal peer noise; that's noise about the
210/// production network's natural peer-side variability, not about the
211/// client's effective capacity.
212pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
213    match result {
214        Ok(Some(_)) => Outcome::Success,
215        Ok(None) => Outcome::Timeout,
216        Err(Error::Timeout(_)) => Outcome::Timeout,
217        Err(Error::Network(_)) => Outcome::NetworkError,
218        Err(_) => Outcome::ApplicationError,
219    }
220}
221
222impl Client {
223    /// Store a chunk on the Autonomi network with payment.
224    ///
225    /// Checks if the chunk already exists before paying. If it does,
226    /// returns the address immediately without incurring on-chain costs.
227    /// Otherwise collects quotes, pays on-chain, then stores with proof
228    /// to `CLOSE_GROUP_MAJORITY` peers.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if payment or the network operation fails.
233    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
234        let address = compute_address(&content);
235        let data_size = u64::try_from(content.len())
236            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
237
238        match self
239            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
240            .await
241        {
242            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
243            Err(Error::AlreadyStored) => {
244                debug!(
245                    "Chunk {} already stored on network, skipping payment",
246                    hex::encode(address)
247                );
248                Ok(address)
249            }
250            Err(e) => Err(e),
251        }
252    }
253
254    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
255    ///
256    /// Initially sends the PUT concurrently to the first
257    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
258    /// remaining peers in the quoted set until majority is reached or
259    /// all peers are exhausted.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
264    /// the chunk.
265    pub(crate) async fn chunk_put_to_close_group(
266        &self,
267        content: Bytes,
268        proof: Vec<u8>,
269        peers: &[(PeerId, Vec<MultiAddr>)],
270    ) -> Result<XorName> {
271        let address = compute_address(&content);
272
273        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
274        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
275
276        let mut put_futures = FuturesUnordered::new();
277        for (peer_id, addrs) in initial_peers {
278            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
279        }
280
281        let mut success_count = 0usize;
282        let mut failures: Vec<String> = Vec::new();
283        let mut fallback_iter = fallback_peers.iter();
284
285        while let Some((peer_id, result)) = put_futures.next().await {
286            match result {
287                Ok(_) => {
288                    success_count += 1;
289                    if success_count >= CLOSE_GROUP_MAJORITY {
290                        debug!(
291                            "Chunk {} stored on {success_count} peers (majority reached)",
292                            hex::encode(address)
293                        );
294                        return Ok(address);
295                    }
296                }
297                Err(e) => {
298                    warn!("Failed to store chunk on {peer_id}: {e}");
299                    failures.push(format!("{peer_id}: {e}"));
300
301                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
302                        debug!(
303                            "Falling back to peer {fb_peer} for chunk {}",
304                            hex::encode(address)
305                        );
306                        put_futures.push(self.spawn_chunk_put(
307                            content.clone(),
308                            proof.clone(),
309                            fb_peer,
310                            fb_addrs,
311                        ));
312                    }
313                }
314            }
315        }
316
317        Err(Error::InsufficientPeers(format!(
318            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
319            failures.join("; ")
320        )))
321    }
322
323    /// Spawn a chunk PUT future for a single peer.
324    fn spawn_chunk_put<'a>(
325        &'a self,
326        content: Bytes,
327        proof: Vec<u8>,
328        peer_id: &'a PeerId,
329        addrs: &'a [MultiAddr],
330    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
331        let peer_id_owned = *peer_id;
332        async move {
333            let result = self
334                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
335                .await;
336            (peer_id_owned, result)
337        }
338    }
339
340    /// Store a chunk on the Autonomi network with a pre-built payment proof.
341    ///
342    /// Sends to a single peer. Callers that need replication across the
343    /// close group should use `chunk_put_to_close_group` instead.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the network operation fails.
348    pub async fn chunk_put_with_proof(
349        &self,
350        content: Bytes,
351        proof: Vec<u8>,
352        target_peer: &PeerId,
353        peer_addrs: &[MultiAddr],
354    ) -> Result<XorName> {
355        let address = compute_address(&content);
356        let node = self.network().node();
357        let timeout =
358            store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
359        let timeout_secs = timeout.as_secs();
360
361        let request_id = self.next_request_id();
362        // `content` is a refcounted `Bytes` shared with the sibling
363        // close-group sends; pass it through directly so each peer shares
364        // the same backing buffer instead of deep-copying the 4 MB payload.
365        let request = ChunkPutRequest::with_payment(address, content, proof);
366        let message = ChunkMessage {
367            request_id,
368            body: ChunkMessageBody::PutRequest(request),
369        };
370        let message_bytes = message
371            .encode()
372            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
373
374        let addr_hex = hex::encode(address);
375
376        let result = send_and_await_chunk_response(
377            node,
378            target_peer,
379            message_bytes,
380            request_id,
381            timeout,
382            peer_addrs,
383            |body| match body {
384                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
385                    debug!("Chunk stored at {}", hex::encode(addr));
386                    Some(Ok(addr))
387                }
388                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
389                    address: addr,
390                }) => {
391                    debug!("Chunk already exists at {}", hex::encode(addr));
392                    Some(Ok(addr))
393                }
394                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
395                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
396                }
397                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
398                    Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
399                )),
400                _ => None,
401            },
402            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
403            || {
404                Error::Timeout(format!(
405                    "Timeout waiting for store response after {timeout_secs}s"
406                ))
407            },
408        )
409        .await;
410
411        // No RTT recorded on the PUT path: the wall-clock is dominated by
412        // the ~4 MB payload upload, which reflects the uploader's uplink
413        // rather than the peer's responsiveness. Quote-path and GET-path
414        // RTTs still feed quality scoring.
415        record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
416
417        result
418    }
419
420    /// Retrieve a chunk from the Autonomi network.
421    ///
422    /// Queries all peers in the close group for the chunk address,
423    /// returning the first successful response. This handles the case
424    /// where the storing peer differs from the first peer returned by
425    /// DHT routing.
426    ///
427    /// ## Adaptive controller feedback
428    ///
429    /// Each per-peer GET attempt is fed individually to the adaptive
430    /// fetch limiter via `controller().fetch.observe(...)`. This is
431    /// deliberately finer-grained than wrapping the outer `chunk_get`
432    /// with `observe_op`: when a chunk takes 6 peer tries to land,
433    /// 5 of them are real capacity signals (timeouts / network errors)
434    /// that should pull the cap down even if the chunk eventually
435    /// succeeds. The outer `Ok(_)` would mask all five as a single
436    /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
437    /// classification rules used below.
438    ///
439    /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if the network operation fails.
444    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
445        self.chunk_get_from_closest_peers(address, self.config().close_group_size)
446            .await
447    }
448
449    /// Retrieve a chunk from the requested number of closest peers.
450    ///
451    /// Queries peers in XOR-distance order for the chunk address,
452    /// returning the first successful response. This handles the case
453    /// where the storing peer differs from the first peer returned by
454    /// DHT routing.
455    ///
456    /// # Errors
457    ///
458    /// Returns an error if the network operation fails.
459    pub async fn chunk_get_from_closest_peers(
460        &self,
461        address: &XorName,
462        peer_count: usize,
463    ) -> Result<Option<DataChunk>> {
464        // Check cache first, with integrity verification.
465        if let Some(cached) = self.chunk_cache().get(address) {
466            let computed = compute_address(&cached);
467            if computed == *address {
468                debug!("Cache hit for chunk {}", hex::encode(address));
469                return Ok(Some(DataChunk::new(*address, cached)));
470            }
471            // Cache entry corrupted — evict and fall through to network fetch.
472            debug!(
473                "Cache corruption detected for {}: evicting",
474                hex::encode(address)
475            );
476            self.chunk_cache().remove(address);
477        }
478
479        let addr_hex = hex::encode(address);
480
481        // First attempt against the current close-group view. A
482        // lookup/transport error here (e.g. close_group_peers' DHT walk
483        // momentarily returning an error, or InsufficientPeers from a
484        // thin routing table) is NOT fatal: fall through to the retry
485        // path exactly as a non-authoritative miss would. Otherwise one
486        // transient error on the *initial* close-group walk for a single
487        // chunk would fail an entire multi-hundred-chunk download. A
488        // zeroed outcome (queried=0) is never authoritative, so it flows
489        // straight to the retry below.
490        let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
491            Ok(outcome) => outcome,
492            Err(e) => {
493                info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
494                CloseGroupOutcome {
495                    chunk: None,
496                    queried: 0,
497                    not_found: 0,
498                    timeout: 0,
499                    network_err: 0,
500                    protocol_err: 0,
501                }
502            }
503        };
504        if let Some(chunk) = first.chunk {
505            self.chunk_cache().put(chunk.address, chunk.content.clone());
506            return Ok(Some(chunk));
507        }
508
509        // Only treat as authoritative absence when *every* queried peer
510        // responded NotFound. Anything less leaves the actual storer
511        // possibly in the timeout / network-error bucket, which a retry
512        // could reach.
513        if is_authoritative_not_found(first.not_found, first.queried) {
514            info!(
515                "chunk_get giving up on {addr_hex} (unanimous NotFound): \
516                 queried={} not_found={} timeout={} network_err={} protocol_err={}",
517                first.queried,
518                first.not_found,
519                first.timeout,
520                first.network_err,
521                first.protocol_err,
522            );
523            return Ok(None);
524        }
525
526        // Otherwise the failure looks like reachability (most peers timed out
527        // or hit transport errors). The chunk is most likely still on the
528        // network but the current close-group view either (a) caught a
529        // transient transport blip or (b) converged on the wrong neighbourhood
530        // because the routing table is thin. One retry against a freshly
531        // re-walked close group is the cheapest defence against both.
532        info!(
533            "chunk_get retrying {addr_hex} after reachability failure: \
534             queried={} not_found={} timeout={} network_err={} protocol_err={}",
535            first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
536        );
537
538        // Brief settle so any in-flight transport state can quiesce before
539        // we re-walk the DHT. Keep this small so we don't add meaningful
540        // latency to the genuinely-lost case (we already paid for one full
541        // close-group sweep before getting here).
542        tokio::time::sleep(Duration::from_secs(1)).await;
543
544        // If the retry's DHT lookup itself fails, treat that as "still
545        // couldn't find" rather than escalating the error — matches the
546        // semantics of the first attempt when peers are unreachable.
547        let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
548            Ok(o) => o,
549            Err(e) => {
550                info!(
551                    "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
552                     first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
553                    first.queried,
554                    first.not_found,
555                    first.timeout,
556                    first.network_err,
557                    first.protocol_err,
558                );
559                return Ok(None);
560            }
561        };
562        if let Some(chunk) = retry.chunk {
563            info!("chunk_get retry succeeded for {addr_hex}");
564            self.chunk_cache().put(chunk.address, chunk.content.clone());
565            return Ok(Some(chunk));
566        }
567
568        info!(
569            "chunk_get exhausted close group after retry for {addr_hex}: \
570             first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
571             retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
572            first.queried,
573            first.not_found,
574            first.timeout,
575            first.network_err,
576            first.protocol_err,
577            retry.queried,
578            retry.not_found,
579            retry.timeout,
580            retry.network_err,
581            retry.protocol_err,
582        );
583        Ok(None)
584    }
585
586    /// One sweep of the requested closest peers: fetch the closest peers
587    /// for `address` from the DHT and ask each for the chunk in turn,
588    /// returning on the first success.
589    async fn chunk_get_try_closest_peers(
590        &self,
591        address: &XorName,
592        peer_count: usize,
593    ) -> Result<CloseGroupOutcome> {
594        let peers = self.closest_peers(address, peer_count).await?;
595        let addr_hex = hex::encode(address);
596        let queried = peers.len();
597        let mut not_found = 0usize;
598        let mut timeout = 0usize;
599        let mut network_err = 0usize;
600        let mut protocol_err = 0usize;
601
602        for (peer, addrs) in &peers {
603            match self.chunk_get_from_peer(address, peer, addrs).await {
604                Ok(Some(chunk)) => {
605                    return Ok(CloseGroupOutcome {
606                        chunk: Some(chunk),
607                        queried,
608                        not_found,
609                        timeout,
610                        network_err,
611                        protocol_err,
612                    });
613                }
614                Ok(None) => {
615                    not_found += 1;
616                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
617                }
618                Err(Error::Timeout(_)) => {
619                    timeout += 1;
620                    debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
621                }
622                Err(Error::Network(_)) => {
623                    network_err += 1;
624                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
625                }
626                // A `Protocol` error here is the storer responding with
627                // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
628                // failed" from a peer that has a corrupted local copy.
629                // That's a per-peer problem, not a per-chunk one: the
630                // remaining peers might still have a clean copy, so
631                // continue the sweep rather than aborting it. Counted
632                // separately from network_err so the summary log still
633                // distinguishes "peer corrupted" from "peer unreachable".
634                Err(Error::Protocol(ref e)) => {
635                    protocol_err += 1;
636                    debug!(
637                        "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
638                    );
639                }
640                Err(e) => return Err(e),
641            }
642        }
643
644        Ok(CloseGroupOutcome {
645            chunk: None,
646            queried,
647            not_found,
648            timeout,
649            network_err,
650            protocol_err,
651        })
652    }
653
654    /// Retrieve a chunk from every peer in the close group.
655    ///
656    /// Unlike [`Client::chunk_get`], this method does not return early
657    /// after the first successful response. It returns one result per
658    /// close-group peer, sorted from closest XOR distance to furthest.
659    ///
660    /// # Errors
661    ///
662    /// Returns an error if the close-group lookup fails.
663    pub async fn chunk_get_from_close_group(
664        &self,
665        address: &XorName,
666    ) -> Result<Vec<ChunkPeerGetResult>> {
667        self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
668            .await
669    }
670
671    /// Retrieve a chunk from the requested number of closest peers.
672    ///
673    /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
674    /// not return early after the first successful response. It returns
675    /// one result per queried peer, sorted from closest XOR distance to
676    /// furthest.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if the DHT lookup fails.
681    pub async fn chunk_get_from_closest_peer_group(
682        &self,
683        address: &XorName,
684        peer_count: usize,
685    ) -> Result<Vec<ChunkPeerGetResult>> {
686        let peers = self.closest_peers(address, peer_count).await?;
687        let targets = chunk_peer_get_targets(peers, address);
688        let concurrency_limit =
689            diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
690        let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
691        let overall_timeout =
692            diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
693
694        let mut completed = vec![false; targets.len()];
695        let mut results = Vec::with_capacity(targets.len());
696        let mut get_results = stream::iter(targets.iter().cloned())
697            .map(|target| async move {
698                let chunk_result = self
699                    .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
700                    .await;
701
702                if let Ok(Some(chunk)) = &chunk_result {
703                    self.chunk_cache().put(chunk.address, chunk.content.clone());
704                }
705
706                (
707                    target.index,
708                    ChunkPeerGetResult {
709                        peer_id: target.peer_id,
710                        peer_addrs: target.peer_addrs,
711                        xor_distance: target.xor_distance,
712                        chunk_result,
713                    },
714                )
715            })
716            .buffer_unordered(concurrency_limit);
717
718        let collect_results = async {
719            while let Some((index, result)) = get_results.next().await {
720                completed[index] = true;
721                results.push(result);
722            }
723        };
724
725        if tokio::time::timeout(overall_timeout, collect_results)
726            .await
727            .is_err()
728        {
729            for target in &targets {
730                if !completed[target.index] {
731                    results.push(timed_out_chunk_peer_get_result(
732                        target,
733                        address,
734                        overall_timeout,
735                    ));
736                }
737            }
738        }
739
740        sort_chunk_peer_get_results(&mut results);
741        Ok(results)
742    }
743
744    /// Fetch a chunk from a specific peer.
745    async fn chunk_get_from_peer(
746        &self,
747        address: &XorName,
748        peer: &PeerId,
749        peer_addrs: &[MultiAddr],
750    ) -> Result<Option<DataChunk>> {
751        let node = self.network().node();
752        let request_id = self.next_request_id();
753        let request = ChunkGetRequest::new(*address);
754        let message = ChunkMessage {
755            request_id,
756            body: ChunkMessageBody::GetRequest(request),
757        };
758        let message_bytes = message
759            .encode()
760            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
761
762        let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
763        let addr_hex = hex::encode(address);
764        let timeout_secs = self.config().chunk_get_timeout_secs;
765
766        let start = Instant::now();
767        let result = send_and_await_chunk_response(
768            node,
769            peer,
770            message_bytes,
771            request_id,
772            timeout,
773            peer_addrs,
774            |body| match body {
775                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
776                    address: addr,
777                    content,
778                }) => {
779                    if addr != *address {
780                        return Some(Err(Error::InvalidData(format!(
781                            "Mismatched chunk address: expected {addr_hex}, got {}",
782                            hex::encode(addr)
783                        ))));
784                    }
785
786                    let computed = compute_address(&content);
787                    if computed != addr {
788                        return Some(Err(Error::InvalidData(format!(
789                            "Invalid chunk content: expected hash {addr_hex}, got {}",
790                            hex::encode(computed)
791                        ))));
792                    }
793
794                    debug!(
795                        "Retrieved chunk {} ({} bytes) from peer {peer}",
796                        hex::encode(addr),
797                        content.len()
798                    );
799                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
800                }
801                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
802                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
803                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
804                )),
805                _ => None,
806            },
807            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
808            || {
809                Error::Timeout(format!(
810                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
811                ))
812            },
813        )
814        .await;
815
816        let success = result.is_ok();
817        let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
818        record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
819
820        result
821    }
822
823    /// Check if a chunk exists on the network.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the network operation fails.
828    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
829        self.chunk_get(address).await.map(|opt| opt.is_some())
830    }
831
832    /// Finalize a single-chunk publish after an external signer has paid.
833    ///
834    /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
835    /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
836    /// `quote_hash -> tx_hash` map containing receipts for every non-zero
837    /// quote in the chunk's payment. Builds the `PaymentProof` and stores
838    /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
839    ///
840    /// Wave-batch payment shape only. Single-chunk publishes don't need
841    /// Merkle batching: one chunk's worth of quotes is well below the
842    /// wave-batch threshold.
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if the proof construction fails (e.g. missing
847    /// `tx_hash` for a non-zero quote) or if fewer than
848    /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
849    pub async fn finalize_chunk(
850        &self,
851        prepared: PreparedChunk,
852        tx_hash_map: &HashMap<QuoteHash, TxHash>,
853    ) -> Result<XorName> {
854        let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
855        // finalize_batch_payment returns one PaidChunk per PreparedChunk
856        // input; we passed exactly one. If that invariant is ever violated
857        // it's an upstream bug — fail loudly rather than silently address-0.
858        let chunk = paid.pop().ok_or_else(|| {
859            Error::Payment(
860                "finalize_batch_payment returned no paid chunks for a single \
861                 prepared chunk — internal invariant violated"
862                    .into(),
863            )
864        })?;
865        self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
866            .await
867    }
868}
869
870#[cfg(test)]
871mod tests {
872    use super::*;
873    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
874
875    /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
876    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
877    /// Sentinel byte used to represent an unknown/unrecognized proof tag.
878    const UNKNOWN_PROOF_TAG: u8 = 0xff;
879    /// XorName byte width used by test peer IDs and distances.
880    const TEST_XORNAME_BYTE_LEN: usize = 32;
881    /// Last byte position in the test XOR distance arrays.
882    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;
883
884    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
885        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
886        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
887
888        ChunkPeerGetResult {
889            peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
890            peer_addrs: Vec::new(),
891            xor_distance,
892            chunk_result: Ok(None),
893        }
894    }
895
896    #[test]
897    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
898        // Unanimous AND well-sampled: every queried peer of a full
899        // close group said NotFound. The only safe stop.
900        assert!(is_authoritative_not_found(7, 7));
901        // Unanimous with exactly a majority-sized sample is also
902        // authoritative.
903        assert!(is_authoritative_not_found(
904            CLOSE_GROUP_MAJORITY,
905            CLOSE_GROUP_MAJORITY
906        ));
907
908        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
909        // peers, all NotFound, is NOT authoritative — the real replica
910        // majority may sit entirely outside that narrow view. Must
911        // retry (re-walk the DHT).
912        assert!(!is_authoritative_not_found(1, 1));
913        assert!(!is_authoritative_not_found(3, 3));
914        assert!(!is_authoritative_not_found(
915            CLOSE_GROUP_MAJORITY - 1,
916            CLOSE_GROUP_MAJORITY - 1
917        ));
918
919        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
920        // timeout bucket. Must retry.
921        assert!(!is_authoritative_not_found(4, 7));
922        assert!(!is_authoritative_not_found(6, 7));
923
924        // Pure-reachability failure — must retry.
925        assert!(!is_authoritative_not_found(0, 7));
926
927        // Defensive: a zeroed outcome (e.g. the first attempt's
928        // close-group lookup errored) is never authoritative.
929        assert!(!is_authoritative_not_found(0, 0));
930    }
931
932    #[test]
933    fn chunk_get_outcome_classifies_each_result_kind() {
934        // Success: chunk_get returned a chunk, regardless of how many
935        // internal peer attempts it took.
936        let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
937        assert_eq!(
938            chunk_get_outcome(&Ok(Some(chunk))),
939            Outcome::Success,
940            "found-chunk must be Success",
941        );
942
943        // Ok(None): chunk_get exhausted the close group across first
944        // attempt + retry. This is the load-shedding signal — count it
945        // as Timeout so a sustained run of them on a saturated link
946        // shrinks the cap.
947        assert_eq!(
948            chunk_get_outcome(&Ok(None)),
949            Outcome::Timeout,
950            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
951        );
952
953        // Capacity signals from explicit error variants.
954        assert_eq!(
955            chunk_get_outcome(&Err(Error::Timeout("t".into()))),
956            Outcome::Timeout,
957        );
958        assert_eq!(
959            chunk_get_outcome(&Err(Error::Network("n".into()))),
960            Outcome::NetworkError,
961        );
962
963        // Unexpected error variant (e.g. Protocol) — propagates out of
964        // chunk_get to the caller and is not a capacity signal.
965        assert_eq!(
966            chunk_get_outcome(&Err(Error::Protocol("p".into()))),
967            Outcome::ApplicationError,
968        );
969    }
970
971    #[test]
972    fn single_node_proof_uses_store_response_timeout() {
973        let timeout =
974            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);
975
976        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
977    }
978
979    #[test]
980    fn unknown_proof_uses_store_response_timeout() {
981        let timeout =
982            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);
983
984        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
985    }
986
987    #[test]
988    fn merkle_proof_uses_configured_store_timeout() {
989        let timeout =
990            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);
991
992        assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
993    }
994
995    #[test]
996    fn chunk_peer_get_results_sort_by_xor_distance() {
997        let mut results = vec![
998            chunk_peer_get_result(3, 3),
999            chunk_peer_get_result(1, 1),
1000            chunk_peer_get_result(2, 2),
1001        ];
1002
1003        sort_chunk_peer_get_results(&mut results);
1004
1005        let ordered_distances = results
1006            .iter()
1007            .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
1008            .collect::<Vec<_>>();
1009        assert_eq!(ordered_distances, vec![1, 2, 3]);
1010    }
1011
1012    #[test]
1013    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
1014        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1015        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
1016        const TARGET_COUNT: usize = 7;
1017        const CONCURRENCY_LIMIT: usize = 7;
1018
1019        let timeout = diagnostic_peer_get_overall_timeout(
1020            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1021            TARGET_COUNT,
1022            CONCURRENCY_LIMIT,
1023        );
1024
1025        assert_eq!(
1026            timeout,
1027            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1028        );
1029    }
1030
1031    #[test]
1032    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
1033        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1034        const TARGET_COUNT: usize = 20;
1035        const CLOSE_GROUP_SIZE: usize = 7;
1036        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;
1037
1038        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
1039        let timeout = diagnostic_peer_get_overall_timeout(
1040            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1041            TARGET_COUNT,
1042            concurrency_limit,
1043        );
1044
1045        assert_eq!(
1046            timeout,
1047            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1048        );
1049    }
1050
1051    /// Regression: the default `merkle_store_timeout_secs` must be at
1052    /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus
1053    /// padding. If either side moves and this invariant breaks, the
1054    /// client will give up on chunks the storer is still verifying.
1055    /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the
1056    /// derivation.
1057    #[test]
1058    fn default_merkle_store_timeout_satisfies_storer_invariant() {
1059        use crate::data::client::ClientConfig;
1060        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
1061        const MIN_PADDING_SECS: u64 = 30;
1062        let config = ClientConfig::default();
1063        assert!(
1064            config.merkle_store_timeout_secs
1065                >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
1066            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
1067            config.merkle_store_timeout_secs,
1068            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
1069            MIN_PADDING_SECS,
1070        );
1071    }
1072
1073    /// Regression: the non-merkle PUT path uses the hardcoded
1074    /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config
1075    /// `merkle_store_timeout_secs`. If a future refactor accidentally
1076    /// routes non-merkle PUTs through the merkle field they'd inherit
1077    /// the 270 s value and silently regress non-merkle latency.
1078    /// `store_response_timeout_for_proof` with a non-merkle proof tag
1079    /// must return the const regardless of what merkle timeout is
1080    /// passed.
1081    #[test]
1082    fn non_merkle_put_ignores_merkle_timeout_value() {
1083        let absurd_merkle_timeout = 9_999;
1084        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
1085            let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
1086            assert_eq!(
1087                timeout, STORE_RESPONSE_TIMEOUT,
1088                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
1089            );
1090        }
1091    }
1092}