ant_core/data/client/chunk.rs
1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_xor_distance;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{QuoteHash, TxHash};
12use ant_protocol::transport::{MultiAddr, PeerId};
13use ant_protocol::{
14 compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
15 ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
16 ProofType, ProtocolError, XorName, CLOSE_GROUP_MAJORITY,
17};
18use bytes::Bytes;
19use futures::stream::{self, FuturesUnordered, StreamExt};
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22use tracing::{debug, info, warn};
23
/// Data type identifier for chunks, passed as the data-type argument to
/// `pay_for_storage` when requesting storage quotes for a chunk.
const CHUNK_DATA_TYPE: u32 = 0;
26
/// Why a single-peer chunk PUT failed, as decided by `classify_put_failure`.
///
/// The cause drives both the aggregate error surfaced when a close-group
/// store falls short of quorum and whether that shortfall may push the store
/// AIMD limiter down. Only genuine local backpressure (a PUT-response
/// `Timeout`) is a "client is sending too fast" signal. A node that answers
/// with a structured rejection declined at the application level
/// (ADR-0002 / V2-468). A bare dial/relay failure is remote peer churn, not
/// local capacity (V2-554).
#[derive(Clone, Copy)]
enum PutRejection {
    /// The node is out of storage (`ProtocolError::StorageFailed`); the
    /// store moves on to a further peer.
    Full,
    /// The node declined the payment. Either it did not clear the node's
    /// local price floor, or the proof's issuers are not close enough in
    /// this peer's view (`ProtocolError::PaymentFailed`), or the node asked
    /// for more than was paid (`ChunkPutResponse::PaymentRequired`, which
    /// surfaces as [`Error::Payment`]). The peer is skipped; no re-quote is
    /// made.
    PriceFloor,
    /// Any other structured rejection returned by the node (an
    /// `Error::RemotePut` carrying a different `ProtocolError`).
    OtherRemote,
    /// No store response arrived before the deadline (`Error::Timeout`).
    /// This is the genuine local-backpressure signal: under real congestion
    /// the client's own requests time out, so a shortfall containing any
    /// timeout stays a capacity signal (V2-554).
    Timeout,
    /// Every remaining failure, e.g. `Error::Network` when the peer could
    /// not be reached at all (typically a dead or stale relayed DHT
    /// address). This is remote peer churn rather than local backpressure,
    /// so a shortfall made up purely of these must not push the store AIMD
    /// limiter down (V2-554).
    Dial,
}
58
59/// Classify a failed single-peer PUT (ADR-0002 / V2-468 / V2-554). A
60/// `RemotePut` carries the node's structured `ProtocolError`; a
61/// `PaymentRequired` response surfaces as [`Error::Payment`]; a
62/// [`Error::Timeout`] is genuine local backpressure; anything else is a
63/// dial/relay failure (remote churn).
64fn classify_put_failure(error: &Error) -> PutRejection {
65 match error {
66 Error::RemotePut { source, .. } => match source {
67 ProtocolError::StorageFailed(_) => PutRejection::Full,
68 ProtocolError::PaymentFailed(_) => PutRejection::PriceFloor,
69 _ => PutRejection::OtherRemote,
70 },
71 // A `PaymentRequired` PUT response (the node wants more than was paid)
72 // arrives as `Error::Payment`. It is a structured application-level
73 // decline — skip the peer and advance fallback, exactly like a
74 // price-floor `PaymentFailed` — not a transport shortfall, so it must
75 // not push the store AIMD limiter down (ADR-0002 / V2-468).
76 Error::Payment(_) => PutRejection::PriceFloor,
77 // The peer did not answer in time: genuine local backpressure.
78 Error::Timeout(_) => PutRejection::Timeout,
79 // Could not reach the peer at all: dial/relay churn (remote), not a
80 // local-capacity signal.
81 _ => PutRejection::Dial,
82 }
83}
84
85/// Decide the error for a close-group store that fell short of quorum.
86///
87/// Only genuine local backpressure should push the store AIMD limiter down.
88/// The failure mix decides which signal to surface:
89///
90/// 1. **Any PUT-response timeout** (`timeout > 0`): the client's own requests
91/// are timing out — genuine local congestion. Surface `InsufficientPeers`
92/// (classified `NetworkError`) so the limiter still backs off (V2-554).
93/// 2. **No timeouts, no dial failures** — every failure was an application
94/// decline (full / price-floor / `PaymentRequired` / other remote
95/// rejection): surface the representative application error so the shortfall
96/// classifies `ApplicationError` and does not suppress the limiter
97/// (ADR-0002 / V2-468).
98/// 3. **No timeouts, but dial/relay failures present**: the shortfall is
99/// close-group dial churn (dead/stale relayed peer addresses) — remote peer
100/// churn, not local capacity. Surface [`Error::CloseGroupShortfall`]
101/// (classified `ApplicationError`) so it does NOT push the limiter down
102/// (V2-554). Still recoverable/retryable.
103fn put_shortfall_error(
104 timeout: usize,
105 dial: usize,
106 first_app_rejection: Option<Error>,
107 shortfall_message: String,
108) -> Error {
109 if timeout > 0 {
110 return Error::InsufficientPeers(shortfall_message);
111 }
112 if dial == 0 {
113 if let Some(app_rejection) = first_app_rejection {
114 return app_rejection;
115 }
116 }
117 Error::CloseGroupShortfall(shortfall_message)
118}
119
/// Result of one sweep over a chunk's close group.
///
/// Either some peer returned the chunk, or every peer in the group returned
/// NotFound, timed out, or hit a transport / protocol error. The counts feed
/// the retry decision (`is_authoritative_not_found`). Only a *unanimous*
/// NotFound from a *well-sampled* close group counts as authoritative data
/// absence. Anything else (a non-unanimous result, or a thin/under-sampled
/// DHT walk) leaves room for the actual storer to be in the timeout /
/// network-error / protocol-error bucket or outside the sampled view, and is
/// worth a retry against a freshly re-walked close group.
struct CloseGroupOutcome {
    /// The chunk, if some peer in the sweep returned it.
    chunk: Option<DataChunk>,
    /// Number of peers the closest-peer lookup returned for this sweep;
    /// `0` for the placeholder outcome used when the lookup itself failed.
    queried: usize,
    /// Peers that answered without the chunk (`Ok(None)` from the per-peer
    /// fetch).
    not_found: usize,
    /// Peers whose fetch failed with `Error::Timeout`.
    timeout: usize,
    /// Peers whose fetch failed with `Error::Network` (unreachable).
    network_err: usize,
    /// Counts peers that responded with a remote `Error` (e.g.
    /// "Chunk verification failed") or any other protocol-level error
    /// that classifies as `Error::Protocol`. Treated the same as
    /// `timeout` / `network_err` for retry decisions: one peer's bad
    /// response must not abort the whole close-group sweep, because the
    /// remaining peers might still have a clean copy.
    protocol_err: usize,
}
145
146/// `true` if the close-group sweep is strong enough evidence to
147/// conclude the chunk is genuinely absent, so retrying is pointless.
148///
149/// Two conditions, both required:
150///
151/// 1. *Unanimous*: every peer we managed to query responded with an
152/// authoritative NotFound (`not_found == queried`). An earlier
153/// version used a majority quorum (`not_found >= close_group_size /
154/// 2 + 1`), but production traffic disproved that: storage
155/// replicates to `CLOSE_GROUP_MAJORITY` (4) of the K=7 close-group
156/// peers, so up to 3 peers legitimately don't store any given chunk
157/// and a `not_found=4 timeout=3` result is "3 storers we couldn't
158/// reach" plus "4 non-storers," not data loss.
159///
160/// 2. *Well-sampled*: at least `CLOSE_GROUP_MAJORITY` peers were
161/// queried. `closest_peers` (via `find_closest_peers`) accepts
162/// any non-empty DHT result, so a thin/under-sampled walk can return
163/// 1 or 2 peers. A `1/1` or `3/3` NotFound from such a walk is NOT
164/// authoritative — the real replica majority may sit entirely
165/// outside that narrow view. Requiring a majority-sized sample means
166/// a thin lookup falls through to the retry (which re-walks the DHT)
167/// instead of being declared a final absence.
168fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
169 queried >= CLOSE_GROUP_MAJORITY && not_found == queried
170}
171
/// How long a non-merkle chunk PUT waits for the peer's store response.
/// PUTs carrying a merkle proof use the configurable merkle store timeout
/// instead (see `store_response_timeout_for_proof`).
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
174
/// Extra per-peer-timeout "waves" added on top of the computed wave count
/// when `diagnostic_peer_get_overall_timeout` sizes the deadline for the
/// diagnostic close-group GET sweep.
const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
177
/// Result of fetching one chunk address from one close-group peer, as
/// returned (one per peer) by the diagnostic close-group GET methods.
pub struct ChunkPeerGetResult {
    /// Peer queried for the chunk.
    pub peer_id: PeerId,
    /// Known network addresses used for the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address; per-peer result
    /// lists are ordered by this value, closest first.
    pub xor_distance: [u8; 32],
    /// Per-peer fetch result: `Ok(Some(_))` if the peer returned the chunk,
    /// `Ok(None)` if the peer did not have it, or `Err` if the fetch failed.
    /// Failures include an [`Error::Timeout`] synthesized when the sweep's
    /// overall deadline expired before this peer completed.
    pub chunk_result: Result<Option<DataChunk>>,
}
189
/// One peer to query in the diagnostic close-group GET sweep.
#[derive(Clone)]
struct ChunkPeerGetTarget {
    /// Position of this peer in the list returned by the closest-peer lookup
    /// (assigned by `chunk_peer_get_targets`).
    index: usize,
    /// Peer to query.
    peer_id: PeerId,
    /// Known network addresses for the peer.
    peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address.
    xor_distance: [u8; 32],
}
197
198fn chunk_peer_get_targets(
199 peers: Vec<(PeerId, Vec<MultiAddr>)>,
200 address: &XorName,
201) -> Vec<ChunkPeerGetTarget> {
202 peers
203 .into_iter()
204 .enumerate()
205 .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
206 index,
207 peer_id,
208 peer_addrs,
209 xor_distance: peer_xor_distance(&peer_id, address),
210 })
211 .collect()
212}
213
214fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
215 results.sort_by_key(|result| result.xor_distance);
216}
217
/// Concurrency limit for the diagnostic close-group GET sweep.
///
/// The limit never exceeds the number of requested peers and is capped at
/// the close-group size, with a close-group size of 0 treated as 1.
/// Returns 0 when `peer_count` is 0.
fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
    let cap = std::cmp::max(close_group_size, 1);
    std::cmp::min(peer_count, cap)
}
221
222fn diagnostic_peer_get_overall_timeout(
223 per_peer_timeout: Duration,
224 target_count: usize,
225 concurrency_limit: usize,
226) -> Duration {
227 let concurrency_limit = concurrency_limit.max(1);
228 let peer_get_waves = target_count.div_ceil(concurrency_limit);
229 let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
230 let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
231
232 per_peer_timeout.saturating_mul(timeout_waves)
233}
234
235fn timed_out_chunk_peer_get_result(
236 target: &ChunkPeerGetTarget,
237 address: &XorName,
238 timeout: Duration,
239) -> ChunkPeerGetResult {
240 let addr_hex = hex::encode(address);
241 let timeout_secs = timeout.as_secs();
242 ChunkPeerGetResult {
243 peer_id: target.peer_id,
244 peer_addrs: target.peer_addrs.clone(),
245 xor_distance: target.xor_distance,
246 chunk_result: Err(Error::Timeout(format!(
247 "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
248 target.peer_id
249 ))),
250 }
251}
252
253fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
254 match detect_proof_type(proof) {
255 Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
256 _ => STORE_RESPONSE_TIMEOUT,
257 }
258}
259
260impl Client {
261 /// Run `chunk_get` and feed one byte-aware observation per call to
262 /// the adaptive fetch limiter. Use this from any consumer that
263 /// drives chunk-fetch concurrency from `controller().fetch.current()`
264 /// — the controller's window relies on every call along the hot
265 /// path producing an observation.
266 ///
267 /// Classifier semantics: see `chunk_get_outcome`. Most importantly,
268 /// `Ok(None)` is treated as `Outcome::Timeout`, not Success, so a
269 /// sustained run of close-group exhaustions correctly drives the
270 /// cap down rather than silently inflating it.
271 pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
272 self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
273 .await
274 }
275
276 pub(crate) async fn chunk_get_observed_from_closest_peers(
277 &self,
278 address: &XorName,
279 peer_count: usize,
280 ) -> Result<Option<DataChunk>> {
281 let started = Instant::now();
282 let result = self.chunk_get_from_closest_peers(address, peer_count).await;
283 let latency = started.elapsed();
284 let bytes = result
285 .as_ref()
286 .ok()
287 .and_then(Option::as_ref)
288 .map_or(0, |chunk| chunk.content.len() as u64);
289 self.controller()
290 .fetch
291 .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
292 result
293 }
294}
295
296/// Map a `chunk_get` outcome to an adaptive controller `Outcome`.
297///
298/// This is the result-aware classifier used by the file-download paths.
299/// It differs from `classify_error` in one critical way: an `Ok(None)`
300/// from `chunk_get` is `Outcome::Timeout`, not `Outcome::Success`. By
301/// the time `chunk_get` returns `Ok(None)` it has already exhausted
302/// the close group across its first attempt + retry sweep, so
303/// `Ok(None)` is the controller's load-shedding signal — a sustained
304/// run of them on a saturated home link is exactly the case where the
305/// cap should shrink.
306///
307/// Healthy returns (`Ok(Some(_))`) are Success regardless of how many
308/// internal peer attempts the chunk_get had to make. The controller
309/// does not need to see internal peer noise; that's noise about the
310/// production network's natural peer-side variability, not about the
311/// client's effective capacity.
312pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
313 match result {
314 Ok(Some(_)) => Outcome::Success,
315 Ok(None) => Outcome::Timeout,
316 Err(Error::Timeout(_)) => Outcome::Timeout,
317 Err(Error::Network(_)) => Outcome::NetworkError,
318 Err(_) => Outcome::ApplicationError,
319 }
320}
321
322impl Client {
323 /// Store a chunk on the Autonomi network with payment.
324 ///
325 /// Checks if the chunk already exists before paying. If it does,
326 /// returns the address immediately without incurring on-chain costs.
327 /// Otherwise collects quotes, pays on-chain, then stores with proof
328 /// to `CLOSE_GROUP_MAJORITY` peers.
329 ///
330 /// # Errors
331 ///
332 /// Returns an error if payment or the network operation fails.
333 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
334 let address = compute_address(&content);
335 let data_size = u64::try_from(content.len())
336 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
337
338 match self
339 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
340 .await
341 {
342 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
343 Err(Error::AlreadyStored) => {
344 debug!(
345 "Chunk {} already stored on network, skipping payment",
346 hex::encode(address)
347 );
348 Ok(address)
349 }
350 Err(e) => Err(e),
351 }
352 }
353
354 /// Test-only: pay for `content`, then store it with `dead_count`
355 /// unreachable peers prepended to the real put-target set.
356 ///
357 /// Every initial send hits a dead peer and fails, so the store can only
358 /// reach quorum by falling back through the real put-targets (the closest-K
359 /// set the quote plan already returned), reusing the same `ProofOfPayment`.
360 /// Pass `dead_count >= CLOSE_GROUP_MAJORITY` so a full quorum's worth of
361 /// replacements must come from the fallback; a success proves the fallback
362 /// works end-to-end.
363 ///
364 /// # Errors
365 ///
366 /// Returns an error if payment fails or quorum cannot be reached.
367 #[cfg(feature = "test-utils")]
368 pub async fn chunk_put_with_dead_initial_peers(
369 &self,
370 content: Bytes,
371 dead_count: usize,
372 ) -> Result<XorName> {
373 let address = compute_address(&content);
374 let data_size = u64::try_from(content.len())
375 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
376 let (proof, real_peers) = self
377 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
378 .await?;
379 // Unreachable peers (random id, no addresses) first: every initial send
380 // fails, so quorum can only be reached by falling back through the real
381 // put-target set that follows.
382 let mut peers: Vec<(PeerId, Vec<MultiAddr>)> = (0..dead_count)
383 .map(|_| (PeerId::random(), Vec::new()))
384 .collect();
385 peers.extend(real_peers);
386 self.chunk_put_to_close_group(content, proof, &peers).await
387 }
388
389 /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers, falling back past full or
390 /// over-priced members of the supplied put-target set (ADR-0002).
391 ///
392 /// Sends the PUT concurrently to the first `CLOSE_GROUP_MAJORITY` peers. On
393 /// each failure it advances to the next peer in `peers` — which the caller
394 /// supplies as the chunk's closest ~K neighbourhood, so no further DHT
395 /// lookup is needed. Every peer reuses the same payment proof: a node
396 /// accepts it as long as one of the proof's quote issuers is within that
397 /// peer's own local closest view, so the client never needs to re-quote or
398 /// re-pay to route around a full node.
399 ///
400 /// # Errors
401 ///
402 /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
403 /// the chunk.
404 pub(crate) async fn chunk_put_to_close_group(
405 &self,
406 content: Bytes,
407 proof: Vec<u8>,
408 peers: &[(PeerId, Vec<MultiAddr>)],
409 ) -> Result<XorName> {
410 let address = compute_address(&content);
411
412 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
413 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
414 let mut fallback_iter = fallback_peers.iter();
415
416 let mut put_futures = FuturesUnordered::new();
417 for (peer_id, addrs) in initial_peers {
418 put_futures.push(self.spawn_chunk_put(
419 content.clone(),
420 proof.clone(),
421 *peer_id,
422 addrs.clone(),
423 ));
424 }
425
426 let mut success_count = 0usize;
427 let mut failures: Vec<String> = Vec::new();
428 // Tally the *cause* of each failure. The store AIMD limiter must only be
429 // pushed down by a transport shortfall (V2-468): a node that responds —
430 // a structured `RemotePut` decline, or `PaymentRequired` surfacing as
431 // `Error::Payment` — declined at the application layer and is not
432 // evidence the client is sending too fast. The per-cause counts also
433 // surface a legible aggregate reason; hold the first application-level
434 // rejection as the representative error.
435 let mut full = 0usize;
436 let mut price_floor = 0usize;
437 let mut other_remote = 0usize;
438 let mut timeout = 0usize;
439 let mut dial = 0usize;
440 let mut first_app_rejection: Option<Error> = None;
441
442 while let Some((peer_id, result)) = put_futures.next().await {
443 match result {
444 Ok(_) => {
445 success_count += 1;
446 if success_count >= CLOSE_GROUP_MAJORITY {
447 debug!(
448 "Chunk {} stored on {success_count} peers (majority reached)",
449 hex::encode(address)
450 );
451 return Ok(address);
452 }
453 }
454 Err(e) => {
455 warn!("Failed to store chunk on {peer_id}: {e}");
456 failures.push(format!("{peer_id}: {e}"));
457 match classify_put_failure(&e) {
458 PutRejection::Full => full += 1,
459 PutRejection::PriceFloor => price_floor += 1,
460 PutRejection::OtherRemote => other_remote += 1,
461 PutRejection::Timeout => timeout += 1,
462 PutRejection::Dial => dial += 1,
463 }
464 // An application-level decline is `RemotePut` (a structured
465 // node rejection) or `Error::Payment` (`PaymentRequired`):
466 // capture the first so an all-application shortfall surfaces
467 // as `ApplicationError`, not `InsufficientPeers`
468 // (`NetworkError`), and never suppresses the limiter.
469 if matches!(e, Error::RemotePut { .. } | Error::Payment(_))
470 && first_app_rejection.is_none()
471 {
472 first_app_rejection = Some(e);
473 }
474
475 // Advance to the next peer in the put-target set, reusing
476 // the same proof.
477 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
478 debug!(
479 "Falling back to peer {fb_peer} for chunk {}",
480 hex::encode(address)
481 );
482 put_futures.push(self.spawn_chunk_put(
483 content.clone(),
484 proof.clone(),
485 *fb_peer,
486 fb_addrs.clone(),
487 ));
488 }
489 }
490 }
491 }
492
493 // Quorum not reached. A timeout-bearing shortfall is genuine local
494 // backpressure (capacity signal); an application-only shortfall surfaces
495 // the representative app error; a pure dial-churn shortfall surfaces a
496 // neutral `CloseGroupShortfall` (V2-554). See `put_shortfall_error`.
497 let aggregate = format!(
498 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY} \
499 (full: {full}, price-floor: {price_floor}, other-rejection: {other_remote}, \
500 timeout: {timeout}, dial: {dial}). Failures: [{}]",
501 failures.join("; ")
502 );
503 Err(put_shortfall_error(
504 timeout,
505 dial,
506 first_app_rejection,
507 aggregate,
508 ))
509 }
510
511 /// Build a chunk PUT future for a single peer. Takes owned peer data so
512 /// the future can outlive a fallback queue entry popped per iteration.
513 async fn spawn_chunk_put(
514 &self,
515 content: Bytes,
516 proof: Vec<u8>,
517 peer_id: PeerId,
518 addrs: Vec<MultiAddr>,
519 ) -> (PeerId, Result<XorName>) {
520 let result = self
521 .chunk_put_with_proof(content, proof, &peer_id, &addrs)
522 .await;
523 (peer_id, result)
524 }
525
526 /// Store a chunk on the Autonomi network with a pre-built payment proof.
527 ///
528 /// Sends to a single peer. Callers that need replication across the
529 /// close group should use `chunk_put_to_close_group` instead.
530 ///
531 /// # Errors
532 ///
533 /// Returns an error if the network operation fails.
534 pub async fn chunk_put_with_proof(
535 &self,
536 content: Bytes,
537 proof: Vec<u8>,
538 target_peer: &PeerId,
539 peer_addrs: &[MultiAddr],
540 ) -> Result<XorName> {
541 let address = compute_address(&content);
542 let node = self.network().node();
543 let timeout =
544 store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
545 let timeout_secs = timeout.as_secs();
546
547 let request_id = self.next_request_id();
548 // `content` is a refcounted `Bytes` shared with the sibling
549 // close-group sends; pass it through directly so each peer shares
550 // the same backing buffer instead of deep-copying the 4 MB payload.
551 let request = ChunkPutRequest::with_payment(address, content, proof);
552 let message = ChunkMessage {
553 request_id,
554 body: ChunkMessageBody::PutRequest(request),
555 };
556 let message_bytes = message
557 .encode()
558 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
559
560 let addr_hex = hex::encode(address);
561
562 let result = send_and_await_chunk_response(
563 node,
564 target_peer,
565 message_bytes,
566 request_id,
567 timeout,
568 peer_addrs,
569 |body| match body {
570 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
571 debug!("Chunk stored at {}", hex::encode(addr));
572 Some(Ok(addr))
573 }
574 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
575 address: addr,
576 }) => {
577 debug!("Chunk already exists at {}", hex::encode(addr));
578 Some(Ok(addr))
579 }
580 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
581 Some(Err(Error::Payment(format!("Payment required: {message}"))))
582 }
583 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
584 // Preserve the structured remote reason instead of
585 // flattening it into `Error::Protocol`. The node
586 // responded, so the transport round-trip succeeded —
587 // this is an application-level rejection and must not
588 // suppress the store AIMD limiter (V2-468).
589 Some(Err(Error::RemotePut {
590 address: addr_hex.clone(),
591 source: e,
592 }))
593 }
594 _ => None,
595 },
596 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
597 || {
598 Error::Timeout(format!(
599 "Timeout waiting for store response after {timeout_secs}s"
600 ))
601 },
602 )
603 .await;
604
605 result
606 }
607
608 /// Retrieve a chunk from the Autonomi network.
609 ///
610 /// Queries all peers in the close group for the chunk address,
611 /// returning the first successful response. This handles the case
612 /// where the storing peer differs from the first peer returned by
613 /// DHT routing.
614 ///
615 /// ## Adaptive controller feedback
616 ///
617 /// Each per-peer GET attempt is fed individually to the adaptive
618 /// fetch limiter via `controller().fetch.observe(...)`. This is
619 /// deliberately finer-grained than wrapping the outer `chunk_get`
620 /// with `observe_op`: when a chunk takes 6 peer tries to land,
621 /// 5 of them are real capacity signals (timeouts / network errors)
622 /// that should pull the cap down even if the chunk eventually
623 /// succeeds. The outer `Ok(_)` would mask all five as a single
624 /// `Outcome::Success`. See `adaptive::Outcome` for the per-attempt
625 /// classification rules used below.
626 ///
627 /// Callers should therefore NOT wrap `chunk_get` in `observe_op`.
628 ///
629 /// # Errors
630 ///
631 /// Returns an error if the network operation fails.
632 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
633 self.chunk_get_from_closest_peers(address, self.config().close_group_size)
634 .await
635 }
636
637 /// Retrieve a chunk from the requested number of closest peers.
638 ///
639 /// Queries peers in XOR-distance order for the chunk address,
640 /// returning the first successful response. This handles the case
641 /// where the storing peer differs from the first peer returned by
642 /// DHT routing.
643 ///
644 /// # Errors
645 ///
646 /// Returns an error if the network operation fails.
647 pub async fn chunk_get_from_closest_peers(
648 &self,
649 address: &XorName,
650 peer_count: usize,
651 ) -> Result<Option<DataChunk>> {
652 // Check cache first, with integrity verification.
653 if let Some(cached) = self.chunk_cache().get(address) {
654 let computed = compute_address(&cached);
655 if computed == *address {
656 debug!("Cache hit for chunk {}", hex::encode(address));
657 return Ok(Some(DataChunk::new(*address, cached)));
658 }
659 // Cache entry corrupted — evict and fall through to network fetch.
660 debug!(
661 "Cache corruption detected for {}: evicting",
662 hex::encode(address)
663 );
664 self.chunk_cache().remove(address);
665 }
666
667 let addr_hex = hex::encode(address);
668
669 // First attempt against the current close-group view. A
670 // lookup/transport error here (e.g. closest_peers' DHT walk
671 // momentarily returning an error, or InsufficientPeers from a
672 // thin routing table) is NOT fatal: fall through to the retry
673 // path exactly as a non-authoritative miss would. Otherwise one
674 // transient error on the *initial* close-group walk for a single
675 // chunk would fail an entire multi-hundred-chunk download. A
676 // zeroed outcome (queried=0) is never authoritative, so it flows
677 // straight to the retry below.
678 let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
679 Ok(outcome) => outcome,
680 Err(e) => {
681 info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
682 CloseGroupOutcome {
683 chunk: None,
684 queried: 0,
685 not_found: 0,
686 timeout: 0,
687 network_err: 0,
688 protocol_err: 0,
689 }
690 }
691 };
692 if let Some(chunk) = first.chunk {
693 self.chunk_cache().put(chunk.address, chunk.content.clone());
694 return Ok(Some(chunk));
695 }
696
697 // Only treat as authoritative absence when *every* queried peer
698 // responded NotFound. Anything less leaves the actual storer
699 // possibly in the timeout / network-error bucket, which a retry
700 // could reach.
701 if is_authoritative_not_found(first.not_found, first.queried) {
702 info!(
703 "chunk_get giving up on {addr_hex} (unanimous NotFound): \
704 queried={} not_found={} timeout={} network_err={} protocol_err={}",
705 first.queried,
706 first.not_found,
707 first.timeout,
708 first.network_err,
709 first.protocol_err,
710 );
711 return Ok(None);
712 }
713
714 // Otherwise the failure looks like reachability (most peers timed out
715 // or hit transport errors). The chunk is most likely still on the
716 // network but the current close-group view either (a) caught a
717 // transient transport blip or (b) converged on the wrong neighbourhood
718 // because the routing table is thin. One retry against a freshly
719 // re-walked close group is the cheapest defence against both.
720 info!(
721 "chunk_get retrying {addr_hex} after reachability failure: \
722 queried={} not_found={} timeout={} network_err={} protocol_err={}",
723 first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
724 );
725
726 // Brief settle so any in-flight transport state can quiesce before
727 // we re-walk the DHT. Keep this small so we don't add meaningful
728 // latency to the genuinely-lost case (we already paid for one full
729 // close-group sweep before getting here).
730 tokio::time::sleep(Duration::from_secs(1)).await;
731
732 // If the retry's DHT lookup itself fails, treat that as "still
733 // couldn't find" rather than escalating the error — matches the
734 // semantics of the first attempt when peers are unreachable.
735 let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
736 Ok(o) => o,
737 Err(e) => {
738 info!(
739 "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
740 first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
741 first.queried,
742 first.not_found,
743 first.timeout,
744 first.network_err,
745 first.protocol_err,
746 );
747 return Ok(None);
748 }
749 };
750 if let Some(chunk) = retry.chunk {
751 info!("chunk_get retry succeeded for {addr_hex}");
752 self.chunk_cache().put(chunk.address, chunk.content.clone());
753 return Ok(Some(chunk));
754 }
755
756 info!(
757 "chunk_get exhausted close group after retry for {addr_hex}: \
758 first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
759 retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
760 first.queried,
761 first.not_found,
762 first.timeout,
763 first.network_err,
764 first.protocol_err,
765 retry.queried,
766 retry.not_found,
767 retry.timeout,
768 retry.network_err,
769 retry.protocol_err,
770 );
771 Ok(None)
772 }
773
774 /// One sweep of the requested closest peers: fetch the closest peers
775 /// for `address` from the DHT and ask each for the chunk in turn,
776 /// returning on the first success.
777 async fn chunk_get_try_closest_peers(
778 &self,
779 address: &XorName,
780 peer_count: usize,
781 ) -> Result<CloseGroupOutcome> {
782 let peers = self.closest_peers(address, peer_count).await?;
783 let addr_hex = hex::encode(address);
784 let queried = peers.len();
785 let mut not_found = 0usize;
786 let mut timeout = 0usize;
787 let mut network_err = 0usize;
788 let mut protocol_err = 0usize;
789
790 for (peer, addrs) in &peers {
791 match self.chunk_get_from_peer(address, peer, addrs).await {
792 Ok(Some(chunk)) => {
793 return Ok(CloseGroupOutcome {
794 chunk: Some(chunk),
795 queried,
796 not_found,
797 timeout,
798 network_err,
799 protocol_err,
800 });
801 }
802 Ok(None) => {
803 not_found += 1;
804 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
805 }
806 Err(Error::Timeout(_)) => {
807 timeout += 1;
808 debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
809 }
810 Err(Error::Network(_)) => {
811 network_err += 1;
812 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
813 }
814 // A `Protocol` error here is the storer responding with
815 // `ChunkGetResponse::Error(...)` — e.g. "Chunk verification
816 // failed" from a peer that has a corrupted local copy.
817 // That's a per-peer problem, not a per-chunk one: the
818 // remaining peers might still have a clean copy, so
819 // continue the sweep rather than aborting it. Counted
820 // separately from network_err so the summary log still
821 // distinguishes "peer corrupted" from "peer unreachable".
822 Err(Error::Protocol(ref e)) => {
823 protocol_err += 1;
824 debug!(
825 "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
826 );
827 }
828 Err(e) => return Err(e),
829 }
830 }
831
832 Ok(CloseGroupOutcome {
833 chunk: None,
834 queried,
835 not_found,
836 timeout,
837 network_err,
838 protocol_err,
839 })
840 }
841
842 /// Retrieve a chunk from every peer in the close group.
843 ///
844 /// Unlike [`Client::chunk_get`], this method does not return early
845 /// after the first successful response. It returns one result per
846 /// close-group peer, sorted from closest XOR distance to furthest.
847 ///
848 /// # Errors
849 ///
850 /// Returns an error if the close-group lookup fails.
851 pub async fn chunk_get_from_close_group(
852 &self,
853 address: &XorName,
854 ) -> Result<Vec<ChunkPeerGetResult>> {
855 self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
856 .await
857 }
858
859 /// Retrieve a chunk from the requested number of closest peers.
860 ///
861 /// Unlike [`Client::chunk_get_from_closest_peers`], this method does
862 /// not return early after the first successful response. It returns
863 /// one result per queried peer, sorted from closest XOR distance to
864 /// furthest.
865 ///
866 /// # Errors
867 ///
868 /// Returns an error if the DHT lookup fails.
869 pub async fn chunk_get_from_closest_peer_group(
870 &self,
871 address: &XorName,
872 peer_count: usize,
873 ) -> Result<Vec<ChunkPeerGetResult>> {
874 let peers = self.closest_peers(address, peer_count).await?;
875 let targets = chunk_peer_get_targets(peers, address);
876 let concurrency_limit =
877 diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
878 let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
879 let overall_timeout =
880 diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
881
882 let mut completed = vec![false; targets.len()];
883 let mut results = Vec::with_capacity(targets.len());
884 let mut get_results = stream::iter(targets.iter().cloned())
885 .map(|target| async move {
886 let chunk_result = self
887 .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
888 .await;
889
890 if let Ok(Some(chunk)) = &chunk_result {
891 self.chunk_cache().put(chunk.address, chunk.content.clone());
892 }
893
894 (
895 target.index,
896 ChunkPeerGetResult {
897 peer_id: target.peer_id,
898 peer_addrs: target.peer_addrs,
899 xor_distance: target.xor_distance,
900 chunk_result,
901 },
902 )
903 })
904 .buffer_unordered(concurrency_limit);
905
906 let collect_results = async {
907 while let Some((index, result)) = get_results.next().await {
908 completed[index] = true;
909 results.push(result);
910 }
911 };
912
913 if tokio::time::timeout(overall_timeout, collect_results)
914 .await
915 .is_err()
916 {
917 for target in &targets {
918 if !completed[target.index] {
919 results.push(timed_out_chunk_peer_get_result(
920 target,
921 address,
922 overall_timeout,
923 ));
924 }
925 }
926 }
927
928 sort_chunk_peer_get_results(&mut results);
929 Ok(results)
930 }
931
932 /// Fetch a chunk from a specific peer.
933 async fn chunk_get_from_peer(
934 &self,
935 address: &XorName,
936 peer: &PeerId,
937 peer_addrs: &[MultiAddr],
938 ) -> Result<Option<DataChunk>> {
939 let node = self.network().node();
940 let request_id = self.next_request_id();
941 let request = ChunkGetRequest::new(*address);
942 let message = ChunkMessage {
943 request_id,
944 body: ChunkMessageBody::GetRequest(request),
945 };
946 let message_bytes = message
947 .encode()
948 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
949
950 let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
951 let addr_hex = hex::encode(address);
952 let timeout_secs = self.config().chunk_get_timeout_secs;
953
954 let result = send_and_await_chunk_response(
955 node,
956 peer,
957 message_bytes,
958 request_id,
959 timeout,
960 peer_addrs,
961 |body| match body {
962 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
963 address: addr,
964 content,
965 }) => {
966 if addr != *address {
967 return Some(Err(Error::InvalidData(format!(
968 "Mismatched chunk address: expected {addr_hex}, got {}",
969 hex::encode(addr)
970 ))));
971 }
972
973 let computed = compute_address(&content);
974 if computed != addr {
975 return Some(Err(Error::InvalidData(format!(
976 "Invalid chunk content: expected hash {addr_hex}, got {}",
977 hex::encode(computed)
978 ))));
979 }
980
981 debug!(
982 "Retrieved chunk {} ({} bytes) from peer {peer}",
983 hex::encode(addr),
984 content.len()
985 );
986 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
987 }
988 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
989 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
990 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
991 )),
992 _ => None,
993 },
994 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
995 || {
996 Error::Timeout(format!(
997 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
998 ))
999 },
1000 )
1001 .await;
1002
1003 result
1004 }
1005
1006 /// Check if a chunk exists on the network.
1007 ///
1008 /// # Errors
1009 ///
1010 /// Returns an error if the network operation fails.
1011 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
1012 self.chunk_get(address).await.map(|opt| opt.is_some())
1013 }
1014
1015 /// Finalize a single-chunk publish after an external signer has paid.
1016 ///
1017 /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
1018 /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
1019 /// `quote_hash -> tx_hash` map containing receipts for every non-zero
1020 /// quote in the chunk's payment. Builds the `PaymentProof` and stores
1021 /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
1022 ///
1023 /// Wave-batch payment shape only. Single-chunk publishes don't need
1024 /// Merkle batching: one chunk's worth of quotes is well below the
1025 /// wave-batch threshold.
1026 ///
1027 /// # Errors
1028 ///
1029 /// Returns an error if the proof construction fails (e.g. missing
1030 /// `tx_hash` for a non-zero quote) or if fewer than
1031 /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
1032 pub async fn finalize_chunk(
1033 &self,
1034 prepared: PreparedChunk,
1035 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1036 ) -> Result<XorName> {
1037 let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
1038 // finalize_batch_payment returns one PaidChunk per PreparedChunk
1039 // input; we passed exactly one. If that invariant is ever violated
1040 // it's an upstream bug — fail loudly rather than silently address-0.
1041 let chunk = paid.pop().ok_or_else(|| {
1042 Error::Payment(
1043 "finalize_batch_payment returned no paid chunks for a single \
1044 prepared chunk — internal invariant violated"
1045 .into(),
1046 )
1047 })?;
1048 self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
1049 .await
1050 }
1051}
1052
#[cfg(test)]
mod tests {
    // Unit tests for this module's private helpers: PUT-failure
    // classification, PUT-shortfall error routing, NotFound authority,
    // adaptive-limiter outcome mapping, store-timeout selection by proof
    // tag, and the diagnostic per-peer GET sorting/timeout budgeting.
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// Sentinel byte used to represent an unknown/unrecognized proof tag.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;
    /// XorName byte width used by test peer IDs and distances.
    const TEST_XORNAME_BYTE_LEN: usize = 32;
    /// Last byte position in the test XOR distance arrays.
    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;

