ant_core/data/client/chunk.rs
1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_xor_distance;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{QuoteHash, TxHash};
12use ant_protocol::transport::{MultiAddr, PeerId};
13use ant_protocol::{
14 compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
15 ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
16 ProofType, ProtocolError, XorName, CLOSE_GROUP_MAJORITY,
17};
18use bytes::Bytes;
19use futures::stream::{self, FuturesUnordered, StreamExt};
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22use tracing::{debug, info, warn};
23
/// Data type identifier for chunks (used in quote requests).
///
/// Passed as the data-type argument to `pay_for_storage` by the chunk PUT
/// entry points in this file.
const CHUNK_DATA_TYPE: u32 = 0;
26
/// Why a single-peer PUT was declined. Drives the surfaced aggregate error
/// and keeps the store AIMD limiter honest — only a transport shortfall is a
/// "client is sending too fast" signal (V2-468); a node that responds with a
/// structured rejection is an application-level decline (ADR-0002).
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so a
/// classification can be logged with `{:?}` and compared directly in
/// assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PutRejection {
    /// Node is out of storage (`ProtocolError::StorageFailed`) — try a
    /// further peer.
    Full,
    /// Payment did not clear the node's local price floor, or the proof's
    /// issuers are not close enough in this peer's view
    /// (`ProtocolError::PaymentFailed`), or the node asked for more than was
    /// paid (`ChunkPutResponse::PaymentRequired` → [`Error::Payment`]) — skip
    /// this peer, do not re-quote.
    PriceFloor,
    /// Some other structured remote rejection.
    OtherRemote,
    /// Transport/timeout failure — the node did not respond.
    Transport,
}
47
48/// Classify a failed single-peer PUT (ADR-0002). A `RemotePut` carries the
49/// node's structured `ProtocolError`; a `PaymentRequired` response surfaces as
50/// [`Error::Payment`]; anything else is a transport failure.
51fn classify_put_failure(error: &Error) -> PutRejection {
52 match error {
53 Error::RemotePut { source, .. } => match source {
54 ProtocolError::StorageFailed(_) => PutRejection::Full,
55 ProtocolError::PaymentFailed(_) => PutRejection::PriceFloor,
56 _ => PutRejection::OtherRemote,
57 },
58 // A `PaymentRequired` PUT response (the node wants more than was paid)
59 // arrives as `Error::Payment`. It is a structured application-level
60 // decline — skip the peer and advance fallback, exactly like a
61 // price-floor `PaymentFailed` — not a transport shortfall, so it must
62 // not push the store AIMD limiter down (ADR-0002 / V2-468).
63 Error::Payment(_) => PutRejection::PriceFloor,
64 _ => PutRejection::Transport,
65 }
66}
67
68/// Decide the error for a close-group store that fell short of quorum.
69///
70/// When every failure was an application-level decline — the node responded
71/// (full / price-floor / `PaymentRequired` / other remote rejection) and there
72/// was **no** transport failure — return the representative application error so
73/// the shortfall classifies as `ApplicationError` and does not push the store
74/// AIMD limiter down as a false capacity signal (ADR-0002 / V2-468). A shortfall
75/// that included any transport failure is a genuine capacity signal and surfaces
76/// as `InsufficientPeers` (classified `NetworkError`).
77fn put_shortfall_error(
78 transport: usize,
79 first_app_rejection: Option<Error>,
80 insufficient_peers_message: String,
81) -> Error {
82 if transport == 0 {
83 if let Some(app_rejection) = first_app_rejection {
84 return app_rejection;
85 }
86 }
87 Error::InsufficientPeers(insufficient_peers_message)
88}
89
/// Result of one sweep over a chunk's close group.
///
/// Either we got the chunk from some peer, or every peer in the group
/// returned NotFound, timed out, or hit a transport / protocol error.
/// The counts feed the retry decision (`is_authoritative_not_found`):
/// only a *unanimous* NotFound from a *well-sampled* close group counts
/// as authoritative data absence — anything else (a non-unanimous
/// result, or a thin/under-sampled DHT walk) leaves room for the actual
/// storer to be in the timeout / network-error / protocol-error bucket
/// or outside the sampled view, and is worth a retry against a freshly
/// re-walked close group.
struct CloseGroupOutcome {
    /// The chunk, when some peer in the sweep returned it; `None` otherwise.
    chunk: Option<DataChunk>,
    /// Number of peers the close-group lookup returned for this sweep. This
    /// is the size of the peer list, not the number of peers actually
    /// contacted when the sweep returns early on a hit.
    queried: usize,
    /// Peers whose GET came back as `Ok(None)`.
    not_found: usize,
    /// Peers whose GET failed with `Error::Timeout`.
    timeout: usize,
    /// Peers whose GET failed with `Error::Network`.
    network_err: usize,
    /// Counts peers that responded with a remote `Error` (e.g.
    /// "Chunk verification failed") or any other protocol-level error
    /// that classifies as `Error::Protocol`. Treated the same as
    /// `timeout` / `network_err` for retry decisions: one peer's bad
    /// response must not abort the whole close-group sweep — the
    /// remaining peers might still have a clean copy.
    protocol_err: usize,
}
115
116/// `true` if the close-group sweep is strong enough evidence to
117/// conclude the chunk is genuinely absent, so retrying is pointless.
118///
119/// Two conditions, both required:
120///
121/// 1. *Unanimous*: every peer we managed to query responded with an
122/// authoritative NotFound (`not_found == queried`). An earlier
123/// version used a majority quorum (`not_found >= close_group_size /
124/// 2 + 1`), but production traffic disproved that: storage
125/// replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
126/// peers, so up to 3 peers legitimately don't store any given chunk
127/// and a `not_found=4 timeout=3` result is "3 storers we couldn't
128/// reach" plus "4 non-storers," not data loss.
129///
130/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
131/// queried. `closest_peers` (via `find_closest_peers`) accepts
132/// any non-empty DHT result, so a thin/under-sampled walk can return
133/// 1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
134/// authoritative — the real replica majority may sit entirely
135/// outside that narrow view. Requiring a majority-sized sample means
136/// a thin lookup falls through to the retry (which re-walks the DHT)
137/// instead of being declared a final absence.
138fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
139 queried >= CLOSE_GROUP_MAJORITY && not_found == queried
140}
141
/// Store-response timeout for non-merkle chunk PUTs.
///
/// Returned by `store_response_timeout_for_proof` for every proof that is not
/// detected as `ProofType::Merkle`.
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);

