Skip to main content

ant_core/data/client/
chunk.rs

1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_xor_distance;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{QuoteHash, TxHash};
12use ant_protocol::transport::{MultiAddr, PeerId};
13use ant_protocol::{
14    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
15    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
16    ProofType, XorName, CLOSE_GROUP_MAJORITY,
17};
18use bytes::Bytes;
19use futures::stream::{self, FuturesUnordered, StreamExt};
20use std::collections::HashMap;
21use std::future::Future;
22use std::time::{Duration, Instant};
23use tracing::{debug, info, warn};
24
/// Data-type tag identifying a chunk; handed to `pay_for_storage` when
/// quotes are requested for a chunk upload.
const CHUNK_DATA_TYPE: u32 = 0;
27
28/// Result of one sweep over a chunk's close group.
29///
30/// Either we got the chunk from some peer, or every peer in the group
31/// returned NotFound, timed out, or hit a transport / protocol error.
32/// The counts feed the retry decision (`is_authoritative_not_found`):
33/// only a *unanimous* NotFound from a *well-sampled* close group counts
34/// as authoritative data absence — anything else (a non-unanimous
35/// result, or a thin/under-sampled DHT walk) leaves room for the actual
36/// storer to be in the timeout / network-error / protocol-error bucket
37/// or outside the sampled view, and is worth a retry against a freshly
38/// re-walked close group.
39struct CloseGroupOutcome {
40    chunk: Option<DataChunk>,
41    queried: usize,
42    not_found: usize,
43    timeout: usize,
44    network_err: usize,
45    /// Counts peers that responded with a remote `Error` (e.g.
46    /// "Chunk verification failed") or any other protocol-level error
47    /// that classifies as `Error::Protocol`. Treated the same as
48    /// `timeout` / `network_err` for retry decisions: one peer's bad
49    /// response must not abort the whole close-group sweep — the
50    /// remaining peers might still have a clean copy.
51    protocol_err: usize,
52}
53
54/// `true` if the close-group sweep is strong enough evidence to
55/// conclude the chunk is genuinely absent, so retrying is pointless.
56///
57/// Two conditions, both required:
58///
59/// 1. *Unanimous*: every peer we managed to query responded with an
60///    authoritative NotFound (`not_found == queried`). An earlier
61///    version used a majority quorum (`not_found >= close_group_size /
62///    2 + 1`), but production traffic disproved that: storage
63///    replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
64///    peers, so up to 3 peers legitimately don't store any given chunk
65///    and a `not_found=4 timeout=3` result is "3 storers we couldn't
66///    reach" plus "4 non-storers," not data loss.
67///
68/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
69///    queried. `close_group_peers` (via `find_closest_peers`) accepts
70///    any non-empty DHT result, so a thin/under-sampled walk can return
71///    1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
72///    authoritative — the real replica majority may sit entirely
73///    outside that narrow view. Requiring a majority-sized sample means
74///    a thin lookup falls through to the retry (which re-walks the DHT)
75///    instead of being declared a final absence.
76fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
77    queried >= CLOSE_GROUP_MAJORITY && not_found == queried
78}
79
/// How long to wait for a peer's store response when the PUT carries a
/// non-merkle payment proof (merkle proofs use a configurable timeout).
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
82
/// Number of additional per-peer-timeout "waves" granted on top of the
/// computed deadline for a diagnostic peer sweep.
const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
85
86/// Result of fetching one chunk address from one close-group peer.
87pub struct ChunkPeerGetResult {
88    /// Peer queried for the chunk.
89    pub peer_id: PeerId,
90    /// Known network addresses used for the peer.
91    pub peer_addrs: Vec<MultiAddr>,
92    /// XOR distance from `peer_id` to the chunk address.
93    pub xor_distance: [u8; 32],
94    /// Per-peer fetch result.
95    pub chunk_result: Result<Option<DataChunk>>,
96}
97
98#[derive(Clone)]
99struct ChunkPeerGetTarget {
100    index: usize,
101    peer_id: PeerId,
102    peer_addrs: Vec<MultiAddr>,
103    xor_distance: [u8; 32],
104}
105
106fn chunk_peer_get_targets(
107    peers: Vec<(PeerId, Vec<MultiAddr>)>,
108    address: &XorName,
109) -> Vec<ChunkPeerGetTarget> {
110    peers
111        .into_iter()
112        .enumerate()
113        .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
114            index,
115            peer_id,
116            peer_addrs,
117            xor_distance: peer_xor_distance(&peer_id, address),
118        })
119        .collect()
120}
121
122fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
123    results.sort_by_key(|result| result.xor_distance);
124}
125
/// Concurrency limit for a diagnostic per-peer GET sweep.
///
/// The limit is the requested `peer_count` capped at `close_group_size`,
/// and is never less than 1.
///
/// The lower bound matters because the result is handed to
/// `buffer_unordered`: a limit of 0 admits no future into the buffer, so
/// the sweep would make no progress and would only end when the overall
/// sweep timeout fires. Clamping here matches
/// `diagnostic_peer_get_overall_timeout`, which already treats a zero
/// limit as 1.
fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
    peer_count.min(close_group_size).max(1)
}
129
130fn diagnostic_peer_get_overall_timeout(
131    per_peer_timeout: Duration,
132    target_count: usize,
133    concurrency_limit: usize,
134) -> Duration {
135    let concurrency_limit = concurrency_limit.max(1);
136    let peer_get_waves = target_count.div_ceil(concurrency_limit);
137    let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
138    let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
139
140    per_peer_timeout.saturating_mul(timeout_waves)
141}
142
143fn timed_out_chunk_peer_get_result(
144    target: &ChunkPeerGetTarget,
145    address: &XorName,
146    timeout: Duration,
147) -> ChunkPeerGetResult {
148    let addr_hex = hex::encode(address);
149    let timeout_secs = timeout.as_secs();
150    ChunkPeerGetResult {
151        peer_id: target.peer_id,
152        peer_addrs: target.peer_addrs.clone(),
153        xor_distance: target.xor_distance,
154        chunk_result: Err(Error::Timeout(format!(
155            "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
156            target.peer_id
157        ))),
158    }
159}
160
161fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
162    match detect_proof_type(proof) {
163        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
164        _ => STORE_RESPONSE_TIMEOUT,
165    }
166}
167
168impl Client {
169    /// Run `chunk_get` and feed one byte-aware observation per call to
170    /// the adaptive fetch limiter. Use this from any consumer that
171    /// drives chunk-fetch concurrency from `controller().fetch.current()`
172    /// — the controller's window relies on every call along the hot
173    /// path producing an observation.
174    ///
175    /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
176    /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
177    /// sustained run of close-group exhaustions correctly drives the
178    /// cap down rather than silently inflating it.
179    pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
180        self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
181            .await
182    }
183
184    pub(crate) async fn chunk_get_observed_from_closest_peers(
185        &self,
186        address: &XorName,
187        peer_count: usize,
188    ) -> Result<Option<DataChunk>> {
189        let started = Instant::now();
190        let result = self.chunk_get_from_closest_peers(address, peer_count).await;
191        let latency = started.elapsed();
192        let bytes = result
193            .as_ref()
194            .ok()
195            .and_then(Option::as_ref)
196            .map_or(0, |chunk| chunk.content.len() as u64);
197        self.controller()
198            .fetch
199            .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
200        result
201    }
202}
203
204/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
205///
206/// This is the result-aware classifier used by the file-download paths.
207/// It differs from `classify_error` in one critical way: an `Ok(None)`
208/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
209/// the time `chunk_get` returns `Ok(None)` it has already exhausted
210/// the close group across its first attempt + retry sweep, so
211/// `Ok(None)` is the controller's load-shedding signal — a sustained
212/// run of them on a saturated home link is exactly the case where the
213/// cap should shrink.
214///
215/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
216/// internal peer attempts the chunk_get had to make. The controller
217/// does not need to see internal peer noise; that's noise about the
218/// production network's natural peer-side variability, not about the
219/// client's effective capacity.
220pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
221    match result {
222        Ok(Some(_)) => Outcome::Success,
223        Ok(None) => Outcome::Timeout,
224        Err(Error::Timeout(_)) => Outcome::Timeout,
225        Err(Error::Network(_)) => Outcome::NetworkError,
226        Err(_) => Outcome::ApplicationError,
227    }
228}
229
230impl Client {
231    /// Store a chunk on the Autonomi network with payment.
232    ///
233    /// Checks if the chunk already exists before paying. If it does,
234    /// returns the address immediately without incurring on-chain costs.
235    /// Otherwise collects quotes, pays on-chain, then stores with proof
236    /// to `CLOSE_GROUP_MAJORITY` peers.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if payment or the network operation fails.
241    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
242        let address = compute_address(&content);
243        let data_size = u64::try_from(content.len())
244            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
245
246        match self
247            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
248            .await
249        {
250            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
251            Err(Error::AlreadyStored) => {
252                debug!(
253                    "Chunk {} already stored on network, skipping payment",
254                    hex::encode(address)
255                );
256                Ok(address)
257            }
258            Err(e) => Err(e),
259        }
260    }
261
262    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
263    ///
264    /// Initially sends the PUT concurrently to the first
265    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
266    /// remaining peers in the quoted set until majority is reached or
267    /// all peers are exhausted.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
272    /// the chunk.
273    pub(crate) async fn chunk_put_to_close_group(
274        &self,
275        content: Bytes,
276        proof: Vec<u8>,
277        peers: &[(PeerId, Vec<MultiAddr>)],
278    ) -> Result<XorName> {
279        let address = compute_address(&content);
280
281        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
282        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
283
284        let mut put_futures = FuturesUnordered::new();
285        for (peer_id, addrs) in initial_peers {
286            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
287        }
288
289        let mut success_count = 0usize;
290        let mut failures: Vec<String> = Vec::new();
291        // Distinguish the *cause* of a quorum shortfall so it feeds the
292        // store AIMD limiter correctly (V2-468). If every failure was a
293        // structured remote application rejection (`Error::RemotePut` — the
294        // node responded and declined: pool-rejected / quote-stale /
295        // disk-full), the shortfall is not evidence the client is sending
296        // too fast and must not push the limiter down. Anything else
297        // (transport failure, or a different error) keeps it a real
298        // capacity signal. Hold the first remote rejection as the
299        // representative reason to surface when the shortfall is app-only.
300        let mut had_non_rejection_failure = false;
301        let mut first_remote_rejection: Option<Error> = None;
302        let mut fallback_iter = fallback_peers.iter();
303
304        while let Some((peer_id, result)) = put_futures.next().await {
305            match result {
306                Ok(_) => {
307                    success_count += 1;
308                    if success_count >= CLOSE_GROUP_MAJORITY {
309                        debug!(
310                            "Chunk {} stored on {success_count} peers (majority reached)",
311                            hex::encode(address)
312                        );
313                        return Ok(address);
314                    }
315                }
316                Err(e) => {
317                    warn!("Failed to store chunk on {peer_id}: {e}");
318                    failures.push(format!("{peer_id}: {e}"));
319                    if matches!(e, Error::RemotePut { .. }) {
320                        if first_remote_rejection.is_none() {
321                            first_remote_rejection = Some(e);
322                        }
323                    } else {
324                        had_non_rejection_failure = true;
325                    }
326
327                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
328                        debug!(
329                            "Falling back to peer {fb_peer} for chunk {}",
330                            hex::encode(address)
331                        );
332                        put_futures.push(self.spawn_chunk_put(
333                            content.clone(),
334                            proof.clone(),
335                            fb_peer,
336                            fb_addrs,
337                        ));
338                    }
339                }
340            }
341        }
342
343        // Quorum not reached. If the only failures were structured remote
344        // rejections, surface a representative `RemotePut` (classifies
345        // `ApplicationError`, still recoverable in the merkle retry path)
346        // so the shortfall doesn't suppress the store limiter. Otherwise
347        // it's a real capacity shortfall.
348        if !had_non_rejection_failure {
349            if let Some(remote_rejection) = first_remote_rejection {
350                return Err(remote_rejection);
351            }
352        }
353
354        Err(Error::InsufficientPeers(format!(
355            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
356            failures.join("; ")
357        )))
358    }
359
360    /// Spawn a chunk PUT future for a single peer.
361    fn spawn_chunk_put<'a>(
362        &'a self,
363        content: Bytes,
364        proof: Vec<u8>,
365        peer_id: &'a PeerId,
366        addrs: &'a [MultiAddr],
367    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
368        let peer_id_owned = *peer_id;
369        async move {
370            let result = self
371                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
372                .await;
373            (peer_id_owned, result)
374        }
375    }
376
377    /// Store a chunk on the Autonomi network with a pre-built payment proof.
378    ///
379    /// Sends to a single peer. Callers that need replication across the
380    /// close group should use `chunk_put_to_close_group` instead.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the network operation fails.
385    pub async fn chunk_put_with_proof(
386        &self,
387        content: Bytes,
388        proof: Vec<u8>,
389        target_peer: &PeerId,
390        peer_addrs: &[MultiAddr],
391    ) -> Result<XorName> {
392        let address = compute_address(&content);
393        let node = self.network().node();
394        let timeout =
395            store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
396        let timeout_secs = timeout.as_secs();
397
398        let request_id = self.next_request_id();
399        // `content` is a refcounted `Bytes` shared with the sibling
400        // close-group sends; pass it through directly so each peer shares
401        // the same backing buffer instead of deep-copying the 4 MB payload.
402        let request = ChunkPutRequest::with_payment(address, content, proof);
403        let message = ChunkMessage {
404            request_id,
405            body: ChunkMessageBody::PutRequest(request),
406        };
407        let message_bytes = message
408            .encode()
409            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
410
411        let addr_hex = hex::encode(address);
412
413        let result = send_and_await_chunk_response(
414            node,
415            target_peer,
416            message_bytes,
417            request_id,
418            timeout,
419            peer_addrs,
420            |body| match body {
421                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
422                    debug!("Chunk stored at {}", hex::encode(addr));
423                    Some(Ok(addr))
424                }
425                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
426                    address: addr,
427                }) => {
428                    debug!("Chunk already exists at {}", hex::encode(addr));
429                    Some(Ok(addr))
430                }
431                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
432                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
433                }
434                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
435                    // Preserve the structured remote reason instead of
436                    // flattening it into `Error::Protocol`. The node
437                    // responded, so the transport round-trip succeeded —
438                    // this is an application-level rejection and must not
439                    // suppress the store AIMD limiter (V2-468).
440                    Some(Err(Error::RemotePut {
441                        address: addr_hex.clone(),
442                        source: e,
443                    }))
444                }
445                _ => None,
446            },
447            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
448            || {
449                Error::Timeout(format!(
450                    "Timeout waiting for store response after {timeout_secs}s"
451                ))
452            },
453        )
454        .await;
455
456        result
457    }
458
459    /// Retrieve a chunk from the Autonomi network.
460    ///
461    /// Queries all peers in the close group for the chunk address,
462    /// returning the first successful response. This handles the case
463    /// where the storing peer differs from the first peer returned by
464    /// DHT routing.
465    ///
466    /// ## Adaptive controller feedback
467    ///
468    /// Each per-peer GET attempt is fed individually to the adaptive
469    /// fetch limiter via `controller().fetch.observe(...)`. This is
470    /// deliberately finer-grained than wrapping the outer `chunk_get`
471    /// with `observe_op`: when a chunk takes 6 peer tries to land,
472    /// 5 of them are real capacity signals (timeouts / network errors)
473    /// that should pull the cap down even if the chunk eventually
474    /// succeeds. The outer `Ok(_)` would mask all five as a single
475    /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
476    /// classification rules used below.
477    ///
478    /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the network operation fails.
483    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
484        self.chunk_get_from_closest_peers(address, self.config().close_group_size)
485            .await
486    }
487
488    /// Retrieve a chunk from the requested number of closest peers.
489    ///
490    /// Queries peers in XOR-distance order for the chunk address,
491    /// returning the first successful response. This handles the case
492    /// where the storing peer differs from the first peer returned by
493    /// DHT routing.
494    ///
495    /// # Errors
496    ///
497    /// Returns an error if the network operation fails.
498    pub async fn chunk_get_from_closest_peers(
499        &self,
500        address: &XorName,
501        peer_count: usize,
502    ) -> Result<Option<DataChunk>> {
503        // Check cache first, with integrity verification.
504        if let Some(cached) = self.chunk_cache().get(address) {
505            let computed = compute_address(&cached);
506            if computed == *address {
507                debug!("Cache hit for chunk {}", hex::encode(address));
508                return Ok(Some(DataChunk::new(*address, cached)));
509            }
510            // Cache entry corrupted — evict and fall through to network fetch.
511            debug!(
512                "Cache corruption detected for {}: evicting",
513                hex::encode(address)
514            );
515            self.chunk_cache().remove(address);
516        }
517
518        let addr_hex = hex::encode(address);
519
520        // First attempt against the current close-group view. A
521        // lookup/transport error here (e.g. close_group_peers' DHT walk
522        // momentarily returning an error, or InsufficientPeers from a
523        // thin routing table) is NOT fatal: fall through to the retry
524        // path exactly as a non-authoritative miss would. Otherwise one
525        // transient error on the *initial* close-group walk for a single
526        // chunk would fail an entire multi-hundred-chunk download. A
527        // zeroed outcome (queried=0) is never authoritative, so it flows
528        // straight to the retry below.
529        let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
530            Ok(outcome) => outcome,
531            Err(e) => {
532                info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
533                CloseGroupOutcome {
534                    chunk: None,
535                    queried: 0,
536                    not_found: 0,
537                    timeout: 0,
538                    network_err: 0,
539                    protocol_err: 0,
540                }
541            }
542        };
543        if let Some(chunk) = first.chunk {
544            self.chunk_cache().put(chunk.address, chunk.content.clone());
545            return Ok(Some(chunk));
546        }
547
548        // Only treat as authoritative absence when *every* queried peer
549        // responded NotFound. Anything less leaves the actual storer
550        // possibly in the timeout / network-error bucket, which a retry
551        // could reach.
552        if is_authoritative_not_found(first.not_found, first.queried) {
553            info!(
554                "chunk_get giving up on {addr_hex} (unanimous NotFound): \
555                 queried={} not_found={} timeout={} network_err={} protocol_err={}",
556                first.queried,
557                first.not_found,
558                first.timeout,
559                first.network_err,
560                first.protocol_err,
561            );
562            return Ok(None);
563        }
564
565        // Otherwise the failure looks like reachability (most peers timed out
566        // or hit transport errors). The chunk is most likely still on the
567        // network but the current close-group view either (a) caught a
568        // transient transport blip or (b) converged on the wrong neighbourhood
569        // because the routing table is thin. One retry against a freshly
570        // re-walked close group is the cheapest defence against both.
571        info!(
572            "chunk_get retrying {addr_hex} after reachability failure: \
573             queried={} not_found={} timeout={} network_err={} protocol_err={}",
574            first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
575        );
576
577        // Brief settle so any in-flight transport state can quiesce before
578        // we re-walk the DHT. Keep this small so we don't add meaningful
579        // latency to the genuinely-lost case (we already paid for one full
580        // close-group sweep before getting here).
581        tokio::time::sleep(Duration::from_secs(1)).await;
582
583        // If the retry's DHT lookup itself fails, treat that as "still
584        // couldn't find" rather than escalating the error — matches the
585        // semantics of the first attempt when peers are unreachable.
586        let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
587            Ok(o) => o,
588            Err(e) => {
589                info!(
590                    "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
591                     first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
592                    first.queried,
593                    first.not_found,
594                    first.timeout,
595                    first.network_err,
596                    first.protocol_err,
597                );
598                return Ok(None);
599            }
600        };
601        if let Some(chunk) = retry.chunk {
602            info!("chunk_get retry succeeded for {addr_hex}");
603            self.chunk_cache().put(chunk.address, chunk.content.clone());
604            return Ok(Some(chunk));
605        }
606
607        info!(
608            "chunk_get exhausted close group after retry for {addr_hex}: \
609             first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
610             retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
611            first.queried,
612            first.not_found,
613            first.timeout,
614            first.network_err,
615            first.protocol_err,
616            retry.queried,
617            retry.not_found,
618            retry.timeout,
619            retry.network_err,
620            retry.protocol_err,
621        );
622        Ok(None)
623    }
624
625    /// One sweep of the requested closest peers: fetch the closest peers
626    /// for `address` from the DHT and ask each for the chunk in turn,
627    /// returning on the first success.
628    async fn chunk_get_try_closest_peers(
629        &self,
630        address: &XorName,
631        peer_count: usize,
632    ) -> Result<CloseGroupOutcome> {
633        let peers = self.closest_peers(address, peer_count).await?;
634        let addr_hex = hex::encode(address);
635        let queried = peers.len();
636        let mut not_found = 0usize;
637        let mut timeout = 0usize;
638        let mut network_err = 0usize;
639        let mut protocol_err = 0usize;
640
641        for (peer, addrs) in &peers {
642            match self.chunk_get_from_peer(address, peer, addrs).await {
643                Ok(Some(chunk)) => {
644                    return Ok(CloseGroupOutcome {
645                        chunk: Some(chunk),
646                        queried,
647                        not_found,
648                        timeout,
649                        network_err,
650                        protocol_err,
651                    });
652                }
653                Ok(None) => {
654                    not_found += 1;
655                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
656                }
657                Err(Error::Timeout(_)) => {
658                    timeout += 1;
659                    debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
660                }
661                Err(Error::Network(_)) => {
662                    network_err += 1;
663                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
664                }
665                // A `Protocol` error here is the storer responding with
666                // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
667                // failed" from a peer that has a corrupted local copy.
668                // That's a per-peer problem, not a per-chunk one: the
669                // remaining peers might still have a clean copy, so
670                // continue the sweep rather than aborting it. Counted
671                // separately from network_err so the summary log still
672                // distinguishes "peer corrupted" from "peer unreachable".
673                Err(Error::Protocol(ref e)) => {
674                    protocol_err += 1;
675                    debug!(
676                        "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
677                    );
678                }
679                Err(e) => return Err(e),
680            }
681        }
682
683        Ok(CloseGroupOutcome {
684            chunk: None,
685            queried,
686            not_found,
687            timeout,
688            network_err,
689            protocol_err,
690        })
691    }
692
693    /// Retrieve a chunk from every peer in the close group.
694    ///
695    /// Unlike [`Client::chunk_get`], this method does not return early
696    /// after the first successful response. It returns one result per
697    /// close-group peer, sorted from closest XOR distance to furthest.
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if the close-group lookup fails.
702    pub async fn chunk_get_from_close_group(
703        &self,
704        address: &XorName,
705    ) -> Result<Vec<ChunkPeerGetResult>> {
706        self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
707            .await
708    }
709
710    /// Retrieve a chunk from the requested number of closest peers.
711    ///
712    /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
713    /// not return early after the first successful response. It returns
714    /// one result per queried peer, sorted from closest XOR distance to
715    /// furthest.
716    ///
717    /// # Errors
718    ///
719    /// Returns an error if the DHT lookup fails.
720    pub async fn chunk_get_from_closest_peer_group(
721        &self,
722        address: &XorName,
723        peer_count: usize,
724    ) -> Result<Vec<ChunkPeerGetResult>> {
725        let peers = self.closest_peers(address, peer_count).await?;
726        let targets = chunk_peer_get_targets(peers, address);
727        let concurrency_limit =
728            diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
729        let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
730        let overall_timeout =
731            diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
732
733        let mut completed = vec![false; targets.len()];
734        let mut results = Vec::with_capacity(targets.len());
735        let mut get_results = stream::iter(targets.iter().cloned())
736            .map(|target| async move {
737                let chunk_result = self
738                    .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
739                    .await;
740
741                if let Ok(Some(chunk)) = &chunk_result {
742                    self.chunk_cache().put(chunk.address, chunk.content.clone());
743                }
744
745                (
746                    target.index,
747                    ChunkPeerGetResult {
748                        peer_id: target.peer_id,
749                        peer_addrs: target.peer_addrs,
750                        xor_distance: target.xor_distance,
751                        chunk_result,
752                    },
753                )
754            })
755            .buffer_unordered(concurrency_limit);
756
757        let collect_results = async {
758            while let Some((index, result)) = get_results.next().await {
759                completed[index] = true;
760                results.push(result);
761            }
762        };
763
764        if tokio::time::timeout(overall_timeout, collect_results)
765            .await
766            .is_err()
767        {
768            for target in &targets {
769                if !completed[target.index] {
770                    results.push(timed_out_chunk_peer_get_result(
771                        target,
772                        address,
773                        overall_timeout,
774                    ));
775                }
776            }
777        }
778
779        sort_chunk_peer_get_results(&mut results);
780        Ok(results)
781    }
782
783    /// Fetch a chunk from a specific peer.
784    async fn chunk_get_from_peer(
785        &self,
786        address: &XorName,
787        peer: &PeerId,
788        peer_addrs: &[MultiAddr],
789    ) -> Result<Option<DataChunk>> {
790        let node = self.network().node();
791        let request_id = self.next_request_id();
792        let request = ChunkGetRequest::new(*address);
793        let message = ChunkMessage {
794            request_id,
795            body: ChunkMessageBody::GetRequest(request),
796        };
797        let message_bytes = message
798            .encode()
799            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
800
801        let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
802        let addr_hex = hex::encode(address);
803        let timeout_secs = self.config().chunk_get_timeout_secs;
804
805        let result = send_and_await_chunk_response(
806            node,
807            peer,
808            message_bytes,
809            request_id,
810            timeout,
811            peer_addrs,
812            |body| match body {
813                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
814                    address: addr,
815                    content,
816                }) => {
817                    if addr != *address {
818                        return Some(Err(Error::InvalidData(format!(
819                            "Mismatched chunk address: expected {addr_hex}, got {}",
820                            hex::encode(addr)
821                        ))));
822                    }
823
824                    let computed = compute_address(&content);
825                    if computed != addr {
826                        return Some(Err(Error::InvalidData(format!(
827                            "Invalid chunk content: expected hash {addr_hex}, got {}",
828                            hex::encode(computed)
829                        ))));
830                    }
831
832                    debug!(
833                        "Retrieved chunk {} ({} bytes) from peer {peer}",
834                        hex::encode(addr),
835                        content.len()
836                    );
837                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
838                }
839                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
840                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
841                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
842                )),
843                _ => None,
844            },
845            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
846            || {
847                Error::Timeout(format!(
848                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
849                ))
850            },
851        )
852        .await;
853
854        result
855    }
856
857    /// Check if a chunk exists on the network.
858    ///
859    /// # Errors
860    ///
861    /// Returns an error if the network operation fails.
862    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
863        self.chunk_get(address).await.map(|opt| opt.is_some())
864    }
865
866    /// Finalize a single-chunk publish after an external signer has paid.
867    ///
868    /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
869    /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
870    /// `quote_hash -> tx_hash` map containing receipts for every non-zero
871    /// quote in the chunk's payment. Builds the `PaymentProof` and stores
872    /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
873    ///
874    /// Wave-batch payment shape only. Single-chunk publishes don't need
875    /// Merkle batching: one chunk's worth of quotes is well below the
876    /// wave-batch threshold.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the proof construction fails (e.g. missing
881    /// `tx_hash` for a non-zero quote) or if fewer than
882    /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
883    pub async fn finalize_chunk(
884        &self,
885        prepared: PreparedChunk,
886        tx_hash_map: &HashMap<QuoteHash, TxHash>,
887    ) -> Result<XorName> {
888        let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
889        // finalize_batch_payment returns one PaidChunk per PreparedChunk
890        // input; we passed exactly one. If that invariant is ever violated
891        // it's an upstream bug — fail loudly rather than silently address-0.
892        let chunk = paid.pop().ok_or_else(|| {
893            Error::Payment(
894                "finalize_batch_payment returned no paid chunks for a single \
895                 prepared chunk — internal invariant violated"
896                    .into(),
897            )
898        })?;
899        self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
900            .await
901    }
902}
903
904#[cfg(test)]
905mod tests {
906    use super::*;
907    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
908
909    /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
910    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
911    /// Sentinel byte used to represent an unknown/unrecognized proof tag.
912    const UNKNOWN_PROOF_TAG: u8 = 0xff;
913    /// XorName byte width used by test peer IDs and distances.
914    const TEST_XORNAME_BYTE_LEN: usize = 32;
915    /// Last byte position in the test XOR distance arrays.
916    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;
917
918    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
919        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
920        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
921
922        ChunkPeerGetResult {
923            peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
924            peer_addrs: Vec::new(),
925            xor_distance,
926            chunk_result: Ok(None),
927        }
928    }
929
930    #[test]
931    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
932        // Unanimous AND well-sampled: every queried peer of a full
933        // close group said NotFound. The only safe stop.
934        assert!(is_authoritative_not_found(7, 7));
935        // Unanimous with exactly a majority-sized sample is also
936        // authoritative.
937        assert!(is_authoritative_not_found(
938            CLOSE_GROUP_MAJORITY,
939            CLOSE_GROUP_MAJORITY
940        ));
941
942        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
943        // peers, all NotFound, is NOT authoritative — the real replica
944        // majority may sit entirely outside that narrow view. Must
945        // retry (re-walk the DHT).
946        assert!(!is_authoritative_not_found(1, 1));
947        assert!(!is_authoritative_not_found(3, 3));
948        assert!(!is_authoritative_not_found(
949            CLOSE_GROUP_MAJORITY - 1,
950            CLOSE_GROUP_MAJORITY - 1
951        ));
952
953        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
954        // timeout bucket. Must retry.
955        assert!(!is_authoritative_not_found(4, 7));
956        assert!(!is_authoritative_not_found(6, 7));
957
958        // Pure-reachability failure — must retry.
959        assert!(!is_authoritative_not_found(0, 7));
960
961        // Defensive: a zeroed outcome (e.g. the first attempt's
962        // close-group lookup errored) is never authoritative.
963        assert!(!is_authoritative_not_found(0, 0));
964    }
965
966    #[test]
967    fn chunk_get_outcome_classifies_each_result_kind() {
968        // Success: chunk_get returned a chunk, regardless of how many
969        // internal peer attempts it took.
970        let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
971        assert_eq!(
972            chunk_get_outcome(&Ok(Some(chunk))),
973            Outcome::Success,
974            "found-chunk must be Success",
975        );
976
977        // Ok(None): chunk_get exhausted the close group across first
978        // attempt + retry. This is the load-shedding signal — count it
979        // as Timeout so a sustained run of them on a saturated link
980        // shrinks the cap.
981        assert_eq!(
982            chunk_get_outcome(&Ok(None)),
983            Outcome::Timeout,
984            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
985        );
986
987        // Capacity signals from explicit error variants.
988        assert_eq!(
989            chunk_get_outcome(&Err(Error::Timeout("t".into()))),
990            Outcome::Timeout,
991        );
992        assert_eq!(
993            chunk_get_outcome(&Err(Error::Network("n".into()))),
994            Outcome::NetworkError,
995        );
996
997        // Unexpected error variant (e.g. Protocol) — propagates out of
998        // chunk_get to the caller and is not a capacity signal.
999        assert_eq!(
1000            chunk_get_outcome(&Err(Error::Protocol("p".into()))),
1001            Outcome::ApplicationError,
1002        );
1003    }
1004
1005    #[test]
1006    fn single_node_proof_uses_store_response_timeout() {
1007        let timeout =
1008            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);
1009
1010        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1011    }
1012
1013    #[test]
1014    fn unknown_proof_uses_store_response_timeout() {
1015        let timeout =
1016            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);
1017
1018        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1019    }
1020
1021    #[test]
1022    fn merkle_proof_uses_configured_store_timeout() {
1023        let timeout =
1024            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);
1025
1026        assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
1027    }
1028
1029    #[test]
1030    fn chunk_peer_get_results_sort_by_xor_distance() {
1031        let mut results = vec![
1032            chunk_peer_get_result(3, 3),
1033            chunk_peer_get_result(1, 1),
1034            chunk_peer_get_result(2, 2),
1035        ];
1036
1037        sort_chunk_peer_get_results(&mut results);
1038
1039        let ordered_distances = results
1040            .iter()
1041            .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
1042            .collect::<Vec<_>>();
1043        assert_eq!(ordered_distances, vec![1, 2, 3]);
1044    }
1045
1046    #[test]
1047    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
1048        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1049        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
1050        const TARGET_COUNT: usize = 7;
1051        const CONCURRENCY_LIMIT: usize = 7;
1052
1053        let timeout = diagnostic_peer_get_overall_timeout(
1054            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1055            TARGET_COUNT,
1056            CONCURRENCY_LIMIT,
1057        );
1058
1059        assert_eq!(
1060            timeout,
1061            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1062        );
1063    }
1064
1065    #[test]
1066    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
1067        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1068        const TARGET_COUNT: usize = 20;
1069        const CLOSE_GROUP_SIZE: usize = 7;
1070        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;
1071
1072        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
1073        let timeout = diagnostic_peer_get_overall_timeout(
1074            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1075            TARGET_COUNT,
1076            concurrency_limit,
1077        );
1078
1079        assert_eq!(
1080            timeout,
1081            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1082        );
1083    }
1084
1085    /// Regression: the default `merkle_store_timeout_secs` must be at
1086    /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus
1087    /// padding. If either side moves and this invariant breaks, the
1088    /// client will give up on chunks the storer is still verifying.
1089    /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the
1090    /// derivation.
1091    #[test]
1092    fn default_merkle_store_timeout_satisfies_storer_invariant() {
1093        use crate::data::client::ClientConfig;
1094        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
1095        const MIN_PADDING_SECS: u64 = 30;
1096        let config = ClientConfig::default();
1097        assert!(
1098            config.merkle_store_timeout_secs
1099                >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
1100            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
1101            config.merkle_store_timeout_secs,
1102            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
1103            MIN_PADDING_SECS,
1104        );
1105    }
1106
1107    /// Regression: the non-merkle PUT path uses the hardcoded
1108    /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config
1109    /// `merkle_store_timeout_secs`. If a future refactor accidentally
1110    /// routes non-merkle PUTs through the merkle field they'd inherit
1111    /// the 270 s value and silently regress non-merkle latency.
1112    /// `store_response_timeout_for_proof` with a non-merkle proof tag
1113    /// must return the const regardless of what merkle timeout is
1114    /// passed.
1115    #[test]
1116    fn non_merkle_put_ignores_merkle_timeout_value() {
1117        let absurd_merkle_timeout = 9_999;
1118        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
1119            let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
1120            assert_eq!(
1121                timeout, STORE_RESPONSE_TIMEOUT,
1122                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
1123            );
1124        }
1125    }
1126}