    /// `classify_put_failure` maps each single-peer PUT failure shape to
    /// the `PutRejection` bucket that drives the surfaced aggregate error
    /// and the store limiter.
    #[test]
    fn classify_put_failure_maps_remote_timeout_and_dial_reasons() {
        // Wraps a node-side `ProtocolError` the way a remote PUT rejection
        // surfaces to the client.
        let remote = |source| Error::RemotePut {
            address: "test-addr".to_string(),
            source,
        };
        assert!(matches!(
            classify_put_failure(&remote(ProtocolError::StorageFailed("full".to_string()))),
            PutRejection::Full
        ));
        assert!(matches!(
            classify_put_failure(&remote(ProtocolError::PaymentFailed(
                "below floor".to_string()
            ))),
            PutRejection::PriceFloor
        ));
        assert!(matches!(
            classify_put_failure(&remote(ProtocolError::Internal("boom".to_string()))),
            PutRejection::OtherRemote
        ));
        // A `PaymentRequired` PUT response surfaces as `Error::Payment` and is an
        // application-level decline, not a transport shortfall (ADR-0002).
        assert!(matches!(
            classify_put_failure(&Error::Payment("Payment required: more".to_string())),
            PutRejection::PriceFloor
        ));
        // A PUT-response timeout is genuine local backpressure (V2-554).
        assert!(matches!(
            classify_put_failure(&Error::Timeout("no response".to_string())),
            PutRejection::Timeout
        ));
        // A dial/relay failure (dead/stale relayed address) is remote churn.
        assert!(matches!(
            classify_put_failure(&Error::Network("dial failed".to_string())),
            PutRejection::Dial
        ));
    }