/// Extra waves allowed after the computed diagnostic peer-sweep deadline.
///
/// Added to the wave count in `diagnostic_peer_get_overall_timeout`, so the
/// overall deadline is the per-peer timeout multiplied by (waves + padding).
const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
147
/// Result of fetching one chunk address from one close-group peer.
///
/// Produced by the diagnostic sweep (`chunk_get_from_closest_peer_group`),
/// which reports every queried peer instead of stopping at the first hit.
pub struct ChunkPeerGetResult {
    /// Peer queried for the chunk.
    pub peer_id: PeerId,
    /// Known network addresses used for the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address.
    pub xor_distance: [u8; 32],
    /// Per-peer fetch result: the value returned by the single-peer GET, or
    /// an `Error::Timeout` filled in when the overall sweep deadline elapsed
    /// before this peer's request completed.
    pub chunk_result: Result<Option<DataChunk>>,
}
159
/// One peer scheduled for a diagnostic per-peer chunk GET.
#[derive(Clone)]
struct ChunkPeerGetTarget {
    /// Position of the peer in the lookup result; used to mark the peer as
    /// completed in the sweep's bookkeeping vector.
    index: usize,
    /// Peer to query.
    peer_id: PeerId,
    /// Network addresses to use for the peer.
    peer_addrs: Vec<MultiAddr>,
    /// XOR distance between `peer_id` and the chunk address, precomputed so
    /// results can be sorted without recomputing it.
    xor_distance: [u8; 32],
}
167
168fn chunk_peer_get_targets(
169 peers: Vec<(PeerId, Vec<MultiAddr>)>,
170 address: &XorName,
171) -> Vec<ChunkPeerGetTarget> {
172 peers
173 .into_iter()
174 .enumerate()
175 .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
176 index,
177 peer_id,
178 peer_addrs,
179 xor_distance: peer_xor_distance(&peer_id, address),
180 })
181 .collect()
182}
183
184fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
185 results.sort_by_key(|result| result.xor_distance);
186}
187
/// Number of per-peer GETs the diagnostic sweep may run at once.
///
/// The result is `peer_count` capped at `close_group_size`, and is always at
/// least 1. The lower bound matters because the value is handed to
/// `buffer_unordered`: a limit of 0 never pulls from the source stream, so a
/// sweep requested with `peer_count == 0` would idle until the overall
/// timeout fires instead of completing immediately. This also matches
/// `diagnostic_peer_get_overall_timeout`, which treats a zero limit as 1.
fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
    // `close_group_size.max(1)` keeps the upper bound >= the lower bound, so
    // `clamp` cannot panic.
    peer_count.clamp(1, close_group_size.max(1))
}
191
192fn diagnostic_peer_get_overall_timeout(
193 per_peer_timeout: Duration,
194 target_count: usize,
195 concurrency_limit: usize,
196) -> Duration {
197 let concurrency_limit = concurrency_limit.max(1);
198 let peer_get_waves = target_count.div_ceil(concurrency_limit);
199 let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
200 let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
201
202 per_peer_timeout.saturating_mul(timeout_waves)
203}
204
205fn timed_out_chunk_peer_get_result(
206 target: &ChunkPeerGetTarget,
207 address: &XorName,
208 timeout: Duration,
209) -> ChunkPeerGetResult {
210 let addr_hex = hex::encode(address);
211 let timeout_secs = timeout.as_secs();
212 ChunkPeerGetResult {
213 peer_id: target.peer_id,
214 peer_addrs: target.peer_addrs.clone(),
215 xor_distance: target.xor_distance,
216 chunk_result: Err(Error::Timeout(format!(
217 "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
218 target.peer_id
219 ))),
220 }
221}
222
223fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
224 match detect_proof_type(proof) {
225 Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
226 _ => STORE_RESPONSE_TIMEOUT,
227 }
228}
229
230impl Client {
231 /// Run `chunk_get` and feed one byte-aware observation per call to
232 /// the adaptive fetch limiter. Use this from any consumer that
233 /// drives chunk-fetch concurrency from `controller().fetch.current()`
234 /// — the controller's window relies on every call along the hot
235 /// path producing an observation.
236 ///
237 /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
238 /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
239 /// sustained run of close-group exhaustions correctly drives the
240 /// cap down rather than silently inflating it.
241 pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
242 self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
243 .await
244 }
245
246 pub(crate) async fn chunk_get_observed_from_closest_peers(
247 &self,
248 address: &XorName,
249 peer_count: usize,
250 ) -> Result<Option<DataChunk>> {
251 let started = Instant::now();
252 let result = self.chunk_get_from_closest_peers(address, peer_count).await;
253 let latency = started.elapsed();
254 let bytes = result
255 .as_ref()
256 .ok()
257 .and_then(Option::as_ref)
258 .map_or(0, |chunk| chunk.content.len() as u64);
259 self.controller()
260 .fetch
261 .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
262 result
263 }
264}
265
266/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
267///
268/// This is the result-aware classifier used by the file-download paths.
269/// It differs from `classify_error` in one critical way: an `Ok(None)`
270/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
271/// the time `chunk_get` returns `Ok(None)` it has already exhausted
272/// the close group across its first attempt + retry sweep, so
273/// `Ok(None)` is the controller's load-shedding signal — a sustained
274/// run of them on a saturated home link is exactly the case where the
275/// cap should shrink.
276///
277/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
278/// internal peer attempts the chunk_get had to make. The controller
279/// does not need to see internal peer noise; that's noise about the
280/// production network's natural peer-side variability, not about the
281/// client's effective capacity.
282pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
283 match result {
284 Ok(Some(_)) => Outcome::Success,
285 Ok(None) => Outcome::Timeout,
286 Err(Error::Timeout(_)) => Outcome::Timeout,
287 Err(Error::Network(_)) => Outcome::NetworkError,
288 Err(_) => Outcome::ApplicationError,
289 }
290}
291
292impl Client {
293 /// Store a chunk on the Autonomi network with payment.
294 ///
295 /// Checks if the chunk already exists before paying. If it does,
296 /// returns the address immediately without incurring on-chain costs.
297 /// Otherwise collects quotes, pays on-chain, then stores with proof
298 /// to `CLOSE_GROUP_MAJORITY` peers.
299 ///
300 /// # Errors
301 ///
302 /// Returns an error if payment or the network operation fails.
303 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
304 let address = compute_address(&content);
305 let data_size = u64::try_from(content.len())
306 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
307
308 match self
309 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
310 .await
311 {
312 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
313 Err(Error::AlreadyStored) => {
314 debug!(
315 "Chunk {} already stored on network, skipping payment",
316 hex::encode(address)
317 );
318 Ok(address)
319 }
320 Err(e) => Err(e),
321 }
322 }
323
/// Test-only: pay for `content`, then store it against a put-target list
/// that starts with `dead_count` unreachable peers.
///
/// The unreachable entries are random peer ids with no addresses, placed
/// ahead of the real put-targets returned by `pay_for_storage`. Sends to
/// them are expected to fail, so quorum can only be reached by falling back
/// through the real peers while reusing the same payment proof. Pass
/// `dead_count >= CLOSE_GROUP_MAJORITY` to force a full quorum's worth of
/// replacements to come from the fallback path.
///
/// # Errors
///
/// Returns an error if payment fails or quorum cannot be reached.
#[cfg(feature = "test-utils")]
pub async fn chunk_put_with_dead_initial_peers(
    &self,
    content: Bytes,
    dead_count: usize,
) -> Result<XorName> {
    let address = compute_address(&content);
    let data_size = u64::try_from(content.len())
        .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
    let (proof, real_peers) = self
        .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
        .await?;

    // Dead entries go first so they occupy the initial send slots; the real
    // put-targets follow and serve as the fallback queue.
    let mut peers: Vec<(PeerId, Vec<MultiAddr>)> = Vec::new();
    for _ in 0..dead_count {
        peers.push((PeerId::random(), Vec::new()));
    }
    peers.extend(real_peers);

    self.chunk_put_to_close_group(content, proof, &peers).await
}
358
359 /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers, falling back past full or
360 /// over-priced members of the supplied put-target set (ADR-0002).
361 ///
362 /// Sends the PUT concurrently to the first `CLOSE_GROUP_MAJORITY` peers. On
363 /// each failure it advances to the next peer in `peers` — which the caller
364 /// supplies as the chunk's closest ~K neighbourhood, so no further DHT
365 /// lookup is needed. Every peer reuses the same payment proof: a node
366 /// accepts it as long as one of the proof's quote issuers is within that
367 /// peer's own local closest view, so the client never needs to re-quote or
368 /// re-pay to route around a full node.
369 ///
370 /// # Errors
371 ///
372 /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
373 /// the chunk.
374 pub(crate) async fn chunk_put_to_close_group(
375 &self,
376 content: Bytes,
377 proof: Vec<u8>,
378 peers: &[(PeerId, Vec<MultiAddr>)],
379 ) -> Result<XorName> {
380 let address = compute_address(&content);
381
382 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
383 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
384 let mut fallback_iter = fallback_peers.iter();
385
386 let mut put_futures = FuturesUnordered::new();
387 for (peer_id, addrs) in initial_peers {
388 put_futures.push(self.spawn_chunk_put(
389 content.clone(),
390 proof.clone(),
391 *peer_id,
392 addrs.clone(),
393 ));
394 }
395
396 let mut success_count = 0usize;
397 let mut failures: Vec<String> = Vec::new();
398 // Tally the *cause* of each failure. The store AIMD limiter must only be
399 // pushed down by a transport shortfall (V2-468): a node that responds —
400 // a structured `RemotePut` decline, or `PaymentRequired` surfacing as
401 // `Error::Payment` — declined at the application layer and is not
402 // evidence the client is sending too fast. The per-cause counts also
403 // surface a legible aggregate reason; hold the first application-level
404 // rejection as the representative error.
405 let mut full = 0usize;
406 let mut price_floor = 0usize;
407 let mut other_remote = 0usize;
408 let mut transport = 0usize;
409 let mut first_app_rejection: Option<Error> = None;
410
411 while let Some((peer_id, result)) = put_futures.next().await {
412 match result {
413 Ok(_) => {
414 success_count += 1;
415 if success_count >= CLOSE_GROUP_MAJORITY {
416 debug!(
417 "Chunk {} stored on {success_count} peers (majority reached)",
418 hex::encode(address)
419 );
420 return Ok(address);
421 }
422 }
423 Err(e) => {
424 warn!("Failed to store chunk on {peer_id}: {e}");
425 failures.push(format!("{peer_id}: {e}"));
426 match classify_put_failure(&e) {
427 PutRejection::Full => full += 1,
428 PutRejection::PriceFloor => price_floor += 1,
429 PutRejection::OtherRemote => other_remote += 1,
430 PutRejection::Transport => transport += 1,
431 }
432 // An application-level decline is `RemotePut` (a structured
433 // node rejection) or `Error::Payment` (`PaymentRequired`):
434 // capture the first so an all-application shortfall surfaces
435 // as `ApplicationError`, not `InsufficientPeers`
436 // (`NetworkError`), and never suppresses the limiter.
437 if matches!(e, Error::RemotePut { .. } | Error::Payment(_))
438 && first_app_rejection.is_none()
439 {
440 first_app_rejection = Some(e);
441 }
442
443 // Advance to the next peer in the put-target set, reusing
444 // the same proof.
445 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
446 debug!(
447 "Falling back to peer {fb_peer} for chunk {}",
448 hex::encode(address)
449 );
450 put_futures.push(self.spawn_chunk_put(
451 content.clone(),
452 proof.clone(),
453 *fb_peer,
454 fb_addrs.clone(),
455 ));
456 }
457 }
458 }
459 }
460
461 // Quorum not reached. An application-only shortfall surfaces the
462 // representative app error (so it doesn't suppress the limiter); a
463 // shortfall with any transport failure is a real capacity signal.
464 let aggregate = format!(
465 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY} \
466 (full: {full}, price-floor: {price_floor}, other-rejection: {other_remote}, \
467 transport: {transport}). Failures: [{}]",
468 failures.join("; ")
469 );
470 Err(put_shortfall_error(
471 transport,
472 first_app_rejection,
473 aggregate,
474 ))
475 }
476
477 /// Build a chunk PUT future for a single peer. Takes owned peer data so
478 /// the future can outlive a fallback queue entry popped per iteration.
479 async fn spawn_chunk_put(
480 &self,
481 content: Bytes,
482 proof: Vec<u8>,
483 peer_id: PeerId,
484 addrs: Vec<MultiAddr>,
485 ) -> (PeerId, Result<XorName>) {
486 let result = self
487 .chunk_put_with_proof(content, proof, &peer_id, &addrs)
488 .await;
489 (peer_id, result)
490 }
491
492 /// Store a chunk on the Autonomi network with a pre-built payment proof.
493 ///
494 /// Sends to a single peer. Callers that need replication across the
495 /// close group should use `chunk_put_to_close_group` instead.
496 ///
497 /// # Errors
498 ///
499 /// Returns an error if the network operation fails.
500 pub async fn chunk_put_with_proof(
501 &self,
502 content: Bytes,
503 proof: Vec<u8>,
504 target_peer: &PeerId,
505 peer_addrs: &[MultiAddr],
506 ) -> Result<XorName> {
507 let address = compute_address(&content);
508 let node = self.network().node();
509 let timeout =
510 store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
511 let timeout_secs = timeout.as_secs();
512
513 let request_id = self.next_request_id();
514 // `content` is a refcounted `Bytes` shared with the sibling
515 // close-group sends; pass it through directly so each peer shares
516 // the same backing buffer instead of deep-copying the 4 MB payload.
517 let request = ChunkPutRequest::with_payment(address, content, proof);
518 let message = ChunkMessage {
519 request_id,
520 body: ChunkMessageBody::PutRequest(request),
521 };
522 let message_bytes = message
523 .encode()
524 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
525
526 let addr_hex = hex::encode(address);
527
528 let result = send_and_await_chunk_response(
529 node,
530 target_peer,
531 message_bytes,
532 request_id,
533 timeout,
534 peer_addrs,
535 |body| match body {
536 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
537 debug!("Chunk stored at {}", hex::encode(addr));
538 Some(Ok(addr))
539 }
540 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
541 address: addr,
542 }) => {
543 debug!("Chunk already exists at {}", hex::encode(addr));
544 Some(Ok(addr))
545 }
546 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
547 Some(Err(Error::Payment(format!("Payment required: {message}"))))
548 }
549 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
550 // Preserve the structured remote reason instead of
551 // flattening it into `Error::Protocol`. The node
552 // responded, so the transport round-trip succeeded —
553 // this is an application-level rejection and must not
554 // suppress the store AIMD limiter (V2-468).
555 Some(Err(Error::RemotePut {
556 address: addr_hex.clone(),
557 source: e,
558 }))
559 }
560 _ => None,
561 },
562 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
563 || {
564 Error::Timeout(format!(
565 "Timeout waiting for store response after {timeout_secs}s"
566 ))
567 },
568 )
569 .await;
570
571 result
572 }
573
574 /// Retrieve a chunk from the Autonomi network.
575 ///
576 /// Queries all peers in the close group for the chunk address,
577 /// returning the first successful response. This handles the case
578 /// where the storing peer differs from the first peer returned by
579 /// DHT routing.
580 ///
581 /// ## Adaptive controller feedback
582 ///
583 /// Each per-peer GET attempt is fed individually to the adaptive
584 /// fetch limiter via `controller().fetch.observe(...)`. This is
585 /// deliberately finer-grained than wrapping the outer `chunk_get`
586 /// with `observe_op`: when a chunk takes 6 peer tries to land,
587 /// 5 of them are real capacity signals (timeouts / network errors)
588 /// that should pull the cap down even if the chunk eventually
589 /// succeeds. The outer `Ok(_)` would mask all five as a single
590 /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
591 /// classification rules used below.
592 ///
593 /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
594 ///
595 /// # Errors
596 ///
597 /// Returns an error if the network operation fails.
598 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
599 self.chunk_get_from_closest_peers(address, self.config().close_group_size)
600 .await
601 }
602
603 /// Retrieve a chunk from the requested number of closest peers.
604 ///
605 /// Queries peers in XOR-distance order for the chunk address,
606 /// returning the first successful response. This handles the case
607 /// where the storing peer differs from the first peer returned by
608 /// DHT routing.
609 ///
610 /// # Errors
611 ///
612 /// Returns an error if the network operation fails.
613 pub async fn chunk_get_from_closest_peers(
614 &self,
615 address: &XorName,
616 peer_count: usize,
617 ) -> Result<Option<DataChunk>> {
618 // Check cache first, with integrity verification.
619 if let Some(cached) = self.chunk_cache().get(address) {
620 let computed = compute_address(&cached);
621 if computed == *address {
622 debug!("Cache hit for chunk {}", hex::encode(address));
623 return Ok(Some(DataChunk::new(*address, cached)));
624 }
625 // Cache entry corrupted — evict and fall through to network fetch.
626 debug!(
627 "Cache corruption detected for {}: evicting",
628 hex::encode(address)
629 );
630 self.chunk_cache().remove(address);
631 }
632
633 let addr_hex = hex::encode(address);
634
635 // First attempt against the current close-group view. A
636 // lookup/transport error here (e.g. closest_peers' DHT walk
637 // momentarily returning an error, or InsufficientPeers from a
638 // thin routing table) is NOT fatal: fall through to the retry
639 // path exactly as a non-authoritative miss would. Otherwise one
640 // transient error on the *initial* close-group walk for a single
641 // chunk would fail an entire multi-hundred-chunk download. A
642 // zeroed outcome (queried=0) is never authoritative, so it flows
643 // straight to the retry below.
644 let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
645 Ok(outcome) => outcome,
646 Err(e) => {
647 info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
648 CloseGroupOutcome {
649 chunk: None,
650 queried: 0,
651 not_found: 0,
652 timeout: 0,
653 network_err: 0,
654 protocol_err: 0,
655 }
656 }
657 };
658 if let Some(chunk) = first.chunk {
659 self.chunk_cache().put(chunk.address, chunk.content.clone());
660 return Ok(Some(chunk));
661 }
662
663 // Only treat as authoritative absence when *every* queried peer
664 // responded NotFound. Anything less leaves the actual storer
665 // possibly in the timeout / network-error bucket, which a retry
666 // could reach.
667 if is_authoritative_not_found(first.not_found, first.queried) {
668 info!(
669 "chunk_get giving up on {addr_hex} (unanimous NotFound): \
670 queried={} not_found={} timeout={} network_err={} protocol_err={}",
671 first.queried,
672 first.not_found,
673 first.timeout,
674 first.network_err,
675 first.protocol_err,
676 );
677 return Ok(None);
678 }
679
680 // Otherwise the failure looks like reachability (most peers timed out
681 // or hit transport errors). The chunk is most likely still on the
682 // network but the current close-group view either (a) caught a
683 // transient transport blip or (b) converged on the wrong neighbourhood
684 // because the routing table is thin. One retry against a freshly
685 // re-walked close group is the cheapest defence against both.
686 info!(
687 "chunk_get retrying {addr_hex} after reachability failure: \
688 queried={} not_found={} timeout={} network_err={} protocol_err={}",
689 first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
690 );
691
692 // Brief settle so any in-flight transport state can quiesce before
693 // we re-walk the DHT. Keep this small so we don't add meaningful
694 // latency to the genuinely-lost case (we already paid for one full
695 // close-group sweep before getting here).
696 tokio::time::sleep(Duration::from_secs(1)).await;
697
698 // If the retry's DHT lookup itself fails, treat that as "still
699 // couldn't find" rather than escalating the error — matches the
700 // semantics of the first attempt when peers are unreachable.
701 let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
702 Ok(o) => o,
703 Err(e) => {
704 info!(
705 "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
706 first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
707 first.queried,
708 first.not_found,
709 first.timeout,
710 first.network_err,
711 first.protocol_err,
712 );
713 return Ok(None);
714 }
715 };
716 if let Some(chunk) = retry.chunk {
717 info!("chunk_get retry succeeded for {addr_hex}");
718 self.chunk_cache().put(chunk.address, chunk.content.clone());
719 return Ok(Some(chunk));
720 }
721
722 info!(
723 "chunk_get exhausted close group after retry for {addr_hex}: \
724 first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
725 retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
726 first.queried,
727 first.not_found,
728 first.timeout,
729 first.network_err,
730 first.protocol_err,
731 retry.queried,
732 retry.not_found,
733 retry.timeout,
734 retry.network_err,
735 retry.protocol_err,
736 );
737 Ok(None)
738 }
739
740 /// One sweep of the requested closest peers: fetch the closest peers
741 /// for `address` from the DHT and ask each for the chunk in turn,
742 /// returning on the first success.
743 async fn chunk_get_try_closest_peers(
744 &self,
745 address: &XorName,
746 peer_count: usize,
747 ) -> Result<CloseGroupOutcome> {
748 let peers = self.closest_peers(address, peer_count).await?;
749 let addr_hex = hex::encode(address);
750 let queried = peers.len();
751 let mut not_found = 0usize;
752 let mut timeout = 0usize;
753 let mut network_err = 0usize;
754 let mut protocol_err = 0usize;
755
756 for (peer, addrs) in &peers {
757 match self.chunk_get_from_peer(address, peer, addrs).await {
758 Ok(Some(chunk)) => {
759 return Ok(CloseGroupOutcome {
760 chunk: Some(chunk),
761 queried,
762 not_found,
763 timeout,
764 network_err,
765 protocol_err,
766 });
767 }
768 Ok(None) => {
769 not_found += 1;
770 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
771 }
772 Err(Error::Timeout(_)) => {
773 timeout += 1;
774 debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
775 }
776 Err(Error::Network(_)) => {
777 network_err += 1;
778 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
779 }
780 // A `Protocol` error here is the storer responding with
781 // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
782 // failed" from a peer that has a corrupted local copy.
783 // That's a per-peer problem, not a per-chunk one: the
784 // remaining peers might still have a clean copy, so
785 // continue the sweep rather than aborting it. Counted
786 // separately from network_err so the summary log still
787 // distinguishes "peer corrupted" from "peer unreachable".
788 Err(Error::Protocol(ref e)) => {
789 protocol_err += 1;
790 debug!(
791 "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
792 );
793 }
794 Err(e) => return Err(e),
795 }
796 }
797
798 Ok(CloseGroupOutcome {
799 chunk: None,
800 queried,
801 not_found,
802 timeout,
803 network_err,
804 protocol_err,
805 })
806 }
807
808 /// Retrieve a chunk from every peer in the close group.
809 ///
810 /// Unlike [`Client::chunk_get`], this method does not return early
811 /// after the first successful response. It returns one result per
812 /// close-group peer, sorted from closest XOR distance to furthest.
813 ///
814 /// # Errors
815 ///
816 /// Returns an error if the close-group lookup fails.
817 pub async fn chunk_get_from_close_group(
818 &self,
819 address: &XorName,
820 ) -> Result<Vec<ChunkPeerGetResult>> {
821 self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
822 .await
823 }
824
825 /// Retrieve a chunk from the requested number of closest peers.
826 ///
827 /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
828 /// not return early after the first successful response. It returns
829 /// one result per queried peer, sorted from closest XOR distance to
830 /// furthest.
831 ///
832 /// # Errors
833 ///
834 /// Returns an error if the DHT lookup fails.
835 pub async fn chunk_get_from_closest_peer_group(
836 &self,
837 address: &XorName,
838 peer_count: usize,
839 ) -> Result<Vec<ChunkPeerGetResult>> {
840 let peers = self.closest_peers(address, peer_count).await?;
841 let targets = chunk_peer_get_targets(peers, address);
842 let concurrency_limit =
843 diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
844 let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
845 let overall_timeout =
846 diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
847
848 let mut completed = vec![false; targets.len()];
849 let mut results = Vec::with_capacity(targets.len());
850 let mut get_results = stream::iter(targets.iter().cloned())
851 .map(|target| async move {
852 let chunk_result = self
853 .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
854 .await;
855
856 if let Ok(Some(chunk)) = &chunk_result {
857 self.chunk_cache().put(chunk.address, chunk.content.clone());
858 }
859
860 (
861 target.index,
862 ChunkPeerGetResult {
863 peer_id: target.peer_id,
864 peer_addrs: target.peer_addrs,
865 xor_distance: target.xor_distance,
866 chunk_result,
867 },
868 )
869 })
870 .buffer_unordered(concurrency_limit);
871
872 let collect_results = async {
873 while let Some((index, result)) = get_results.next().await {
874 completed[index] = true;
875 results.push(result);
876 }
877 };
878
879 if tokio::time::timeout(overall_timeout, collect_results)
880 .await
881 .is_err()
882 {
883 for target in &targets {
884 if !completed[target.index] {
885 results.push(timed_out_chunk_peer_get_result(
886 target,
887 address,
888 overall_timeout,
889 ));
890 }
891 }
892 }
893
894 sort_chunk_peer_get_results(&mut results);
895 Ok(results)
896 }
897
898 /// Fetch a chunk from a specific peer.
899 async fn chunk_get_from_peer(
900 &self,
901 address: &XorName,
902 peer: &PeerId,
903 peer_addrs: &[MultiAddr],
904 ) -> Result<Option<DataChunk>> {
905 let node = self.network().node();
906 let request_id = self.next_request_id();
907 let request = ChunkGetRequest::new(*address);
908 let message = ChunkMessage {
909 request_id,
910 body: ChunkMessageBody::GetRequest(request),
911 };
912 let message_bytes = message
913 .encode()
914 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
915
916 let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
917 let addr_hex = hex::encode(address);
918 let timeout_secs = self.config().chunk_get_timeout_secs;
919
920 let result = send_and_await_chunk_response(
921 node,
922 peer,
923 message_bytes,
924 request_id,
925 timeout,
926 peer_addrs,
927 |body| match body {
928 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
929 address: addr,
930 content,
931 }) => {
932 if addr != *address {
933 return Some(Err(Error::InvalidData(format!(
934 "Mismatched chunk address: expected {addr_hex}, got {}",
935 hex::encode(addr)
936 ))));
937 }
938
939 let computed = compute_address(&content);
940 if computed != addr {
941 return Some(Err(Error::InvalidData(format!(
942 "Invalid chunk content: expected hash {addr_hex}, got {}",
943 hex::encode(computed)
944 ))));
945 }
946
947 debug!(
948 "Retrieved chunk {} ({} bytes) from peer {peer}",
949 hex::encode(addr),
950 content.len()
951 );
952 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
953 }
954 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
955 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
956 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
957 )),
958 _ => None,
959 },
960 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
961 || {
962 Error::Timeout(format!(
963 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
964 ))
965 },
966 )
967 .await;
968
969 result
970 }
971
972 /// Check if a chunk exists on the network.
973 ///
974 /// # Errors
975 ///
976 /// Returns an error if the network operation fails.
977 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
978 self.chunk_get(address).await.map(|opt| opt.is_some())
979 }
980
981 /// Finalize a single-chunk publish after an external signer has paid.
982 ///
983 /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
984 /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
985 /// `quote_hash -> tx_hash` map containing receipts for every non-zero
986 /// quote in the chunk's payment. Builds the `PaymentProof` and stores
987 /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
988 ///
989 /// Wave-batch payment shape only. Single-chunk publishes don't need
990 /// Merkle batching: one chunk's worth of quotes is well below the
991 /// wave-batch threshold.
992 ///
993 /// # Errors
994 ///
995 /// Returns an error if the proof construction fails (e.g. missing
996 /// `tx_hash` for a non-zero quote) or if fewer than
997 /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
998 pub async fn finalize_chunk(
999 &self,
1000 prepared: PreparedChunk,
1001 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1002 ) -> Result<XorName> {
1003 let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
1004 // finalize_batch_payment returns one PaidChunk per PreparedChunk
1005 // input; we passed exactly one. If that invariant is ever violated
1006 // it's an upstream bug — fail loudly rather than silently address-0.
1007 let chunk = paid.pop().ok_or_else(|| {
1008 Error::Payment(
1009 "finalize_batch_payment returned no paid chunks for a single \
1010 prepared chunk — internal invariant violated"
1011 .into(),
1012 )
1013 })?;
1014 self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
1015 .await
1016 }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::*;
1022 use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
1023
1024 /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
1025 const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
1026 /// Sentinel byte used to represent an unknown/unrecognized proof tag.
1027 const UNKNOWN_PROOF_TAG: u8 = 0xff;
1028 /// XorName byte width used by test peer IDs and distances.
1029 const TEST_XORNAME_BYTE_LEN: usize = 32;
1030 /// Last byte position in the test XOR distance arrays.
1031 const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;
1032
/// Each failure shape must land in its own `PutRejection` bucket.
#[test]
fn classify_put_failure_maps_remote_and_transport_reasons() {
    // Wraps a protocol-level rejection as a remote PUT failure.
    fn remote_put(source: ProtocolError) -> Error {
        Error::RemotePut {
            address: "test-addr".to_string(),
            source,
        }
    }

    let storage_full = remote_put(ProtocolError::StorageFailed("full".to_string()));
    assert!(matches!(
        classify_put_failure(&storage_full),
        PutRejection::Full
    ));

    let below_floor = remote_put(ProtocolError::PaymentFailed("below floor".to_string()));
    assert!(matches!(
        classify_put_failure(&below_floor),
        PutRejection::PriceFloor
    ));

    let internal = remote_put(ProtocolError::Internal("boom".to_string()));
    assert!(matches!(
        classify_put_failure(&internal),
        PutRejection::OtherRemote
    ));

    // A `PaymentRequired` PUT response surfaces as `Error::Payment` and is an
    // application-level decline, not a transport shortfall (ADR-0002).
    let payment_required = Error::Payment("Payment required: more".to_string());
    assert!(matches!(
        classify_put_failure(&payment_required),
        PutRejection::PriceFloor
    ));

    let no_response = Error::Timeout("no response".to_string());
    assert!(matches!(
        classify_put_failure(&no_response),
        PutRejection::Transport
    ));
}
1064
/// The application error is surfaced only when no transport failure
/// contributed to the shortfall.
#[test]
fn put_shortfall_surfaces_app_error_only_without_transport_failure() {
    fn payment_decline() -> Error {
        Error::Payment("Payment required: more".to_string())
    }
    fn summary() -> String {
        "shortfall".to_string()
    }

    // Every failure was an application-level decline (e.g. all peers asked
    // for more payment): surface the app error so the limiter isn't driven
    // down as a false capacity signal (ADR-0002 / V2-468).
    let declined_everywhere = put_shortfall_error(0, Some(payment_decline()), summary());
    assert!(matches!(declined_everywhere, Error::Payment(_)));

    // A transport failure in the mix: a genuine capacity shortfall.
    let mixed = put_shortfall_error(1, Some(payment_decline()), summary());
    assert!(matches!(mixed, Error::InsufficientPeers(_)));

    // No application rejection captured at all (pure transport): capacity.
    let transport_only = put_shortfall_error(0, None, summary());
    assert!(matches!(transport_only, Error::InsufficientPeers(_)));
}
1088
1089 fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
1090 let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
1091 xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
1092
1093 ChunkPeerGetResult {
1094 peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
1095 peer_addrs: Vec::new(),
1096 xor_distance,
1097 chunk_result: Ok(None),
1098 }
1099 }
1100
/// A NotFound verdict is authoritative only when every queried peer agreed
/// AND the sample was at least majority-sized.
#[test]
fn authoritative_not_found_requires_unanimous_well_sampled_response() {
    // (not_found, queried) pairs that are the only safe stops.
    let authoritative = [
        // Unanimous AND well-sampled: every queried peer of a full close
        // group said NotFound.
        (7, 7),
        // Unanimous with exactly a majority-sized sample.
        (CLOSE_GROUP_MAJORITY, CLOSE_GROUP_MAJORITY),
    ];
    for (not_found, queried) in authoritative {
        assert!(
            is_authoritative_not_found(not_found, queried),
            "{not_found}-of-{queried} NotFound should be authoritative",
        );
    }

    // (not_found, queried) pairs that must trigger a retry.
    let must_retry = [
        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
        // peers, all NotFound, is NOT authoritative — the real replica
        // majority may sit entirely outside that narrow view.
        (1, 1),
        (3, 3),
        (CLOSE_GROUP_MAJORITY - 1, CLOSE_GROUP_MAJORITY - 1),
        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
        // timeout bucket.
        (4, 7),
        (6, 7),
        // Pure-reachability failure.
        (0, 7),
        // Defensive: a zeroed outcome (e.g. the first attempt's close-group
        // lookup errored) is never authoritative.
        (0, 0),
    ];
    for (not_found, queried) in must_retry {
        assert!(
            !is_authoritative_not_found(not_found, queried),
            "{not_found}-of-{queried} NotFound should not be authoritative",
        );
    }
}
1136
/// Every shape of `chunk_get` result must map to the expected `Outcome`.
#[test]
fn chunk_get_outcome_classifies_each_result_kind() {
    // Success: chunk_get returned a chunk, regardless of how many internal
    // peer attempts it took.
    let found = Ok(Some(DataChunk::new([0u8; 32], Bytes::from_static(b"x"))));
    assert_eq!(
        chunk_get_outcome(&found),
        Outcome::Success,
        "found-chunk must be Success",
    );

    // Ok(None): chunk_get exhausted the close group across first attempt +
    // retry. This is the load-shedding signal — count it as Timeout so a
    // sustained run of them on a saturated link shrinks the cap.
    let exhausted = Ok(None);
    assert_eq!(
        chunk_get_outcome(&exhausted),
        Outcome::Timeout,
        "Ok(None) must be Timeout — that's the controller's load-shedding signal",
    );

    // Capacity signals from explicit error variants.
    let timed_out = Err(Error::Timeout("t".into()));
    assert_eq!(chunk_get_outcome(&timed_out), Outcome::Timeout);

    let unreachable = Err(Error::Network("n".into()));
    assert_eq!(chunk_get_outcome(&unreachable), Outcome::NetworkError);

    // Unexpected error variant (e.g. Protocol) — propagates out of chunk_get
    // to the caller and is not a capacity signal.
    let protocol = Err(Error::Protocol("p".into()));
    assert_eq!(chunk_get_outcome(&protocol), Outcome::ApplicationError);
}
1175
/// A single-node proof tag selects the fixed `STORE_RESPONSE_TIMEOUT`.
#[test]
fn single_node_proof_uses_store_response_timeout() {
    let proof = [PROOF_TAG_SINGLE_NODE];

    assert_eq!(
        store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
        STORE_RESPONSE_TIMEOUT
    );
}
1183
/// An unrecognized proof tag selects the fixed `STORE_RESPONSE_TIMEOUT`.
#[test]
fn unknown_proof_uses_store_response_timeout() {
    let proof = [UNKNOWN_PROOF_TAG];

    assert_eq!(
        store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
        STORE_RESPONSE_TIMEOUT
    );
}
1191
/// A Merkle proof tag selects the configured Merkle store timeout.
#[test]
fn merkle_proof_uses_configured_store_timeout() {
    let proof = [PROOF_TAG_MERKLE];
    let expected = Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS);

    assert_eq!(
        store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
        expected
    );
}
1199
/// Sorting must order peer results by ascending XOR distance.
#[test]
fn chunk_peer_get_results_sort_by_xor_distance() {
    // Seed and distance tail share one value, deliberately out of order.
    let mut results: Vec<_> = [3u8, 1, 2]
        .iter()
        .map(|&seed| chunk_peer_get_result(seed, seed))
        .collect();

    sort_chunk_peer_get_results(&mut results);

    let mut ordered_distances = Vec::with_capacity(results.len());
    for entry in &results {
        ordered_distances.push(entry.xor_distance[TEST_DISTANCE_TAIL_INDEX]);
    }
    assert_eq!(ordered_distances, vec![1, 2, 3]);
}
1216
/// With as many slots as targets, the overall timeout is expected to be
/// twice the per-peer timeout (one wave plus padding).
#[test]
fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
    let per_peer_secs: u64 = 10;
    let waves_with_padding: u64 = 2;
    let target_count: usize = 7;
    let concurrency_limit: usize = 7;

    let expected = Duration::from_secs(per_peer_secs * waves_with_padding);
    let actual = diagnostic_peer_get_overall_timeout(
        Duration::from_secs(per_peer_secs),
        target_count,
        concurrency_limit,
    );

    assert_eq!(actual, expected);
}
1235
/// With 20 targets and a close group of 7, the overall timeout is expected
/// to be four times the per-peer timeout.
#[test]
fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
    let per_peer_secs: u64 = 10;
    let target_count: usize = 20;
    let close_group_size: usize = 7;
    let waves_with_padding: u64 = 4;

    let expected = Duration::from_secs(per_peer_secs * waves_with_padding);
    let actual = diagnostic_peer_get_overall_timeout(
        Duration::from_secs(per_peer_secs),
        target_count,
        diagnostic_peer_get_concurrency(target_count, close_group_size),
    );

    assert_eq!(actual, expected);
}
1255
/// Regression guard for the default `merkle_store_timeout_secs`: it has to
/// cover the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus padding.
/// Should either side move and break this invariant, the client will give
/// up on chunks the storer is still verifying. The derivation lives in the
/// `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment.
#[test]
fn default_merkle_store_timeout_satisfies_storer_invariant() {
    const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
    const MIN_PADDING_SECS: u64 = 30;

    let configured = crate::data::client::ClientConfig::default().merkle_store_timeout_secs;
    let required = STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS;

    assert!(
        configured >= required,
        "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
        configured,
        STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
        MIN_PADDING_SECS,
    );
}
1277
/// Regression guard: non-merkle PUTs use the hardcoded
/// `STORE_RESPONSE_TIMEOUT` constant rather than the per-config
/// `merkle_store_timeout_secs`. A refactor that routed non-merkle PUTs
/// through the merkle field would hand them the 270 s value and silently
/// regress non-merkle latency, so `store_response_timeout_for_proof` must
/// return the const for a non-merkle proof tag whatever merkle timeout it
/// is given.
#[test]
fn non_merkle_put_ignores_merkle_timeout_value() {
    let absurd_merkle_timeout = 9_999;
    let non_merkle_tags = [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG];

    for &tag in &non_merkle_tags {
        let proof = [tag];
        assert_eq!(
            store_response_timeout_for_proof(&proof, absurd_merkle_timeout),
            STORE_RESPONSE_TIMEOUT,
            "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
        );
    }
}
1297}