Skip to main content

ant_core/data/client/
chunk.rs

1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_xor_distance;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{QuoteHash, TxHash};
12use ant_protocol::transport::{MultiAddr, PeerId};
13use ant_protocol::{
14    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
15    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
16    ProofType, ProtocolError, XorName, CLOSE_GROUP_MAJORITY,
17};
18use bytes::Bytes;
19use futures::stream::{self, FuturesUnordered, StreamExt};
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22use tracing::{debug, info, warn};
23
24/// Data type identifier for chunks (used in quote requests).
25const CHUNK_DATA_TYPE: u32 = 0;
26
27/// Why a single-peer PUT was declined. Drives the surfaced aggregate error
28/// and keeps the store AIMD limiter honest — only genuine local backpressure
29/// (a PUT-response `Timeout`) is a "client is sending too fast" signal; a node
30/// that responds with a structured rejection is an application-level decline
31/// (ADR-0002 / V2-468), and a bare dial/relay failure is remote peer churn,
32/// not local capacity (V2-554).
33#[derive(Clone, Copy)]
34enum PutRejection {
35    /// Node is out of storage (`ProtocolError::StorageFailed`) — try a
36    /// further peer.
37    Full,
38    /// Payment did not clear the node's local price floor, or the proof's
39    /// issuers are not close enough in this peer's view
40    /// (`ProtocolError::PaymentFailed`), or the node asked for more than was
41    /// paid (`ChunkPutResponse::PaymentRequired` → [`Error::Payment`]) — skip
42    /// this peer, do not re-quote.
43    PriceFloor,
44    /// Some other structured remote rejection.
45    OtherRemote,
46    /// The peer accepted the connection but did not answer the PUT within the
47    /// deadline (`Error::Timeout`). This is the genuine local-backpressure
48    /// signal: under real congestion the client's own requests time out, so a
49    /// shortfall carrying any timeout stays a capacity signal (V2-554).
50    Timeout,
51    /// A dial/relay/transport failure — the peer could not be reached at all
52    /// (`Error::Network` and other non-response errors), typically a dead or
53    /// stale relayed DHT address. This is remote peer churn, not local
54    /// backpressure, so a shortfall made up purely of these must not push the
55    /// store AIMD limiter down (V2-554).
56    Dial,
57}
58
59/// Classify a failed single-peer PUT (ADR-0002 / V2-468 / V2-554). A
60/// `RemotePut` carries the node's structured `ProtocolError`; a
61/// `PaymentRequired` response surfaces as [`Error::Payment`]; a
62/// [`Error::Timeout`] is genuine local backpressure; anything else is a
63/// dial/relay failure (remote churn).
64fn classify_put_failure(error: &Error) -> PutRejection {
65    match error {
66        Error::RemotePut { source, .. } => match source {
67            ProtocolError::StorageFailed(_) => PutRejection::Full,
68            ProtocolError::PaymentFailed(_) => PutRejection::PriceFloor,
69            _ => PutRejection::OtherRemote,
70        },
71        // A `PaymentRequired` PUT response (the node wants more than was paid)
72        // arrives as `Error::Payment`. It is a structured application-level
73        // decline — skip the peer and advance fallback, exactly like a
74        // price-floor `PaymentFailed` — not a transport shortfall, so it must
75        // not push the store AIMD limiter down (ADR-0002 / V2-468).
76        Error::Payment(_) => PutRejection::PriceFloor,
77        // The peer did not answer in time: genuine local backpressure.
78        Error::Timeout(_) => PutRejection::Timeout,
79        // Could not reach the peer at all: dial/relay churn (remote), not a
80        // local-capacity signal.
81        _ => PutRejection::Dial,
82    }
83}
84
85/// Decide the error for a close-group store that fell short of quorum.
86///
87/// Only genuine local backpressure should push the store AIMD limiter down.
88/// The failure mix decides which signal to surface:
89///
90/// 1. **Any PUT-response timeout** (`timeout > 0`): the client's own requests
91///    are timing out — genuine local congestion. Surface `InsufficientPeers`
92///    (classified `NetworkError`) so the limiter still backs off (V2-554).
93/// 2. **No timeouts, no dial failures** — every failure was an application
94///    decline (full / price-floor / `PaymentRequired` / other remote
95///    rejection): surface the representative application error so the shortfall
96///    classifies `ApplicationError` and does not suppress the limiter
97///    (ADR-0002 / V2-468).
98/// 3. **No timeouts, but dial/relay failures present**: the shortfall is
99///    close-group dial churn (dead/stale relayed peer addresses) — remote peer
100///    churn, not local capacity. Surface [`Error::CloseGroupShortfall`]
101///    (classified `ApplicationError`) so it does NOT push the limiter down
102///    (V2-554). Still recoverable/retryable.
103fn put_shortfall_error(
104    timeout: usize,
105    dial: usize,
106    first_app_rejection: Option<Error>,
107    shortfall_message: String,
108) -> Error {
109    if timeout > 0 {
110        return Error::InsufficientPeers(shortfall_message);
111    }
112    if dial == 0 {
113        if let Some(app_rejection) = first_app_rejection {
114            return app_rejection;
115        }
116    }
117    Error::CloseGroupShortfall(shortfall_message)
118}
119
120/// Result of one sweep over a chunk's close group.
121///
122/// Either we got the chunk from some peer, or every peer in the group
123/// returned NotFound, timed out, or hit a transport / protocol error.
124/// The counts feed the retry decision (`is_authoritative_not_found`):
125/// only a *unanimous* NotFound from a *well-sampled* close group counts
126/// as authoritative data absence — anything else (a non-unanimous
127/// result, or a thin/under-sampled DHT walk) leaves room for the actual
128/// storer to be in the timeout / network-error / protocol-error bucket
129/// or outside the sampled view, and is worth a retry against a freshly
130/// re-walked close group.
131struct CloseGroupOutcome {
132    chunk: Option<DataChunk>,
133    queried: usize,
134    not_found: usize,
135    timeout: usize,
136    network_err: usize,
137    /// Counts peers that responded with a remote `Error` (e.g.
138    /// "Chunk verification failed") or any other protocol-level error
139    /// that classifies as `Error::Protocol`. Treated the same as
140    /// `timeout` / `network_err` for retry decisions: one peer's bad
141    /// response must not abort the whole close-group sweep — the
142    /// remaining peers might still have a clean copy.
143    protocol_err: usize,
144}
145
146/// `true` if the close-group sweep is strong enough evidence to
147/// conclude the chunk is genuinely absent, so retrying is pointless.
148///
149/// Two conditions, both required:
150///
151/// 1. *Unanimous*: every peer we managed to query responded with an
152///    authoritative NotFound (`not_found == queried`). An earlier
153///    version used a majority quorum (`not_found >= close_group_size /
154///    2 + 1`), but production traffic disproved that: storage
155///    replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
156///    peers, so up to 3 peers legitimately don't store any given chunk
157///    and a `not_found=4 timeout=3` result is "3 storers we couldn't
158///    reach" plus "4 non-storers," not data loss.
159///
160/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
161///    queried. `closest_peers` (via `find_closest_peers`) accepts
162///    any non-empty DHT result, so a thin/under-sampled walk can return
163///    1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
164///    authoritative — the real replica majority may sit entirely
165///    outside that narrow view. Requiring a majority-sized sample means
166///    a thin lookup falls through to the retry (which re-walks the DHT)
167///    instead of being declared a final absence.
168fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
169    queried >= CLOSE_GROUP_MAJORITY && not_found == queried
170}
171
172/// Store-response timeout for non-merkle chunk PUTs.
173const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
174
175/// Extra waves allowed after the computed diagnostic peer-sweep deadline.
176const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
177
178/// Result of fetching one chunk address from one close-group peer.
179pub struct ChunkPeerGetResult {
180    /// Peer queried for the chunk.
181    pub peer_id: PeerId,
182    /// Known network addresses used for the peer.
183    pub peer_addrs: Vec<MultiAddr>,
184    /// XOR distance from `peer_id` to the chunk address.
185    pub xor_distance: [u8; 32],
186    /// Per-peer fetch result.
187    pub chunk_result: Result<Option<DataChunk>>,
188}
189
190#[derive(Clone)]
191struct ChunkPeerGetTarget {
192    index: usize,
193    peer_id: PeerId,
194    peer_addrs: Vec<MultiAddr>,
195    xor_distance: [u8; 32],
196}
197
198fn chunk_peer_get_targets(
199    peers: Vec<(PeerId, Vec<MultiAddr>)>,
200    address: &XorName,
201) -> Vec<ChunkPeerGetTarget> {
202    peers
203        .into_iter()
204        .enumerate()
205        .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
206            index,
207            peer_id,
208            peer_addrs,
209            xor_distance: peer_xor_distance(&peer_id, address),
210        })
211        .collect()
212}
213
214fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
215    results.sort_by_key(|result| result.xor_distance);
216}
217
218fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
219    peer_count.min(close_group_size.max(1))
220}
221
222fn diagnostic_peer_get_overall_timeout(
223    per_peer_timeout: Duration,
224    target_count: usize,
225    concurrency_limit: usize,
226) -> Duration {
227    let concurrency_limit = concurrency_limit.max(1);
228    let peer_get_waves = target_count.div_ceil(concurrency_limit);
229    let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
230    let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
231
232    per_peer_timeout.saturating_mul(timeout_waves)
233}
234
235fn timed_out_chunk_peer_get_result(
236    target: &ChunkPeerGetTarget,
237    address: &XorName,
238    timeout: Duration,
239) -> ChunkPeerGetResult {
240    let addr_hex = hex::encode(address);
241    let timeout_secs = timeout.as_secs();
242    ChunkPeerGetResult {
243        peer_id: target.peer_id,
244        peer_addrs: target.peer_addrs.clone(),
245        xor_distance: target.xor_distance,
246        chunk_result: Err(Error::Timeout(format!(
247            "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
248            target.peer_id
249        ))),
250    }
251}
252
253fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
254    match detect_proof_type(proof) {
255        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
256        _ => STORE_RESPONSE_TIMEOUT,
257    }
258}
259
260impl Client {
261    /// Run `chunk_get` and feed one byte-aware observation per call to
262    /// the adaptive fetch limiter. Use this from any consumer that
263    /// drives chunk-fetch concurrency from `controller().fetch.current()`
264    /// — the controller's window relies on every call along the hot
265    /// path producing an observation.
266    ///
267    /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
268    /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
269    /// sustained run of close-group exhaustions correctly drives the
270    /// cap down rather than silently inflating it.
271    pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
272        self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
273            .await
274    }
275
276    pub(crate) async fn chunk_get_observed_from_closest_peers(
277        &self,
278        address: &XorName,
279        peer_count: usize,
280    ) -> Result<Option<DataChunk>> {
281        let started = Instant::now();
282        let result = self.chunk_get_from_closest_peers(address, peer_count).await;
283        let latency = started.elapsed();
284        let bytes = result
285            .as_ref()
286            .ok()
287            .and_then(Option::as_ref)
288            .map_or(0, |chunk| chunk.content.len() as u64);
289        self.controller()
290            .fetch
291            .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
292        result
293    }
294}
295
296/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
297///
298/// This is the result-aware classifier used by the file-download paths.
299/// It differs from `classify_error` in one critical way: an `Ok(None)`
300/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
301/// the time `chunk_get` returns `Ok(None)` it has already exhausted
302/// the close group across its first attempt + retry sweep, so
303/// `Ok(None)` is the controller's load-shedding signal — a sustained
304/// run of them on a saturated home link is exactly the case where the
305/// cap should shrink.
306///
307/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
308/// internal peer attempts the chunk_get had to make. The controller
309/// does not need to see internal peer noise; that's noise about the
310/// production network's natural peer-side variability, not about the
311/// client's effective capacity.
312pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
313    match result {
314        Ok(Some(_)) => Outcome::Success,
315        Ok(None) => Outcome::Timeout,
316        Err(Error::Timeout(_)) => Outcome::Timeout,
317        Err(Error::Network(_)) => Outcome::NetworkError,
318        Err(_) => Outcome::ApplicationError,
319    }
320}
321
322impl Client {
323    /// Store a chunk on the Autonomi network with payment.
324    ///
325    /// Checks if the chunk already exists before paying. If it does,
326    /// returns the address immediately without incurring on-chain costs.
327    /// Otherwise collects quotes, pays on-chain, then stores with proof
328    /// to `CLOSE_GROUP_MAJORITY` peers.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if payment or the network operation fails.
333    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
334        let address = compute_address(&content);
335        let data_size = u64::try_from(content.len())
336            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
337
338        match self
339            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
340            .await
341        {
342            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
343            Err(Error::AlreadyStored) => {
344                debug!(
345                    "Chunk {} already stored on network, skipping payment",
346                    hex::encode(address)
347                );
348                Ok(address)
349            }
350            Err(e) => Err(e),
351        }
352    }
353
354    /// Test-only: pay for `content`, then store it with `dead_count`
355    /// unreachable peers prepended to the real put-target set.
356    ///
357    /// Every initial send hits a dead peer and fails, so the store can only
358    /// reach quorum by falling back through the real put-targets (the closest-K
359    /// set the quote plan already returned), reusing the same `ProofOfPayment`.
360    /// Pass `dead_count >= CLOSE_GROUP_MAJORITY` so a full quorum's worth of
361    /// replacements must come from the fallback; a success proves the fallback
362    /// works end-to-end.
363    ///
364    /// # Errors
365    ///
366    /// Returns an error if payment fails or quorum cannot be reached.
367    #[cfg(feature = "test-utils")]
368    pub async fn chunk_put_with_dead_initial_peers(
369        &self,
370        content: Bytes,
371        dead_count: usize,
372    ) -> Result<XorName> {
373        let address = compute_address(&content);
374        let data_size = u64::try_from(content.len())
375            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
376        let (proof, real_peers) = self
377            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
378            .await?;
379        // Unreachable peers (random id, no addresses) first: every initial send
380        // fails, so quorum can only be reached by falling back through the real
381        // put-target set that follows.
382        let mut peers: Vec<(PeerId, Vec<MultiAddr>)> = (0..dead_count)
383            .map(|_| (PeerId::random(), Vec::new()))
384            .collect();
385        peers.extend(real_peers);
386        self.chunk_put_to_close_group(content, proof, &peers).await
387    }
388
389    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers, falling back past full or
390    /// over-priced members of the supplied put-target set (ADR-0002).
391    ///
392    /// Sends the PUT concurrently to the first `CLOSE_GROUP_MAJORITY` peers. On
393    /// each failure it advances to the next peer in `peers` — which the caller
394    /// supplies as the chunk's closest ~K neighbourhood, so no further DHT
395    /// lookup is needed. Every peer reuses the same payment proof: a node
396    /// accepts it as long as one of the proof's quote issuers is within that
397    /// peer's own local closest view, so the client never needs to re-quote or
398    /// re-pay to route around a full node.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
403    /// the chunk.
404    pub(crate) async fn chunk_put_to_close_group(
405        &self,
406        content: Bytes,
407        proof: Vec<u8>,
408        peers: &[(PeerId, Vec<MultiAddr>)],
409    ) -> Result<XorName> {
410        let address = compute_address(&content);
411
412        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
413        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
414        let mut fallback_iter = fallback_peers.iter();
415
416        let mut put_futures = FuturesUnordered::new();
417        for (peer_id, addrs) in initial_peers {
418            put_futures.push(self.spawn_chunk_put(
419                content.clone(),
420                proof.clone(),
421                *peer_id,
422                addrs.clone(),
423            ));
424        }
425
426        let mut success_count = 0usize;
427        let mut failures: Vec<String> = Vec::new();
428        // Tally the *cause* of each failure. The store AIMD limiter must only be
429        // pushed down by a transport shortfall (V2-468): a node that responds —
430        // a structured `RemotePut` decline, or `PaymentRequired` surfacing as
431        // `Error::Payment` — declined at the application layer and is not
432        // evidence the client is sending too fast. The per-cause counts also
433        // surface a legible aggregate reason; hold the first application-level
434        // rejection as the representative error.
435        let mut full = 0usize;
436        let mut price_floor = 0usize;
437        let mut other_remote = 0usize;
438        let mut timeout = 0usize;
439        let mut dial = 0usize;
440        let mut first_app_rejection: Option<Error> = None;
441
442        while let Some((peer_id, result)) = put_futures.next().await {
443            match result {
444                Ok(_) => {
445                    success_count += 1;
446                    if success_count >= CLOSE_GROUP_MAJORITY {
447                        debug!(
448                            "Chunk {} stored on {success_count} peers (majority reached)",
449                            hex::encode(address)
450                        );
451                        return Ok(address);
452                    }
453                }
454                Err(e) => {
455                    warn!("Failed to store chunk on {peer_id}: {e}");
456                    failures.push(format!("{peer_id}: {e}"));
457                    match classify_put_failure(&e) {
458                        PutRejection::Full => full += 1,
459                        PutRejection::PriceFloor => price_floor += 1,
460                        PutRejection::OtherRemote => other_remote += 1,
461                        PutRejection::Timeout => timeout += 1,
462                        PutRejection::Dial => dial += 1,
463                    }
464                    // An application-level decline is `RemotePut` (a structured
465                    // node rejection) or `Error::Payment` (`PaymentRequired`):
466                    // capture the first so an all-application shortfall surfaces
467                    // as `ApplicationError`, not `InsufficientPeers`
468                    // (`NetworkError`), and never suppresses the limiter.
469                    if matches!(e, Error::RemotePut { .. } | Error::Payment(_))
470                        && first_app_rejection.is_none()
471                    {
472                        first_app_rejection = Some(e);
473                    }
474
475                    // Advance to the next peer in the put-target set, reusing
476                    // the same proof.
477                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
478                        debug!(
479                            "Falling back to peer {fb_peer} for chunk {}",
480                            hex::encode(address)
481                        );
482                        put_futures.push(self.spawn_chunk_put(
483                            content.clone(),
484                            proof.clone(),
485                            *fb_peer,
486                            fb_addrs.clone(),
487                        ));
488                    }
489                }
490            }
491        }
492
493        // Quorum not reached. A timeout-bearing shortfall is genuine local
494        // backpressure (capacity signal); an application-only shortfall surfaces
495        // the representative app error; a pure dial-churn shortfall surfaces a
496        // neutral `CloseGroupShortfall` (V2-554). See `put_shortfall_error`.
497        let aggregate = format!(
498            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY} \
499             (full: {full}, price-floor: {price_floor}, other-rejection: {other_remote}, \
500             timeout: {timeout}, dial: {dial}). Failures: [{}]",
501            failures.join("; ")
502        );
503        Err(put_shortfall_error(
504            timeout,
505            dial,
506            first_app_rejection,
507            aggregate,
508        ))
509    }
510
511    /// Build a chunk PUT future for a single peer. Takes owned peer data so
512    /// the future can outlive a fallback queue entry popped per iteration.
513    async fn spawn_chunk_put(
514        &self,
515        content: Bytes,
516        proof: Vec<u8>,
517        peer_id: PeerId,
518        addrs: Vec<MultiAddr>,
519    ) -> (PeerId, Result<XorName>) {
520        let result = self
521            .chunk_put_with_proof(content, proof, &peer_id, &addrs)
522            .await;
523        (peer_id, result)
524    }
525
526    /// Store a chunk on the Autonomi network with a pre-built payment proof.
527    ///
528    /// Sends to a single peer. Callers that need replication across the
529    /// close group should use `chunk_put_to_close_group` instead.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if the network operation fails.
534    pub async fn chunk_put_with_proof(
535        &self,
536        content: Bytes,
537        proof: Vec<u8>,
538        target_peer: &PeerId,
539        peer_addrs: &[MultiAddr],
540    ) -> Result<XorName> {
541        let address = compute_address(&content);
542        let node = self.network().node();
543        let timeout =
544            store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
545        let timeout_secs = timeout.as_secs();
546
547        let request_id = self.next_request_id();
548        // `content` is a refcounted `Bytes` shared with the sibling
549        // close-group sends; pass it through directly so each peer shares
550        // the same backing buffer instead of deep-copying the 4 MB payload.
551        let request = ChunkPutRequest::with_payment(address, content, proof);
552        let message = ChunkMessage {
553            request_id,
554            body: ChunkMessageBody::PutRequest(request),
555        };
556        let message_bytes = message
557            .encode()
558            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
559
560        let addr_hex = hex::encode(address);
561
562        let result = send_and_await_chunk_response(
563            node,
564            target_peer,
565            message_bytes,
566            request_id,
567            timeout,
568            peer_addrs,
569            |body| match body {
570                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
571                    debug!("Chunk stored at {}", hex::encode(addr));
572                    Some(Ok(addr))
573                }
574                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
575                    address: addr,
576                }) => {
577                    debug!("Chunk already exists at {}", hex::encode(addr));
578                    Some(Ok(addr))
579                }
580                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
581                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
582                }
583                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
584                    // Preserve the structured remote reason instead of
585                    // flattening it into `Error::Protocol`. The node
586                    // responded, so the transport round-trip succeeded —
587                    // this is an application-level rejection and must not
588                    // suppress the store AIMD limiter (V2-468).
589                    Some(Err(Error::RemotePut {
590                        address: addr_hex.clone(),
591                        source: e,
592                    }))
593                }
594                _ => None,
595            },
596            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
597            || {
598                Error::Timeout(format!(
599                    "Timeout waiting for store response after {timeout_secs}s"
600                ))
601            },
602        )
603        .await;
604
605        result
606    }
607
608    /// Retrieve a chunk from the Autonomi network.
609    ///
610    /// Queries all peers in the close group for the chunk address,
611    /// returning the first successful response. This handles the case
612    /// where the storing peer differs from the first peer returned by
613    /// DHT routing.
614    ///
615    /// ## Adaptive controller feedback
616    ///
617    /// Each per-peer GET attempt is fed individually to the adaptive
618    /// fetch limiter via `controller().fetch.observe(...)`. This is
619    /// deliberately finer-grained than wrapping the outer `chunk_get`
620    /// with `observe_op`: when a chunk takes 6 peer tries to land,
621    /// 5 of them are real capacity signals (timeouts / network errors)
622    /// that should pull the cap down even if the chunk eventually
623    /// succeeds. The outer `Ok(_)` would mask all five as a single
624    /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
625    /// classification rules used below.
626    ///
627    /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the network operation fails.
632    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
633        self.chunk_get_from_closest_peers(address, self.config().close_group_size)
634            .await
635    }
636
637    /// Retrieve a chunk from the requested number of closest peers.
638    ///
639    /// Queries peers in XOR-distance order for the chunk address,
640    /// returning the first successful response. This handles the case
641    /// where the storing peer differs from the first peer returned by
642    /// DHT routing.
643    ///
644    /// # Errors
645    ///
646    /// Returns an error if the network operation fails.
647    pub async fn chunk_get_from_closest_peers(
648        &self,
649        address: &XorName,
650        peer_count: usize,
651    ) -> Result<Option<DataChunk>> {
652        // Check cache first, with integrity verification.
653        if let Some(cached) = self.chunk_cache().get(address) {
654            let computed = compute_address(&cached);
655            if computed == *address {
656                debug!("Cache hit for chunk {}", hex::encode(address));
657                return Ok(Some(DataChunk::new(*address, cached)));
658            }
659            // Cache entry corrupted — evict and fall through to network fetch.
660            debug!(
661                "Cache corruption detected for {}: evicting",
662                hex::encode(address)
663            );
664            self.chunk_cache().remove(address);
665        }
666
667        let addr_hex = hex::encode(address);
668
669        // First attempt against the current close-group view. A
670        // lookup/transport error here (e.g. closest_peers' DHT walk
671        // momentarily returning an error, or InsufficientPeers from a
672        // thin routing table) is NOT fatal: fall through to the retry
673        // path exactly as a non-authoritative miss would. Otherwise one
674        // transient error on the *initial* close-group walk for a single
675        // chunk would fail an entire multi-hundred-chunk download. A
676        // zeroed outcome (queried=0) is never authoritative, so it flows
677        // straight to the retry below.
678        let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
679            Ok(outcome) => outcome,
680            Err(e) => {
681                info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
682                CloseGroupOutcome {
683                    chunk: None,
684                    queried: 0,
685                    not_found: 0,
686                    timeout: 0,
687                    network_err: 0,
688                    protocol_err: 0,
689                }
690            }
691        };
692        if let Some(chunk) = first.chunk {
693            self.chunk_cache().put(chunk.address, chunk.content.clone());
694            return Ok(Some(chunk));
695        }
696
697        // Only treat as authoritative absence when *every* queried peer
698        // responded NotFound. Anything less leaves the actual storer
699        // possibly in the timeout / network-error bucket, which a retry
700        // could reach.
701        if is_authoritative_not_found(first.not_found, first.queried) {
702            info!(
703                "chunk_get giving up on {addr_hex} (unanimous NotFound): \
704                 queried={} not_found={} timeout={} network_err={} protocol_err={}",
705                first.queried,
706                first.not_found,
707                first.timeout,
708                first.network_err,
709                first.protocol_err,
710            );
711            return Ok(None);
712        }
713
714        // Otherwise the failure looks like reachability (most peers timed out
715        // or hit transport errors). The chunk is most likely still on the
716        // network but the current close-group view either (a) caught a
717        // transient transport blip or (b) converged on the wrong neighbourhood
718        // because the routing table is thin. One retry against a freshly
719        // re-walked close group is the cheapest defence against both.
720        info!(
721            "chunk_get retrying {addr_hex} after reachability failure: \
722             queried={} not_found={} timeout={} network_err={} protocol_err={}",
723            first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
724        );
725
726        // Brief settle so any in-flight transport state can quiesce before
727        // we re-walk the DHT. Keep this small so we don't add meaningful
728        // latency to the genuinely-lost case (we already paid for one full
729        // close-group sweep before getting here).
730        tokio::time::sleep(Duration::from_secs(1)).await;
731
732        // If the retry's DHT lookup itself fails, treat that as "still
733        // couldn't find" rather than escalating the error — matches the
734        // semantics of the first attempt when peers are unreachable.
735        let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
736            Ok(o) => o,
737            Err(e) => {
738                info!(
739                    "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
740                     first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
741                    first.queried,
742                    first.not_found,
743                    first.timeout,
744                    first.network_err,
745                    first.protocol_err,
746                );
747                return Ok(None);
748            }
749        };
750        if let Some(chunk) = retry.chunk {
751            info!("chunk_get retry succeeded for {addr_hex}");
752            self.chunk_cache().put(chunk.address, chunk.content.clone());
753            return Ok(Some(chunk));
754        }
755
756        info!(
757            "chunk_get exhausted close group after retry for {addr_hex}: \
758             first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
759             retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
760            first.queried,
761            first.not_found,
762            first.timeout,
763            first.network_err,
764            first.protocol_err,
765            retry.queried,
766            retry.not_found,
767            retry.timeout,
768            retry.network_err,
769            retry.protocol_err,
770        );
771        Ok(None)
772    }
773
774    /// One sweep of the requested closest peers: fetch the closest peers
775    /// for `address` from the DHT and ask each for the chunk in turn,
776    /// returning on the first success.
777    async fn chunk_get_try_closest_peers(
778        &self,
779        address: &XorName,
780        peer_count: usize,
781    ) -> Result<CloseGroupOutcome> {
782        let peers = self.closest_peers(address, peer_count).await?;
783        let addr_hex = hex::encode(address);
784        let queried = peers.len();
785        let mut not_found = 0usize;
786        let mut timeout = 0usize;
787        let mut network_err = 0usize;
788        let mut protocol_err = 0usize;
789
790        for (peer, addrs) in &peers {
791            match self.chunk_get_from_peer(address, peer, addrs).await {
792                Ok(Some(chunk)) => {
793                    return Ok(CloseGroupOutcome {
794                        chunk: Some(chunk),
795                        queried,
796                        not_found,
797                        timeout,
798                        network_err,
799                        protocol_err,
800                    });
801                }
802                Ok(None) => {
803                    not_found += 1;
804                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
805                }
806                Err(Error::Timeout(_)) => {
807                    timeout += 1;
808                    debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
809                }
810                Err(Error::Network(_)) => {
811                    network_err += 1;
812                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
813                }
814                // A `Protocol` error here is the storer responding with
815                // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
816                // failed" from a peer that has a corrupted local copy.
817                // That's a per-peer problem, not a per-chunk one: the
818                // remaining peers might still have a clean copy, so
819                // continue the sweep rather than aborting it. Counted
820                // separately from network_err so the summary log still
821                // distinguishes "peer corrupted" from "peer unreachable".
822                Err(Error::Protocol(ref e)) => {
823                    protocol_err += 1;
824                    debug!(
825                        "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
826                    );
827                }
828                Err(e) => return Err(e),
829            }
830        }
831
832        Ok(CloseGroupOutcome {
833            chunk: None,
834            queried,
835            not_found,
836            timeout,
837            network_err,
838            protocol_err,
839        })
840    }
841
842    /// Retrieve a chunk from every peer in the close group.
843    ///
844    /// Unlike [`Client::chunk_get`], this method does not return early
845    /// after the first successful response. It returns one result per
846    /// close-group peer, sorted from closest XOR distance to furthest.
847    ///
848    /// # Errors
849    ///
850    /// Returns an error if the close-group lookup fails.
851    pub async fn chunk_get_from_close_group(
852        &self,
853        address: &XorName,
854    ) -> Result<Vec<ChunkPeerGetResult>> {
855        self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
856            .await
857    }
858
859    /// Retrieve a chunk from the requested number of closest peers.
860    ///
861    /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
862    /// not return early after the first successful response. It returns
863    /// one result per queried peer, sorted from closest XOR distance to
864    /// furthest.
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if the DHT lookup fails.
869    pub async fn chunk_get_from_closest_peer_group(
870        &self,
871        address: &XorName,
872        peer_count: usize,
873    ) -> Result<Vec<ChunkPeerGetResult>> {
874        let peers = self.closest_peers(address, peer_count).await?;
875        let targets = chunk_peer_get_targets(peers, address);
876        let concurrency_limit =
877            diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
878        let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
879        let overall_timeout =
880            diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
881
882        let mut completed = vec![false; targets.len()];
883        let mut results = Vec::with_capacity(targets.len());
884        let mut get_results = stream::iter(targets.iter().cloned())
885            .map(|target| async move {
886                let chunk_result = self
887                    .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
888                    .await;
889
890                if let Ok(Some(chunk)) = &chunk_result {
891                    self.chunk_cache().put(chunk.address, chunk.content.clone());
892                }
893
894                (
895                    target.index,
896                    ChunkPeerGetResult {
897                        peer_id: target.peer_id,
898                        peer_addrs: target.peer_addrs,
899                        xor_distance: target.xor_distance,
900                        chunk_result,
901                    },
902                )
903            })
904            .buffer_unordered(concurrency_limit);
905
906        let collect_results = async {
907            while let Some((index, result)) = get_results.next().await {
908                completed[index] = true;
909                results.push(result);
910            }
911        };
912
913        if tokio::time::timeout(overall_timeout, collect_results)
914            .await
915            .is_err()
916        {
917            for target in &targets {
918                if !completed[target.index] {
919                    results.push(timed_out_chunk_peer_get_result(
920                        target,
921                        address,
922                        overall_timeout,
923                    ));
924                }
925            }
926        }
927
928        sort_chunk_peer_get_results(&mut results);
929        Ok(results)
930    }
931
932    /// Fetch a chunk from a specific peer.
933    async fn chunk_get_from_peer(
934        &self,
935        address: &XorName,
936        peer: &PeerId,
937        peer_addrs: &[MultiAddr],
938    ) -> Result<Option<DataChunk>> {
939        let node = self.network().node();
940        let request_id = self.next_request_id();
941        let request = ChunkGetRequest::new(*address);
942        let message = ChunkMessage {
943            request_id,
944            body: ChunkMessageBody::GetRequest(request),
945        };
946        let message_bytes = message
947            .encode()
948            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
949
950        let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
951        let addr_hex = hex::encode(address);
952        let timeout_secs = self.config().chunk_get_timeout_secs;
953
954        let result = send_and_await_chunk_response(
955            node,
956            peer,
957            message_bytes,
958            request_id,
959            timeout,
960            peer_addrs,
961            |body| match body {
962                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
963                    address: addr,
964                    content,
965                }) => {
966                    if addr != *address {
967                        return Some(Err(Error::InvalidData(format!(
968                            "Mismatched chunk address: expected {addr_hex}, got {}",
969                            hex::encode(addr)
970                        ))));
971                    }
972
973                    let computed = compute_address(&content);
974                    if computed != addr {
975                        return Some(Err(Error::InvalidData(format!(
976                            "Invalid chunk content: expected hash {addr_hex}, got {}",
977                            hex::encode(computed)
978                        ))));
979                    }
980
981                    debug!(
982                        "Retrieved chunk {} ({} bytes) from peer {peer}",
983                        hex::encode(addr),
984                        content.len()
985                    );
986                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
987                }
988                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
989                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
990                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
991                )),
992                _ => None,
993            },
994            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
995            || {
996                Error::Timeout(format!(
997                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
998                ))
999            },
1000        )
1001        .await;
1002
1003        result
1004    }
1005
1006    /// Check if a chunk exists on the network.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if the network operation fails.
1011    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
1012        self.chunk_get(address).await.map(|opt| opt.is_some())
1013    }
1014
1015    /// Finalize a single-chunk publish after an external signer has paid.
1016    ///
1017    /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
1018    /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
1019    /// `quote_hash -> tx_hash` map containing receipts for every non-zero
1020    /// quote in the chunk's payment. Builds the `PaymentProof` and stores
1021    /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
1022    ///
1023    /// Wave-batch payment shape only. Single-chunk publishes don't need
1024    /// Merkle batching: one chunk's worth of quotes is well below the
1025    /// wave-batch threshold.
1026    ///
1027    /// # Errors
1028    ///
1029    /// Returns an error if the proof construction fails (e.g. missing
1030    /// `tx_hash` for a non-zero quote) or if fewer than
1031    /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
1032    pub async fn finalize_chunk(
1033        &self,
1034        prepared: PreparedChunk,
1035        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1036    ) -> Result<XorName> {
1037        let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
1038        // finalize_batch_payment returns one PaidChunk per PreparedChunk
1039        // input; we passed exactly one. If that invariant is ever violated
1040        // it's an upstream bug — fail loudly rather than silently address-0.
1041        let chunk = paid.pop().ok_or_else(|| {
1042            Error::Payment(
1043                "finalize_batch_payment returned no paid chunks for a single \
1044                 prepared chunk — internal invariant violated"
1045                    .into(),
1046            )
1047        })?;
1048        self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
1049            .await
1050    }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055    use super::*;
1056    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
1057
1058    /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
1059    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
1060    /// Sentinel byte used to represent an unknown/unrecognized proof tag.
1061    const UNKNOWN_PROOF_TAG: u8 = 0xff;
1062    /// XorName byte width used by test peer IDs and distances.
1063    const TEST_XORNAME_BYTE_LEN: usize = 32;
1064    /// Last byte position in the test XOR distance arrays.
1065    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;
1066
1067    #[test]
1068    fn classify_put_failure_maps_remote_timeout_and_dial_reasons() {
1069        let remote = |source| Error::RemotePut {
1070            address: "test-addr".to_string(),
1071            source,
1072        };
1073        assert!(matches!(
1074            classify_put_failure(&remote(ProtocolError::StorageFailed("full".to_string()))),
1075            PutRejection::Full
1076        ));
1077        assert!(matches!(
1078            classify_put_failure(&remote(ProtocolError::PaymentFailed(
1079                "below floor".to_string()
1080            ))),
1081            PutRejection::PriceFloor
1082        ));
1083        assert!(matches!(
1084            classify_put_failure(&remote(ProtocolError::Internal("boom".to_string()))),
1085            PutRejection::OtherRemote
1086        ));
1087        // A `PaymentRequired` PUT response surfaces as `Error::Payment` and is an
1088        // application-level decline, not a transport shortfall (ADR-0002).
1089        assert!(matches!(
1090            classify_put_failure(&Error::Payment("Payment required: more".to_string())),
1091            PutRejection::PriceFloor
1092        ));
1093        // A PUT-response timeout is genuine local backpressure (V2-554).
1094        assert!(matches!(
1095            classify_put_failure(&Error::Timeout("no response".to_string())),
1096            PutRejection::Timeout
1097        ));
1098        // A dial/relay failure (dead/stale relayed address) is remote churn.
1099        assert!(matches!(
1100            classify_put_failure(&Error::Network("dial failed".to_string())),
1101            PutRejection::Dial
1102        ));
1103    }
1104
1105    #[test]
1106    fn put_shortfall_routes_by_failure_mix() {
1107        let app = || Error::Payment("Payment required: more".to_string());
1108        let msg = || "shortfall".to_string();
1109
1110        // Every failure was an application-level decline (no timeout, no dial):
1111        // surface the app error so the limiter isn't driven down as a false
1112        // capacity signal (ADR-0002 / V2-468).
1113        assert!(matches!(
1114            put_shortfall_error(0, 0, Some(app()), msg()),
1115            Error::Payment(_)
1116        ));
1117        // Any PUT-response timeout in the mix is genuine local backpressure:
1118        // keep it a capacity signal so the store limiter still backs off (V2-554).
1119        assert!(matches!(
1120            put_shortfall_error(1, 0, Some(app()), msg()),
1121            Error::InsufficientPeers(_)
1122        ));
1123        assert!(matches!(
1124            put_shortfall_error(1, 3, None, msg()),
1125            Error::InsufficientPeers(_)
1126        ));
1127        // No timeouts but dial/relay churn present: remote peer churn, not local
1128        // capacity — surface a neutral CloseGroupShortfall (V2-554).
1129        assert!(matches!(
1130            put_shortfall_error(0, 2, None, msg()),
1131            Error::CloseGroupShortfall(_)
1132        ));
1133        // Dial churn alongside an app rejection, still no timeout: neutral.
1134        assert!(matches!(
1135            put_shortfall_error(0, 1, Some(app()), msg()),
1136            Error::CloseGroupShortfall(_)
1137        ));
1138    }
1139
1140    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
1141        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
1142        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
1143
1144        ChunkPeerGetResult {
1145            peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
1146            peer_addrs: Vec::new(),
1147            xor_distance,
1148            chunk_result: Ok(None),
1149        }
1150    }
1151
1152    #[test]
1153    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
1154        // Unanimous AND well-sampled: every queried peer of a full
1155        // close group said NotFound. The only safe stop.
1156        assert!(is_authoritative_not_found(7, 7));
1157        // Unanimous with exactly a majority-sized sample is also
1158        // authoritative.
1159        assert!(is_authoritative_not_found(
1160            CLOSE_GROUP_MAJORITY,
1161            CLOSE_GROUP_MAJORITY
1162        ));
1163
1164        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
1165        // peers, all NotFound, is NOT authoritative — the real replica
1166        // majority may sit entirely outside that narrow view. Must
1167        // retry (re-walk the DHT).
1168        assert!(!is_authoritative_not_found(1, 1));
1169        assert!(!is_authoritative_not_found(3, 3));
1170        assert!(!is_authoritative_not_found(
1171            CLOSE_GROUP_MAJORITY - 1,
1172            CLOSE_GROUP_MAJORITY - 1
1173        ));
1174
1175        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
1176        // timeout bucket. Must retry.
1177        assert!(!is_authoritative_not_found(4, 7));
1178        assert!(!is_authoritative_not_found(6, 7));
1179
1180        // Pure-reachability failure — must retry.
1181        assert!(!is_authoritative_not_found(0, 7));
1182
1183        // Defensive: a zeroed outcome (e.g. the first attempt's
1184        // close-group lookup errored) is never authoritative.
1185        assert!(!is_authoritative_not_found(0, 0));
1186    }
1187
1188    #[test]
1189    fn chunk_get_outcome_classifies_each_result_kind() {
1190        // Success: chunk_get returned a chunk, regardless of how many
1191        // internal peer attempts it took.
1192        let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
1193        assert_eq!(
1194            chunk_get_outcome(&Ok(Some(chunk))),
1195            Outcome::Success,
1196            "found-chunk must be Success",
1197        );
1198
1199        // Ok(None): chunk_get exhausted the close group across first
1200        // attempt + retry. This is the load-shedding signal — count it
1201        // as Timeout so a sustained run of them on a saturated link
1202        // shrinks the cap.
1203        assert_eq!(
1204            chunk_get_outcome(&Ok(None)),
1205            Outcome::Timeout,
1206            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
1207        );
1208
1209        // Capacity signals from explicit error variants.
1210        assert_eq!(
1211            chunk_get_outcome(&Err(Error::Timeout("t".into()))),
1212            Outcome::Timeout,
1213        );
1214        assert_eq!(
1215            chunk_get_outcome(&Err(Error::Network("n".into()))),
1216            Outcome::NetworkError,
1217        );
1218
1219        // Unexpected error variant (e.g. Protocol) — propagates out of
1220        // chunk_get to the caller and is not a capacity signal.
1221        assert_eq!(
1222            chunk_get_outcome(&Err(Error::Protocol("p".into()))),
1223            Outcome::ApplicationError,
1224        );
1225    }
1226
1227    #[test]
1228    fn single_node_proof_uses_store_response_timeout() {
1229        let timeout =
1230            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);
1231
1232        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1233    }
1234
1235    #[test]
1236    fn unknown_proof_uses_store_response_timeout() {
1237        let timeout =
1238            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);
1239
1240        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1241    }
1242
1243    #[test]
1244    fn merkle_proof_uses_configured_store_timeout() {
1245        let timeout =
1246            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);
1247
1248        assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
1249    }
1250
1251    #[test]
1252    fn chunk_peer_get_results_sort_by_xor_distance() {
1253        let mut results = vec![
1254            chunk_peer_get_result(3, 3),
1255            chunk_peer_get_result(1, 1),
1256            chunk_peer_get_result(2, 2),
1257        ];
1258
1259        sort_chunk_peer_get_results(&mut results);
1260
1261        let ordered_distances = results
1262            .iter()
1263            .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
1264            .collect::<Vec<_>>();
1265        assert_eq!(ordered_distances, vec![1, 2, 3]);
1266    }
1267
1268    #[test]
1269    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
1270        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1271        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
1272        const TARGET_COUNT: usize = 7;
1273        const CONCURRENCY_LIMIT: usize = 7;
1274
1275        let timeout = diagnostic_peer_get_overall_timeout(
1276            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1277            TARGET_COUNT,
1278            CONCURRENCY_LIMIT,
1279        );
1280
1281        assert_eq!(
1282            timeout,
1283            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1284        );
1285    }
1286
1287    #[test]
1288    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
1289        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1290        const TARGET_COUNT: usize = 20;
1291        const CLOSE_GROUP_SIZE: usize = 7;
1292        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;
1293
1294        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
1295        let timeout = diagnostic_peer_get_overall_timeout(
1296            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1297            TARGET_COUNT,
1298            concurrency_limit,
1299        );
1300
1301        assert_eq!(
1302            timeout,
1303            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1304        );
1305    }
1306
1307    /// Regression: the default `merkle_store_timeout_secs` must be at
1308    /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus
1309    /// padding. If either side moves and this invariant breaks, the
1310    /// client will give up on chunks the storer is still verifying.
1311    /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the
1312    /// derivation.
1313    #[test]
1314    fn default_merkle_store_timeout_satisfies_storer_invariant() {
1315        use crate::data::client::ClientConfig;
1316        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
1317        const MIN_PADDING_SECS: u64 = 30;
1318        let config = ClientConfig::default();
1319        assert!(
1320            config.merkle_store_timeout_secs
1321                >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
1322            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
1323            config.merkle_store_timeout_secs,
1324            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
1325            MIN_PADDING_SECS,
1326        );
1327    }
1328
1329    /// Regression: the non-merkle PUT path uses the hardcoded
1330    /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config
1331    /// `merkle_store_timeout_secs`. If a future refactor accidentally
1332    /// routes non-merkle PUTs through the merkle field they'd inherit
1333    /// the 270 s value and silently regress non-merkle latency.
1334    /// `store_response_timeout_for_proof` with a non-merkle proof tag
1335    /// must return the const regardless of what merkle timeout is
1336    /// passed.
1337    #[test]
1338    fn non_merkle_put_ignores_merkle_timeout_value() {
1339        let absurd_merkle_timeout = 9_999;
1340        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
1341            let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
1342            assert_eq!(
1343                timeout, STORE_RESPONSE_TIMEOUT,
1344                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
1345            );
1346        }
1347    }
1348}