    /// `put_shortfall_error` picks the surfaced error variant from the mix
    /// of failures behind a PUT shortfall.
    ///
    /// NOTE(review): the argument order appears to be (timeout count,
    /// dial-failure count, app-level error, message). This is inferred from
    /// the cases below; confirm against the definition.
    #[test]
    fn put_shortfall_routes_by_failure_mix() {
        let app = || Error::Payment("Payment required: more".to_string());
        let msg = || "shortfall".to_string();

        // Every failure was an application-level decline (no timeout, no dial):
        // surface the app error so the limiter isn't driven down as a false
        // capacity signal (ADR-0002 / V2-468).
        assert!(matches!(
            put_shortfall_error(0, 0, Some(app()), msg()),
            Error::Payment(_)
        ));
        // Any PUT-response timeout in the mix is genuine local backpressure:
        // keep it a capacity signal so the store limiter still backs off (V2-554).
        assert!(matches!(
            put_shortfall_error(1, 0, Some(app()), msg()),
            Error::InsufficientPeers(_)
        ));
        assert!(matches!(
            put_shortfall_error(1, 3, None, msg()),
            Error::InsufficientPeers(_)
        ));
        // No timeouts but dial/relay churn present: remote peer churn, not local
        // capacity — surface a neutral CloseGroupShortfall (V2-554).
        assert!(matches!(
            put_shortfall_error(0, 2, None, msg()),
            Error::CloseGroupShortfall(_)
        ));
        // Dial churn alongside an app rejection, still no timeout: neutral.
        assert!(matches!(
            put_shortfall_error(0, 1, Some(app()), msg()),
            Error::CloseGroupShortfall(_)
        ));
    }

