Skip to main content

ant_core/data/client/
chunk.rs

1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_xor_distance;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{QuoteHash, TxHash};
12use ant_protocol::transport::{MultiAddr, PeerId};
13use ant_protocol::{
14    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
15    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
16    ProofType, ProtocolError, XorName, CLOSE_GROUP_MAJORITY,
17};
18use bytes::Bytes;
19use futures::stream::{self, FuturesUnordered, StreamExt};
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22use tracing::{debug, info, warn};
23
24/// Data type identifier for chunks (used in quote requests).
25const CHUNK_DATA_TYPE: u32 = 0;
26
27/// Why a single-peer PUT was declined. Drives the surfaced aggregate error
28/// and keeps the store AIMD limiter honest — only a transport shortfall is a
29/// "client is sending too fast" signal (V2-468); a node that responds with a
30/// structured rejection is an application-level decline (ADR-0002).
31#[derive(Clone, Copy)]
32enum PutRejection {
33    /// Node is out of storage (`ProtocolError::StorageFailed`) — try a
34    /// further peer.
35    Full,
36    /// Payment did not clear the node's local price floor, or the proof's
37    /// issuers are not close enough in this peer's view
38    /// (`ProtocolError::PaymentFailed`), or the node asked for more than was
39    /// paid (`ChunkPutResponse::PaymentRequired` → [`Error::Payment`]) — skip
40    /// this peer, do not re-quote.
41    PriceFloor,
42    /// Some other structured remote rejection.
43    OtherRemote,
44    /// Transport/timeout failure — the node did not respond.
45    Transport,
46}
47
48/// Classify a failed single-peer PUT (ADR-0002). A `RemotePut` carries the
49/// node's structured `ProtocolError`; a `PaymentRequired` response surfaces as
50/// [`Error::Payment`]; anything else is a transport failure.
51fn classify_put_failure(error: &Error) -> PutRejection {
52    match error {
53        Error::RemotePut { source, .. } => match source {
54            ProtocolError::StorageFailed(_) => PutRejection::Full,
55            ProtocolError::PaymentFailed(_) => PutRejection::PriceFloor,
56            _ => PutRejection::OtherRemote,
57        },
58        // A `PaymentRequired` PUT response (the node wants more than was paid)
59        // arrives as `Error::Payment`. It is a structured application-level
60        // decline — skip the peer and advance fallback, exactly like a
61        // price-floor `PaymentFailed` — not a transport shortfall, so it must
62        // not push the store AIMD limiter down (ADR-0002 / V2-468).
63        Error::Payment(_) => PutRejection::PriceFloor,
64        _ => PutRejection::Transport,
65    }
66}
67
68/// Decide the error for a close-group store that fell short of quorum.
69///
70/// When every failure was an application-level decline — the node responded
71/// (full / price-floor / `PaymentRequired` / other remote rejection) and there
72/// was **no** transport failure — return the representative application error so
73/// the shortfall classifies as `ApplicationError` and does not push the store
74/// AIMD limiter down as a false capacity signal (ADR-0002 / V2-468). A shortfall
75/// that included any transport failure is a genuine capacity signal and surfaces
76/// as `InsufficientPeers` (classified `NetworkError`).
77fn put_shortfall_error(
78    transport: usize,
79    first_app_rejection: Option<Error>,
80    insufficient_peers_message: String,
81) -> Error {
82    if transport == 0 {
83        if let Some(app_rejection) = first_app_rejection {
84            return app_rejection;
85        }
86    }
87    Error::InsufficientPeers(insufficient_peers_message)
88}
89
90/// Result of one sweep over a chunk's close group.
91///
92/// Either we got the chunk from some peer, or every peer in the group
93/// returned NotFound, timed out, or hit a transport / protocol error.
94/// The counts feed the retry decision (`is_authoritative_not_found`):
95/// only a *unanimous* NotFound from a *well-sampled* close group counts
96/// as authoritative data absence — anything else (a non-unanimous
97/// result, or a thin/under-sampled DHT walk) leaves room for the actual
98/// storer to be in the timeout / network-error / protocol-error bucket
99/// or outside the sampled view, and is worth a retry against a freshly
100/// re-walked close group.
101struct CloseGroupOutcome {
102    chunk: Option<DataChunk>,
103    queried: usize,
104    not_found: usize,
105    timeout: usize,
106    network_err: usize,
107    /// Counts peers that responded with a remote `Error` (e.g.
108    /// "Chunk verification failed") or any other protocol-level error
109    /// that classifies as `Error::Protocol`. Treated the same as
110    /// `timeout` / `network_err` for retry decisions: one peer's bad
111    /// response must not abort the whole close-group sweep — the
112    /// remaining peers might still have a clean copy.
113    protocol_err: usize,
114}
115
116/// `true` if the close-group sweep is strong enough evidence to
117/// conclude the chunk is genuinely absent, so retrying is pointless.
118///
119/// Two conditions, both required:
120///
121/// 1. *Unanimous*: every peer we managed to query responded with an
122///    authoritative NotFound (`not_found == queried`). An earlier
123///    version used a majority quorum (`not_found >= close_group_size /
124///    2 + 1`), but production traffic disproved that: storage
125///    replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
126///    peers, so up to 3 peers legitimately don't store any given chunk
127///    and a `not_found=4 timeout=3` result is "3 storers we couldn't
128///    reach" plus "4 non-storers," not data loss.
129///
130/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
131///    queried. `closest_peers` (via `find_closest_peers`) accepts
132///    any non-empty DHT result, so a thin/under-sampled walk can return
133///    1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
134///    authoritative — the real replica majority may sit entirely
135///    outside that narrow view. Requiring a majority-sized sample means
136///    a thin lookup falls through to the retry (which re-walks the DHT)
137///    instead of being declared a final absence.
138fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
139    queried >= CLOSE_GROUP_MAJORITY && not_found == queried
140}
141
142/// Store-response timeout for non-merkle chunk PUTs.
143const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
144
145/// Extra waves allowed after the computed diagnostic peer-sweep deadline.
146const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
147
148/// Result of fetching one chunk address from one close-group peer.
149pub struct ChunkPeerGetResult {
150    /// Peer queried for the chunk.
151    pub peer_id: PeerId,
152    /// Known network addresses used for the peer.
153    pub peer_addrs: Vec<MultiAddr>,
154    /// XOR distance from `peer_id` to the chunk address.
155    pub xor_distance: [u8; 32],
156    /// Per-peer fetch result.
157    pub chunk_result: Result<Option<DataChunk>>,
158}
159
160#[derive(Clone)]
161struct ChunkPeerGetTarget {
162    index: usize,
163    peer_id: PeerId,
164    peer_addrs: Vec<MultiAddr>,
165    xor_distance: [u8; 32],
166}
167
168fn chunk_peer_get_targets(
169    peers: Vec<(PeerId, Vec<MultiAddr>)>,
170    address: &XorName,
171) -> Vec<ChunkPeerGetTarget> {
172    peers
173        .into_iter()
174        .enumerate()
175        .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
176            index,
177            peer_id,
178            peer_addrs,
179            xor_distance: peer_xor_distance(&peer_id, address),
180        })
181        .collect()
182}
183
184fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
185    results.sort_by_key(|result| result.xor_distance);
186}
187
188fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
189    peer_count.min(close_group_size.max(1))
190}
191
192fn diagnostic_peer_get_overall_timeout(
193    per_peer_timeout: Duration,
194    target_count: usize,
195    concurrency_limit: usize,
196) -> Duration {
197    let concurrency_limit = concurrency_limit.max(1);
198    let peer_get_waves = target_count.div_ceil(concurrency_limit);
199    let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
200    let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
201
202    per_peer_timeout.saturating_mul(timeout_waves)
203}
204
205fn timed_out_chunk_peer_get_result(
206    target: &ChunkPeerGetTarget,
207    address: &XorName,
208    timeout: Duration,
209) -> ChunkPeerGetResult {
210    let addr_hex = hex::encode(address);
211    let timeout_secs = timeout.as_secs();
212    ChunkPeerGetResult {
213        peer_id: target.peer_id,
214        peer_addrs: target.peer_addrs.clone(),
215        xor_distance: target.xor_distance,
216        chunk_result: Err(Error::Timeout(format!(
217            "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
218            target.peer_id
219        ))),
220    }
221}
222
223fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
224    match detect_proof_type(proof) {
225        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
226        _ => STORE_RESPONSE_TIMEOUT,
227    }
228}
229
230impl Client {
231    /// Run `chunk_get` and feed one byte-aware observation per call to
232    /// the adaptive fetch limiter. Use this from any consumer that
233    /// drives chunk-fetch concurrency from `controller().fetch.current()`
234    /// — the controller's window relies on every call along the hot
235    /// path producing an observation.
236    ///
237    /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
238    /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
239    /// sustained run of close-group exhaustions correctly drives the
240    /// cap down rather than silently inflating it.
241    pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
242        self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
243            .await
244    }
245
246    pub(crate) async fn chunk_get_observed_from_closest_peers(
247        &self,
248        address: &XorName,
249        peer_count: usize,
250    ) -> Result<Option<DataChunk>> {
251        let started = Instant::now();
252        let result = self.chunk_get_from_closest_peers(address, peer_count).await;
253        let latency = started.elapsed();
254        let bytes = result
255            .as_ref()
256            .ok()
257            .and_then(Option::as_ref)
258            .map_or(0, |chunk| chunk.content.len() as u64);
259        self.controller()
260            .fetch
261            .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
262        result
263    }
264}
265
266/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
267///
268/// This is the result-aware classifier used by the file-download paths.
269/// It differs from `classify_error` in one critical way: an `Ok(None)`
270/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
271/// the time `chunk_get` returns `Ok(None)` it has already exhausted
272/// the close group across its first attempt + retry sweep, so
273/// `Ok(None)` is the controller's load-shedding signal — a sustained
274/// run of them on a saturated home link is exactly the case where the
275/// cap should shrink.
276///
277/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
278/// internal peer attempts the chunk_get had to make. The controller
279/// does not need to see internal peer noise; that's noise about the
280/// production network's natural peer-side variability, not about the
281/// client's effective capacity.
282pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
283    match result {
284        Ok(Some(_)) => Outcome::Success,
285        Ok(None) => Outcome::Timeout,
286        Err(Error::Timeout(_)) => Outcome::Timeout,
287        Err(Error::Network(_)) => Outcome::NetworkError,
288        Err(_) => Outcome::ApplicationError,
289    }
290}
291
292impl Client {
293    /// Store a chunk on the Autonomi network with payment.
294    ///
295    /// Checks if the chunk already exists before paying. If it does,
296    /// returns the address immediately without incurring on-chain costs.
297    /// Otherwise collects quotes, pays on-chain, then stores with proof
298    /// to `CLOSE_GROUP_MAJORITY` peers.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if payment or the network operation fails.
303    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
304        let address = compute_address(&content);
305        let data_size = u64::try_from(content.len())
306            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
307
308        match self
309            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
310            .await
311        {
312            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
313            Err(Error::AlreadyStored) => {
314                debug!(
315                    "Chunk {} already stored on network, skipping payment",
316                    hex::encode(address)
317                );
318                Ok(address)
319            }
320            Err(e) => Err(e),
321        }
322    }
323
324    /// Test-only: pay for `content`, then store it with `dead_count`
325    /// unreachable peers prepended to the real put-target set.
326    ///
327    /// Every initial send hits a dead peer and fails, so the store can only
328    /// reach quorum by falling back through the real put-targets (the closest-K
329    /// set the quote plan already returned), reusing the same `ProofOfPayment`.
330    /// Pass `dead_count >= CLOSE_GROUP_MAJORITY` so a full quorum's worth of
331    /// replacements must come from the fallback; a success proves the fallback
332    /// works end-to-end.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if payment fails or quorum cannot be reached.
337    #[cfg(feature = "test-utils")]
338    pub async fn chunk_put_with_dead_initial_peers(
339        &self,
340        content: Bytes,
341        dead_count: usize,
342    ) -> Result<XorName> {
343        let address = compute_address(&content);
344        let data_size = u64::try_from(content.len())
345            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
346        let (proof, real_peers) = self
347            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
348            .await?;
349        // Unreachable peers (random id, no addresses) first: every initial send
350        // fails, so quorum can only be reached by falling back through the real
351        // put-target set that follows.
352        let mut peers: Vec<(PeerId, Vec<MultiAddr>)> = (0..dead_count)
353            .map(|_| (PeerId::random(), Vec::new()))
354            .collect();
355        peers.extend(real_peers);
356        self.chunk_put_to_close_group(content, proof, &peers).await
357    }
358
359    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers, falling back past full or
360    /// over-priced members of the supplied put-target set (ADR-0002).
361    ///
362    /// Sends the PUT concurrently to the first `CLOSE_GROUP_MAJORITY` peers. On
363    /// each failure it advances to the next peer in `peers` — which the caller
364    /// supplies as the chunk's closest ~K neighbourhood, so no further DHT
365    /// lookup is needed. Every peer reuses the same payment proof: a node
366    /// accepts it as long as one of the proof's quote issuers is within that
367    /// peer's own local closest view, so the client never needs to re-quote or
368    /// re-pay to route around a full node.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
373    /// the chunk.
374    pub(crate) async fn chunk_put_to_close_group(
375        &self,
376        content: Bytes,
377        proof: Vec<u8>,
378        peers: &[(PeerId, Vec<MultiAddr>)],
379    ) -> Result<XorName> {
380        let address = compute_address(&content);
381
382        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
383        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
384        let mut fallback_iter = fallback_peers.iter();
385
386        let mut put_futures = FuturesUnordered::new();
387        for (peer_id, addrs) in initial_peers {
388            put_futures.push(self.spawn_chunk_put(
389                content.clone(),
390                proof.clone(),
391                *peer_id,
392                addrs.clone(),
393            ));
394        }
395
396        let mut success_count = 0usize;
397        let mut failures: Vec<String> = Vec::new();
398        // Tally the *cause* of each failure. The store AIMD limiter must only be
399        // pushed down by a transport shortfall (V2-468): a node that responds —
400        // a structured `RemotePut` decline, or `PaymentRequired` surfacing as
401        // `Error::Payment` — declined at the application layer and is not
402        // evidence the client is sending too fast. The per-cause counts also
403        // surface a legible aggregate reason; hold the first application-level
404        // rejection as the representative error.
405        let mut full = 0usize;
406        let mut price_floor = 0usize;
407        let mut other_remote = 0usize;
408        let mut transport = 0usize;
409        let mut first_app_rejection: Option<Error> = None;
410
411        while let Some((peer_id, result)) = put_futures.next().await {
412            match result {
413                Ok(_) => {
414                    success_count += 1;
415                    if success_count >= CLOSE_GROUP_MAJORITY {
416                        debug!(
417                            "Chunk {} stored on {success_count} peers (majority reached)",
418                            hex::encode(address)
419                        );
420                        return Ok(address);
421                    }
422                }
423                Err(e) => {
424                    warn!("Failed to store chunk on {peer_id}: {e}");
425                    failures.push(format!("{peer_id}: {e}"));
426                    match classify_put_failure(&e) {
427                        PutRejection::Full => full += 1,
428                        PutRejection::PriceFloor => price_floor += 1,
429                        PutRejection::OtherRemote => other_remote += 1,
430                        PutRejection::Transport => transport += 1,
431                    }
432                    // An application-level decline is `RemotePut` (a structured
433                    // node rejection) or `Error::Payment` (`PaymentRequired`):
434                    // capture the first so an all-application shortfall surfaces
435                    // as `ApplicationError`, not `InsufficientPeers`
436                    // (`NetworkError`), and never suppresses the limiter.
437                    if matches!(e, Error::RemotePut { .. } | Error::Payment(_))
438                        && first_app_rejection.is_none()
439                    {
440                        first_app_rejection = Some(e);
441                    }
442
443                    // Advance to the next peer in the put-target set, reusing
444                    // the same proof.
445                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
446                        debug!(
447                            "Falling back to peer {fb_peer} for chunk {}",
448                            hex::encode(address)
449                        );
450                        put_futures.push(self.spawn_chunk_put(
451                            content.clone(),
452                            proof.clone(),
453                            *fb_peer,
454                            fb_addrs.clone(),
455                        ));
456                    }
457                }
458            }
459        }
460
461        // Quorum not reached. An application-only shortfall surfaces the
462        // representative app error (so it doesn't suppress the limiter); a
463        // shortfall with any transport failure is a real capacity signal.
464        let aggregate = format!(
465            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY} \
466             (full: {full}, price-floor: {price_floor}, other-rejection: {other_remote}, \
467             transport: {transport}). Failures: [{}]",
468            failures.join("; ")
469        );
470        Err(put_shortfall_error(
471            transport,
472            first_app_rejection,
473            aggregate,
474        ))
475    }
476
477    /// Build a chunk PUT future for a single peer. Takes owned peer data so
478    /// the future can outlive a fallback queue entry popped per iteration.
479    async fn spawn_chunk_put(
480        &self,
481        content: Bytes,
482        proof: Vec<u8>,
483        peer_id: PeerId,
484        addrs: Vec<MultiAddr>,
485    ) -> (PeerId, Result<XorName>) {
486        let result = self
487            .chunk_put_with_proof(content, proof, &peer_id, &addrs)
488            .await;
489        (peer_id, result)
490    }
491
492    /// Store a chunk on the Autonomi network with a pre-built payment proof.
493    ///
494    /// Sends to a single peer. Callers that need replication across the
495    /// close group should use `chunk_put_to_close_group` instead.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if the network operation fails.
500    pub async fn chunk_put_with_proof(
501        &self,
502        content: Bytes,
503        proof: Vec<u8>,
504        target_peer: &PeerId,
505        peer_addrs: &[MultiAddr],
506    ) -> Result<XorName> {
507        let address = compute_address(&content);
508        let node = self.network().node();
509        let timeout =
510            store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
511        let timeout_secs = timeout.as_secs();
512
513        let request_id = self.next_request_id();
514        // `content` is a refcounted `Bytes` shared with the sibling
515        // close-group sends; pass it through directly so each peer shares
516        // the same backing buffer instead of deep-copying the 4 MB payload.
517        let request = ChunkPutRequest::with_payment(address, content, proof);
518        let message = ChunkMessage {
519            request_id,
520            body: ChunkMessageBody::PutRequest(request),
521        };
522        let message_bytes = message
523            .encode()
524            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
525
526        let addr_hex = hex::encode(address);
527
528        let result = send_and_await_chunk_response(
529            node,
530            target_peer,
531            message_bytes,
532            request_id,
533            timeout,
534            peer_addrs,
535            |body| match body {
536                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
537                    debug!("Chunk stored at {}", hex::encode(addr));
538                    Some(Ok(addr))
539                }
540                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
541                    address: addr,
542                }) => {
543                    debug!("Chunk already exists at {}", hex::encode(addr));
544                    Some(Ok(addr))
545                }
546                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
547                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
548                }
549                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
550                    // Preserve the structured remote reason instead of
551                    // flattening it into `Error::Protocol`. The node
552                    // responded, so the transport round-trip succeeded —
553                    // this is an application-level rejection and must not
554                    // suppress the store AIMD limiter (V2-468).
555                    Some(Err(Error::RemotePut {
556                        address: addr_hex.clone(),
557                        source: e,
558                    }))
559                }
560                _ => None,
561            },
562            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
563            || {
564                Error::Timeout(format!(
565                    "Timeout waiting for store response after {timeout_secs}s"
566                ))
567            },
568        )
569        .await;
570
571        result
572    }
573
574    /// Retrieve a chunk from the Autonomi network.
575    ///
576    /// Queries all peers in the close group for the chunk address,
577    /// returning the first successful response. This handles the case
578    /// where the storing peer differs from the first peer returned by
579    /// DHT routing.
580    ///
581    /// ## Adaptive controller feedback
582    ///
583    /// Each per-peer GET attempt is fed individually to the adaptive
584    /// fetch limiter via `controller().fetch.observe(...)`. This is
585    /// deliberately finer-grained than wrapping the outer `chunk_get`
586    /// with `observe_op`: when a chunk takes 6 peer tries to land,
587    /// 5 of them are real capacity signals (timeouts / network errors)
588    /// that should pull the cap down even if the chunk eventually
589    /// succeeds. The outer `Ok(_)` would mask all five as a single
590    /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
591    /// classification rules used below.
592    ///
593    /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if the network operation fails.
598    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
599        self.chunk_get_from_closest_peers(address, self.config().close_group_size)
600            .await
601    }
602
603    /// Retrieve a chunk from the requested number of closest peers.
604    ///
605    /// Queries peers in XOR-distance order for the chunk address,
606    /// returning the first successful response. This handles the case
607    /// where the storing peer differs from the first peer returned by
608    /// DHT routing.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error if the network operation fails.
613    pub async fn chunk_get_from_closest_peers(
614        &self,
615        address: &XorName,
616        peer_count: usize,
617    ) -> Result<Option<DataChunk>> {
618        // Check cache first, with integrity verification.
619        if let Some(cached) = self.chunk_cache().get(address) {
620            let computed = compute_address(&cached);
621            if computed == *address {
622                debug!("Cache hit for chunk {}", hex::encode(address));
623                return Ok(Some(DataChunk::new(*address, cached)));
624            }
625            // Cache entry corrupted — evict and fall through to network fetch.
626            debug!(
627                "Cache corruption detected for {}: evicting",
628                hex::encode(address)
629            );
630            self.chunk_cache().remove(address);
631        }
632
633        let addr_hex = hex::encode(address);
634
635        // First attempt against the current close-group view. A
636        // lookup/transport error here (e.g. closest_peers' DHT walk
637        // momentarily returning an error, or InsufficientPeers from a
638        // thin routing table) is NOT fatal: fall through to the retry
639        // path exactly as a non-authoritative miss would. Otherwise one
640        // transient error on the *initial* close-group walk for a single
641        // chunk would fail an entire multi-hundred-chunk download. A
642        // zeroed outcome (queried=0) is never authoritative, so it flows
643        // straight to the retry below.
644        let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
645            Ok(outcome) => outcome,
646            Err(e) => {
647                info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
648                CloseGroupOutcome {
649                    chunk: None,
650                    queried: 0,
651                    not_found: 0,
652                    timeout: 0,
653                    network_err: 0,
654                    protocol_err: 0,
655                }
656            }
657        };
658        if let Some(chunk) = first.chunk {
659            self.chunk_cache().put(chunk.address, chunk.content.clone());
660            return Ok(Some(chunk));
661        }
662
663        // Only treat as authoritative absence when *every* queried peer
664        // responded NotFound. Anything less leaves the actual storer
665        // possibly in the timeout / network-error bucket, which a retry
666        // could reach.
667        if is_authoritative_not_found(first.not_found, first.queried) {
668            info!(
669                "chunk_get giving up on {addr_hex} (unanimous NotFound): \
670                 queried={} not_found={} timeout={} network_err={} protocol_err={}",
671                first.queried,
672                first.not_found,
673                first.timeout,
674                first.network_err,
675                first.protocol_err,
676            );
677            return Ok(None);
678        }
679
680        // Otherwise the failure looks like reachability (most peers timed out
681        // or hit transport errors). The chunk is most likely still on the
682        // network but the current close-group view either (a) caught a
683        // transient transport blip or (b) converged on the wrong neighbourhood
684        // because the routing table is thin. One retry against a freshly
685        // re-walked close group is the cheapest defence against both.
686        info!(
687            "chunk_get retrying {addr_hex} after reachability failure: \
688             queried={} not_found={} timeout={} network_err={} protocol_err={}",
689            first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
690        );
691
692        // Brief settle so any in-flight transport state can quiesce before
693        // we re-walk the DHT. Keep this small so we don't add meaningful
694        // latency to the genuinely-lost case (we already paid for one full
695        // close-group sweep before getting here).
696        tokio::time::sleep(Duration::from_secs(1)).await;
697
698        // If the retry's DHT lookup itself fails, treat that as "still
699        // couldn't find" rather than escalating the error — matches the
700        // semantics of the first attempt when peers are unreachable.
701        let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
702            Ok(o) => o,
703            Err(e) => {
704                info!(
705                    "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
706                     first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
707                    first.queried,
708                    first.not_found,
709                    first.timeout,
710                    first.network_err,
711                    first.protocol_err,
712                );
713                return Ok(None);
714            }
715        };
716        if let Some(chunk) = retry.chunk {
717            info!("chunk_get retry succeeded for {addr_hex}");
718            self.chunk_cache().put(chunk.address, chunk.content.clone());
719            return Ok(Some(chunk));
720        }
721
722        info!(
723            "chunk_get exhausted close group after retry for {addr_hex}: \
724             first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
725             retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
726            first.queried,
727            first.not_found,
728            first.timeout,
729            first.network_err,
730            first.protocol_err,
731            retry.queried,
732            retry.not_found,
733            retry.timeout,
734            retry.network_err,
735            retry.protocol_err,
736        );
737        Ok(None)
738    }
739
740    /// One sweep of the requested closest peers: fetch the closest peers
741    /// for `address` from the DHT and ask each for the chunk in turn,
742    /// returning on the first success.
743    async fn chunk_get_try_closest_peers(
744        &self,
745        address: &XorName,
746        peer_count: usize,
747    ) -> Result<CloseGroupOutcome> {
748        let peers = self.closest_peers(address, peer_count).await?;
749        let addr_hex = hex::encode(address);
750        let queried = peers.len();
751        let mut not_found = 0usize;
752        let mut timeout = 0usize;
753        let mut network_err = 0usize;
754        let mut protocol_err = 0usize;
755
756        for (peer, addrs) in &peers {
757            match self.chunk_get_from_peer(address, peer, addrs).await {
758                Ok(Some(chunk)) => {
759                    return Ok(CloseGroupOutcome {
760                        chunk: Some(chunk),
761                        queried,
762                        not_found,
763                        timeout,
764                        network_err,
765                        protocol_err,
766                    });
767                }
768                Ok(None) => {
769                    not_found += 1;
770                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
771                }
772                Err(Error::Timeout(_)) => {
773                    timeout += 1;
774                    debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
775                }
776                Err(Error::Network(_)) => {
777                    network_err += 1;
778                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
779                }
780                // A `Protocol` error here is the storer responding with
781                // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
782                // failed" from a peer that has a corrupted local copy.
783                // That's a per-peer problem, not a per-chunk one: the
784                // remaining peers might still have a clean copy, so
785                // continue the sweep rather than aborting it. Counted
786                // separately from network_err so the summary log still
787                // distinguishes "peer corrupted" from "peer unreachable".
788                Err(Error::Protocol(ref e)) => {
789                    protocol_err += 1;
790                    debug!(
791                        "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
792                    );
793                }
794                Err(e) => return Err(e),
795            }
796        }
797
798        Ok(CloseGroupOutcome {
799            chunk: None,
800            queried,
801            not_found,
802            timeout,
803            network_err,
804            protocol_err,
805        })
806    }
807
808    /// Retrieve a chunk from every peer in the close group.
809    ///
810    /// Unlike [`Client::chunk_get`], this method does not return early
811    /// after the first successful response. It returns one result per
812    /// close-group peer, sorted from closest XOR distance to furthest.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if the close-group lookup fails.
817    pub async fn chunk_get_from_close_group(
818        &self,
819        address: &XorName,
820    ) -> Result<Vec<ChunkPeerGetResult>> {
821        self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
822            .await
823    }
824
825    /// Retrieve a chunk from the requested number of closest peers.
826    ///
827    /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
828    /// not return early after the first successful response. It returns
829    /// one result per queried peer, sorted from closest XOR distance to
830    /// furthest.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if the DHT lookup fails.
835    pub async fn chunk_get_from_closest_peer_group(
836        &self,
837        address: &XorName,
838        peer_count: usize,
839    ) -> Result<Vec<ChunkPeerGetResult>> {
840        let peers = self.closest_peers(address, peer_count).await?;
841        let targets = chunk_peer_get_targets(peers, address);
842        let concurrency_limit =
843            diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
844        let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
845        let overall_timeout =
846            diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
847
848        let mut completed = vec![false; targets.len()];
849        let mut results = Vec::with_capacity(targets.len());
850        let mut get_results = stream::iter(targets.iter().cloned())
851            .map(|target| async move {
852                let chunk_result = self
853                    .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
854                    .await;
855
856                if let Ok(Some(chunk)) = &chunk_result {
857                    self.chunk_cache().put(chunk.address, chunk.content.clone());
858                }
859
860                (
861                    target.index,
862                    ChunkPeerGetResult {
863                        peer_id: target.peer_id,
864                        peer_addrs: target.peer_addrs,
865                        xor_distance: target.xor_distance,
866                        chunk_result,
867                    },
868                )
869            })
870            .buffer_unordered(concurrency_limit);
871
872        let collect_results = async {
873            while let Some((index, result)) = get_results.next().await {
874                completed[index] = true;
875                results.push(result);
876            }
877        };
878
879        if tokio::time::timeout(overall_timeout, collect_results)
880            .await
881            .is_err()
882        {
883            for target in &targets {
884                if !completed[target.index] {
885                    results.push(timed_out_chunk_peer_get_result(
886                        target,
887                        address,
888                        overall_timeout,
889                    ));
890                }
891            }
892        }
893
894        sort_chunk_peer_get_results(&mut results);
895        Ok(results)
896    }
897
898    /// Fetch a chunk from a specific peer.
899    async fn chunk_get_from_peer(
900        &self,
901        address: &XorName,
902        peer: &PeerId,
903        peer_addrs: &[MultiAddr],
904    ) -> Result<Option<DataChunk>> {
905        let node = self.network().node();
906        let request_id = self.next_request_id();
907        let request = ChunkGetRequest::new(*address);
908        let message = ChunkMessage {
909            request_id,
910            body: ChunkMessageBody::GetRequest(request),
911        };
912        let message_bytes = message
913            .encode()
914            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
915
916        let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
917        let addr_hex = hex::encode(address);
918        let timeout_secs = self.config().chunk_get_timeout_secs;
919
920        let result = send_and_await_chunk_response(
921            node,
922            peer,
923            message_bytes,
924            request_id,
925            timeout,
926            peer_addrs,
927            |body| match body {
928                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
929                    address: addr,
930                    content,
931                }) => {
932                    if addr != *address {
933                        return Some(Err(Error::InvalidData(format!(
934                            "Mismatched chunk address: expected {addr_hex}, got {}",
935                            hex::encode(addr)
936                        ))));
937                    }
938
939                    let computed = compute_address(&content);
940                    if computed != addr {
941                        return Some(Err(Error::InvalidData(format!(
942                            "Invalid chunk content: expected hash {addr_hex}, got {}",
943                            hex::encode(computed)
944                        ))));
945                    }
946
947                    debug!(
948                        "Retrieved chunk {} ({} bytes) from peer {peer}",
949                        hex::encode(addr),
950                        content.len()
951                    );
952                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
953                }
954                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
955                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
956                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
957                )),
958                _ => None,
959            },
960            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
961            || {
962                Error::Timeout(format!(
963                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
964                ))
965            },
966        )
967        .await;
968
969        result
970    }
971
972    /// Check if a chunk exists on the network.
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the network operation fails.
977    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
978        self.chunk_get(address).await.map(|opt| opt.is_some())
979    }
980
981    /// Finalize a single-chunk publish after an external signer has paid.
982    ///
983    /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
984    /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
985    /// `quote_hash -> tx_hash` map containing receipts for every non-zero
986    /// quote in the chunk's payment. Builds the `PaymentProof` and stores
987    /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
988    ///
989    /// Wave-batch payment shape only. Single-chunk publishes don't need
990    /// Merkle batching: one chunk's worth of quotes is well below the
991    /// wave-batch threshold.
992    ///
993    /// # Errors
994    ///
995    /// Returns an error if the proof construction fails (e.g. missing
996    /// `tx_hash` for a non-zero quote) or if fewer than
997    /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
998    pub async fn finalize_chunk(
999        &self,
1000        prepared: PreparedChunk,
1001        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1002    ) -> Result<XorName> {
1003        let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
1004        // finalize_batch_payment returns one PaidChunk per PreparedChunk
1005        // input; we passed exactly one. If that invariant is ever violated
1006        // it's an upstream bug — fail loudly rather than silently address-0.
1007        let chunk = paid.pop().ok_or_else(|| {
1008            Error::Payment(
1009                "finalize_batch_payment returned no paid chunks for a single \
1010                 prepared chunk — internal invariant violated"
1011                    .into(),
1012            )
1013        })?;
1014        self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
1015            .await
1016    }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
1023
1024    /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
1025    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
1026    /// Sentinel byte used to represent an unknown/unrecognized proof tag.
1027    const UNKNOWN_PROOF_TAG: u8 = 0xff;
1028    /// XorName byte width used by test peer IDs and distances.
1029    const TEST_XORNAME_BYTE_LEN: usize = 32;
1030    /// Last byte position in the test XOR distance arrays.
1031    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;
1032
1033    #[test]
1034    fn classify_put_failure_maps_remote_and_transport_reasons() {
1035        let remote = |source| Error::RemotePut {
1036            address: "test-addr".to_string(),
1037            source,
1038        };
1039        assert!(matches!(
1040            classify_put_failure(&remote(ProtocolError::StorageFailed("full".to_string()))),
1041            PutRejection::Full
1042        ));
1043        assert!(matches!(
1044            classify_put_failure(&remote(ProtocolError::PaymentFailed(
1045                "below floor".to_string()
1046            ))),
1047            PutRejection::PriceFloor
1048        ));
1049        assert!(matches!(
1050            classify_put_failure(&remote(ProtocolError::Internal("boom".to_string()))),
1051            PutRejection::OtherRemote
1052        ));
1053        // A `PaymentRequired` PUT response surfaces as `Error::Payment` and is an
1054        // application-level decline, not a transport shortfall (ADR-0002).
1055        assert!(matches!(
1056            classify_put_failure(&Error::Payment("Payment required: more".to_string())),
1057            PutRejection::PriceFloor
1058        ));
1059        assert!(matches!(
1060            classify_put_failure(&Error::Timeout("no response".to_string())),
1061            PutRejection::Transport
1062        ));
1063    }
1064
1065    #[test]
1066    fn put_shortfall_surfaces_app_error_only_without_transport_failure() {
1067        let app = || Error::Payment("Payment required: more".to_string());
1068        let msg = || "shortfall".to_string();
1069
1070        // Every failure was an application-level decline (e.g. all peers asked
1071        // for more payment): surface the app error so the limiter isn't driven
1072        // down as a false capacity signal (ADR-0002 / V2-468).
1073        assert!(matches!(
1074            put_shortfall_error(0, Some(app()), msg()),
1075            Error::Payment(_)
1076        ));
1077        // A transport failure in the mix: a genuine capacity shortfall.
1078        assert!(matches!(
1079            put_shortfall_error(1, Some(app()), msg()),
1080            Error::InsufficientPeers(_)
1081        ));
1082        // No application rejection captured at all (pure transport): capacity.
1083        assert!(matches!(
1084            put_shortfall_error(0, None, msg()),
1085            Error::InsufficientPeers(_)
1086        ));
1087    }
1088
1089    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
1090        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
1091        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
1092
1093        ChunkPeerGetResult {
1094            peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
1095            peer_addrs: Vec::new(),
1096            xor_distance,
1097            chunk_result: Ok(None),
1098        }
1099    }
1100
1101    #[test]
1102    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
1103        // Unanimous AND well-sampled: every queried peer of a full
1104        // close group said NotFound. The only safe stop.
1105        assert!(is_authoritative_not_found(7, 7));
1106        // Unanimous with exactly a majority-sized sample is also
1107        // authoritative.
1108        assert!(is_authoritative_not_found(
1109            CLOSE_GROUP_MAJORITY,
1110            CLOSE_GROUP_MAJORITY
1111        ));
1112
1113        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
1114        // peers, all NotFound, is NOT authoritative — the real replica
1115        // majority may sit entirely outside that narrow view. Must
1116        // retry (re-walk the DHT).
1117        assert!(!is_authoritative_not_found(1, 1));
1118        assert!(!is_authoritative_not_found(3, 3));
1119        assert!(!is_authoritative_not_found(
1120            CLOSE_GROUP_MAJORITY - 1,
1121            CLOSE_GROUP_MAJORITY - 1
1122        ));
1123
1124        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
1125        // timeout bucket. Must retry.
1126        assert!(!is_authoritative_not_found(4, 7));
1127        assert!(!is_authoritative_not_found(6, 7));
1128
1129        // Pure-reachability failure — must retry.
1130        assert!(!is_authoritative_not_found(0, 7));
1131
1132        // Defensive: a zeroed outcome (e.g. the first attempt's
1133        // close-group lookup errored) is never authoritative.
1134        assert!(!is_authoritative_not_found(0, 0));
1135    }
1136
1137    #[test]
1138    fn chunk_get_outcome_classifies_each_result_kind() {
1139        // Success: chunk_get returned a chunk, regardless of how many
1140        // internal peer attempts it took.
1141        let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
1142        assert_eq!(
1143            chunk_get_outcome(&Ok(Some(chunk))),
1144            Outcome::Success,
1145            "found-chunk must be Success",
1146        );
1147
1148        // Ok(None): chunk_get exhausted the close group across first
1149        // attempt + retry. This is the load-shedding signal — count it
1150        // as Timeout so a sustained run of them on a saturated link
1151        // shrinks the cap.
1152        assert_eq!(
1153            chunk_get_outcome(&Ok(None)),
1154            Outcome::Timeout,
1155            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
1156        );
1157
1158        // Capacity signals from explicit error variants.
1159        assert_eq!(
1160            chunk_get_outcome(&Err(Error::Timeout("t".into()))),
1161            Outcome::Timeout,
1162        );
1163        assert_eq!(
1164            chunk_get_outcome(&Err(Error::Network("n".into()))),
1165            Outcome::NetworkError,
1166        );
1167
1168        // Unexpected error variant (e.g. Protocol) — propagates out of
1169        // chunk_get to the caller and is not a capacity signal.
1170        assert_eq!(
1171            chunk_get_outcome(&Err(Error::Protocol("p".into()))),
1172            Outcome::ApplicationError,
1173        );
1174    }
1175
1176    #[test]
1177    fn single_node_proof_uses_store_response_timeout() {
1178        let timeout =
1179            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);
1180
1181        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1182    }
1183
1184    #[test]
1185    fn unknown_proof_uses_store_response_timeout() {
1186        let timeout =
1187            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);
1188
1189        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1190    }
1191
1192    #[test]
1193    fn merkle_proof_uses_configured_store_timeout() {
1194        let timeout =
1195            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);
1196
1197        assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
1198    }
1199
1200    #[test]
1201    fn chunk_peer_get_results_sort_by_xor_distance() {
1202        let mut results = vec![
1203            chunk_peer_get_result(3, 3),
1204            chunk_peer_get_result(1, 1),
1205            chunk_peer_get_result(2, 2),
1206        ];
1207
1208        sort_chunk_peer_get_results(&mut results);
1209
1210        let ordered_distances = results
1211            .iter()
1212            .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
1213            .collect::<Vec<_>>();
1214        assert_eq!(ordered_distances, vec![1, 2, 3]);
1215    }
1216
1217    #[test]
1218    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
1219        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1220        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
1221        const TARGET_COUNT: usize = 7;
1222        const CONCURRENCY_LIMIT: usize = 7;
1223
1224        let timeout = diagnostic_peer_get_overall_timeout(
1225            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1226            TARGET_COUNT,
1227            CONCURRENCY_LIMIT,
1228        );
1229
1230        assert_eq!(
1231            timeout,
1232            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1233        );
1234    }
1235
1236    #[test]
1237    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
1238        const PER_PEER_TIMEOUT_SECS: u64 = 10;
1239        const TARGET_COUNT: usize = 20;
1240        const CLOSE_GROUP_SIZE: usize = 7;
1241        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;
1242
1243        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
1244        let timeout = diagnostic_peer_get_overall_timeout(
1245            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1246            TARGET_COUNT,
1247            concurrency_limit,
1248        );
1249
1250        assert_eq!(
1251            timeout,
1252            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1253        );
1254    }
1255
1256    /// Regression: the default `merkle_store_timeout_secs` must be at
1257    /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus
1258    /// padding. If either side moves and this invariant breaks, the
1259    /// client will give up on chunks the storer is still verifying.
1260    /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the
1261    /// derivation.
1262    #[test]
1263    fn default_merkle_store_timeout_satisfies_storer_invariant() {
1264        use crate::data::client::ClientConfig;
1265        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
1266        const MIN_PADDING_SECS: u64 = 30;
1267        let config = ClientConfig::default();
1268        assert!(
1269            config.merkle_store_timeout_secs
1270                >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
1271            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
1272            config.merkle_store_timeout_secs,
1273            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
1274            MIN_PADDING_SECS,
1275        );
1276    }
1277
1278    /// Regression: the non-merkle PUT path uses the hardcoded
1279    /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config
1280    /// `merkle_store_timeout_secs`. If a future refactor accidentally
1281    /// routes non-merkle PUTs through the merkle field they'd inherit
1282    /// the 270 s value and silently regress non-merkle latency.
1283    /// `store_response_timeout_for_proof` with a non-merkle proof tag
1284    /// must return the const regardless of what merkle timeout is
1285    /// passed.
1286    #[test]
1287    fn non_merkle_put_ignores_merkle_timeout_value() {
1288        let absurd_merkle_timeout = 9_999;
1289        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
1290            let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
1291            assert_eq!(
1292                timeout, STORE_RESPONSE_TIMEOUT,
1293                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
1294            );
1295        }
1296    }
1297}