    /// Builds a `ChunkPeerGetResult` fixture with these fields:
    ///
    /// * peer ID: `peer_seed` repeated across all bytes;
    /// * XOR distance: zero except for the last byte, which is
    ///   `distance_tail`;
    /// * no addresses;
    /// * chunk result: `Ok(None)`.
    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;

        ChunkPeerGetResult {
            peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
            peer_addrs: Vec::new(),
            xor_distance,
            chunk_result: Ok(None),
        }
    }

    /// `is_authoritative_not_found(not_found, queried)` must only stop
    /// retrying when every queried peer said NotFound AND at least
    /// `CLOSE_GROUP_MAJORITY` peers were queried.
    #[test]
    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
        // Unanimous AND well-sampled: every queried peer of a full
        // close group said NotFound. The only safe stop.
        assert!(is_authoritative_not_found(7, 7));
        // Unanimous with exactly a majority-sized sample is also
        // authoritative.
        assert!(is_authoritative_not_found(
            CLOSE_GROUP_MAJORITY,
            CLOSE_GROUP_MAJORITY
        ));

        // Unanimous but UNDER-sampled: a thin DHT walk returning 1 or 3
        // peers, all NotFound, is NOT authoritative — the real replica
        // majority may sit entirely outside that narrow view. Must
        // retry (re-walk the DHT).
        assert!(!is_authoritative_not_found(1, 1));
        assert!(!is_authoritative_not_found(3, 3));
        assert!(!is_authoritative_not_found(
            CLOSE_GROUP_MAJORITY - 1,
            CLOSE_GROUP_MAJORITY - 1
        ));

        // Not unanimous: 4-of-7 / 6-of-7 NotFound leaves storers in the
        // timeout bucket. Must retry.
        assert!(!is_authoritative_not_found(4, 7));
        assert!(!is_authoritative_not_found(6, 7));

        // Pure-reachability failure — must retry.
        assert!(!is_authoritative_not_found(0, 7));

        // Defensive: a zeroed outcome (e.g. the first attempt's
        // close-group lookup errored) is never authoritative.
        assert!(!is_authoritative_not_found(0, 0));
    }

    /// `chunk_get_outcome` maps a `chunk_get` result onto the adaptive
    /// limiter's `Outcome`.
    #[test]
    fn chunk_get_outcome_classifies_each_result_kind() {
        // Success: chunk_get returned a chunk, regardless of how many
        // internal peer attempts it took.
        // NOTE(review): `32` duplicates `TEST_XORNAME_BYTE_LEN`.
        let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
        assert_eq!(
            chunk_get_outcome(&Ok(Some(chunk))),
            Outcome::Success,
            "found-chunk must be Success",
        );

        // Ok(None): chunk_get exhausted the close group across first
        // attempt + retry. This is the load-shedding signal — count it
        // as Timeout so a sustained run of them on a saturated link
        // shrinks the cap.
        assert_eq!(
            chunk_get_outcome(&Ok(None)),
            Outcome::Timeout,
            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
        );

        // Capacity signals from explicit error variants.
        assert_eq!(
            chunk_get_outcome(&Err(Error::Timeout("t".into()))),
            Outcome::Timeout,
        );
        assert_eq!(
            chunk_get_outcome(&Err(Error::Network("n".into()))),
            Outcome::NetworkError,
        );

        // Unexpected error variant (e.g. Protocol) — not a capacity signal.
        // NOTE(review): `chunk_get_try_closest_peers` absorbs per-peer
        // Protocol errors and keeps sweeping. A Protocol error reaching the
        // caller presumably comes from elsewhere on the `chunk_get` path;
        // confirm.
        assert_eq!(
            chunk_get_outcome(&Err(Error::Protocol("p".into()))),
            Outcome::ApplicationError,
        );
    }

    /// Single-node proofs use the fixed `STORE_RESPONSE_TIMEOUT`.
    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        let timeout =
            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
    }

    /// An unrecognized proof tag falls back to `STORE_RESPONSE_TIMEOUT`.
    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        let timeout =
            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
    }

    /// Merkle proofs use the caller-supplied Merkle store timeout.
    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let timeout =
            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
    }

    /// `sort_chunk_peer_get_results` orders results by ascending XOR distance.
    #[test]
    fn chunk_peer_get_results_sort_by_xor_distance() {
        let mut results = vec![
            chunk_peer_get_result(3, 3),
            chunk_peer_get_result(1, 1),
            chunk_peer_get_result(2, 2),
        ];

        sort_chunk_peer_get_results(&mut results);

        let ordered_distances = results
            .iter()
            .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
            .collect::<Vec<_>>();
        assert_eq!(ordered_distances, vec![1, 2, 3]);
    }

    /// When all targets fit in one concurrent wave, the overall diagnostic
    /// deadline is two per-peer timeouts (one wave plus padding).
    #[test]
    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
        const TARGET_COUNT: usize = 7;
        const CONCURRENCY_LIMIT: usize = 7;

        let timeout = diagnostic_peer_get_overall_timeout(
            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
            TARGET_COUNT,
            CONCURRENCY_LIMIT,
        );

        assert_eq!(
            timeout,
            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
        );
    }

    /// With more targets than the concurrency limit chosen by
    /// `diagnostic_peer_get_concurrency`, the overall deadline grows with
    /// the number of waves needed.
    #[test]
    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const TARGET_COUNT: usize = 20;
        const CLOSE_GROUP_SIZE: usize = 7;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;

        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
        let timeout = diagnostic_peer_get_overall_timeout(
            Duration::from_secs(PER_PEER_TIMEOUT_SECS),
            TARGET_COUNT,
            concurrency_limit,
        );

        assert_eq!(
            timeout,
            Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
        );
    }

    /// Regression: the default `merkle_store_timeout_secs` must be at
    /// least the storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus
    /// padding. If either side moves and this invariant breaks, the
    /// client will give up on chunks the storer is still verifying.
    /// See `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment for the
    /// derivation.
    #[test]
    fn default_merkle_store_timeout_satisfies_storer_invariant() {
        use crate::data::client::ClientConfig;
        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
        const MIN_PADDING_SECS: u64 = 30;
        let config = ClientConfig::default();
        assert!(
            config.merkle_store_timeout_secs
                >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
            config.merkle_store_timeout_secs,
            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
            MIN_PADDING_SECS,
        );
    }

    /// Regression: the non-merkle PUT path uses the hardcoded
    /// `STORE_RESPONSE_TIMEOUT` constant, not the per-config
    /// `merkle_store_timeout_secs`. If a future refactor accidentally
    /// routes non-merkle PUTs through the merkle field they'd inherit
    /// the 270 s value and silently regress non-merkle latency.
    /// `store_response_timeout_for_proof` with a non-merkle proof tag
    /// must return the const regardless of what merkle timeout is
    /// passed.
    #[test]
    fn non_merkle_put_ignores_merkle_timeout_value() {
        let absurd_merkle_timeout = 9_999;
        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
            let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
            assert_eq!(
                timeout, STORE_RESPONSE_TIMEOUT,
                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
            );
        }
    }
}