1use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_cache::record_peer_outcome;
9use crate::data::client::peer_xor_distance;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{QuoteHash, TxHash};
13use ant_protocol::transport::{MultiAddr, PeerId};
14use ant_protocol::{
15 compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
16 ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
17 ProofType, XorName, CLOSE_GROUP_MAJORITY,
18};
19use bytes::Bytes;
20use futures::stream::{self, FuturesUnordered, StreamExt};
21use std::collections::HashMap;
22use std::future::Future;
23use std::time::{Duration, Instant};
24use tracing::{debug, info, warn};
25
/// Data-type tag handed to `pay_for_storage` when `chunk_put` requests payment for a chunk.
const CHUNK_DATA_TYPE: u32 = 0;
28
/// Tally of one sequential GET sweep over the peers closest to a chunk address.
///
/// Built by `Client::chunk_get_try_closest_peers`. Because that sweep stops at
/// the first peer that returns the chunk, the per-outcome counters only cover
/// the peers contacted before it stopped.
struct CloseGroupOutcome {
    /// The chunk, when some peer returned it.
    chunk: Option<DataChunk>,
    /// Number of peers returned by the closest-peers lookup for this sweep.
    queried: usize,
    /// Peers that answered without the chunk (`Ok(None)`).
    not_found: usize,
    /// Peers whose request ended in `Error::Timeout`.
    timeout: usize,
    /// Peers whose request ended in `Error::Network`.
    network_err: usize,
    /// Peers whose request ended in `Error::Protocol`.
    protocol_err: usize,
}
54
55fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
78 queried >= CLOSE_GROUP_MAJORITY && not_found == queried
79}
80
/// How long a PUT waits for the peer's store response when the payment proof
/// is not detected as a Merkle proof (see `store_response_timeout_for_proof`).
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
83
/// Extra "waves" (each worth one per-peer timeout) added on top of the computed
/// wave count when sizing the overall deadline of a diagnostic GET sweep.
const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
86
/// Result of asking one specific peer for a chunk during a diagnostic sweep.
pub struct ChunkPeerGetResult {
    /// The peer that was asked.
    pub peer_id: PeerId,
    /// The addresses that were used to reach the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// Distance between the peer and the chunk address, as computed by
    /// `peer_xor_distance`; results are sorted by this value.
    pub xor_distance: [u8; 32],
    /// What the peer returned: `Ok(Some(_))` for the chunk, `Ok(None)` when the
    /// peer reported it does not have it, or the error for that peer (including
    /// a synthetic `Error::Timeout` when the whole sweep hit its deadline first).
    pub chunk_result: Result<Option<DataChunk>>,
}
98
/// One peer scheduled to be queried in a diagnostic GET sweep.
#[derive(Clone)]
struct ChunkPeerGetTarget {
    /// Position of the peer in the list returned by the closest-peers lookup;
    /// used to mark which targets completed before the sweep deadline.
    index: usize,
    /// The peer to query.
    peer_id: PeerId,
    /// Addresses to reach the peer on.
    peer_addrs: Vec<MultiAddr>,
    /// Distance between the peer and the chunk address (from `peer_xor_distance`).
    xor_distance: [u8; 32],
}
106
107fn chunk_peer_get_targets(
108 peers: Vec<(PeerId, Vec<MultiAddr>)>,
109 address: &XorName,
110) -> Vec<ChunkPeerGetTarget> {
111 peers
112 .into_iter()
113 .enumerate()
114 .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
115 index,
116 peer_id,
117 peer_addrs,
118 xor_distance: peer_xor_distance(&peer_id, address),
119 })
120 .collect()
121}
122
123fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
124 results.sort_by_key(|result| result.xor_distance);
125}
126
/// Picks how many per-peer GETs a diagnostic sweep may run at once.
///
/// The limit is the smaller of `peer_count` and `close_group_size`, but never
/// less than 1. The result is passed to `buffer_unordered`; with a limit of 0
/// that adaptor never starts a request, so the sweep would sit idle until its
/// overall deadline instead of finishing immediately when there is nothing to query.
fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
    peer_count.min(close_group_size).max(1)
}
130
131fn diagnostic_peer_get_overall_timeout(
132 per_peer_timeout: Duration,
133 target_count: usize,
134 concurrency_limit: usize,
135) -> Duration {
136 let concurrency_limit = concurrency_limit.max(1);
137 let peer_get_waves = target_count.div_ceil(concurrency_limit);
138 let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
139 let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
140
141 per_peer_timeout.saturating_mul(timeout_waves)
142}
143
144fn timed_out_chunk_peer_get_result(
145 target: &ChunkPeerGetTarget,
146 address: &XorName,
147 timeout: Duration,
148) -> ChunkPeerGetResult {
149 let addr_hex = hex::encode(address);
150 let timeout_secs = timeout.as_secs();
151 ChunkPeerGetResult {
152 peer_id: target.peer_id,
153 peer_addrs: target.peer_addrs.clone(),
154 xor_distance: target.xor_distance,
155 chunk_result: Err(Error::Timeout(format!(
156 "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
157 target.peer_id
158 ))),
159 }
160}
161
162fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
163 match detect_proof_type(proof) {
164 Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
165 _ => STORE_RESPONSE_TIMEOUT,
166 }
167}
168
169impl Client {
170 pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
181 let started = Instant::now();
182 let result = self.chunk_get(address).await;
183 let latency = started.elapsed();
184 let bytes = result
185 .as_ref()
186 .ok()
187 .and_then(Option::as_ref)
188 .map_or(0, |chunk| chunk.content.len() as u64);
189 self.controller()
190 .fetch
191 .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
192 result
193 }
194}
195
196pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
213 match result {
214 Ok(Some(_)) => Outcome::Success,
215 Ok(None) => Outcome::Timeout,
216 Err(Error::Timeout(_)) => Outcome::Timeout,
217 Err(Error::Network(_)) => Outcome::NetworkError,
218 Err(_) => Outcome::ApplicationError,
219 }
220}
221
222impl Client {
223 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
234 let address = compute_address(&content);
235 let data_size = u64::try_from(content.len())
236 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
237
238 match self
239 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
240 .await
241 {
242 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
243 Err(Error::AlreadyStored) => {
244 debug!(
245 "Chunk {} already stored on network, skipping payment",
246 hex::encode(address)
247 );
248 Ok(address)
249 }
250 Err(e) => Err(e),
251 }
252 }
253
254 pub(crate) async fn chunk_put_to_close_group(
266 &self,
267 content: Bytes,
268 proof: Vec<u8>,
269 peers: &[(PeerId, Vec<MultiAddr>)],
270 ) -> Result<XorName> {
271 let address = compute_address(&content);
272
273 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
274 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
275
276 let mut put_futures = FuturesUnordered::new();
277 for (peer_id, addrs) in initial_peers {
278 put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
279 }
280
281 let mut success_count = 0usize;
282 let mut failures: Vec<String> = Vec::new();
283 let mut fallback_iter = fallback_peers.iter();
284
285 while let Some((peer_id, result)) = put_futures.next().await {
286 match result {
287 Ok(_) => {
288 success_count += 1;
289 if success_count >= CLOSE_GROUP_MAJORITY {
290 debug!(
291 "Chunk {} stored on {success_count} peers (majority reached)",
292 hex::encode(address)
293 );
294 return Ok(address);
295 }
296 }
297 Err(e) => {
298 warn!("Failed to store chunk on {peer_id}: {e}");
299 failures.push(format!("{peer_id}: {e}"));
300
301 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
302 debug!(
303 "Falling back to peer {fb_peer} for chunk {}",
304 hex::encode(address)
305 );
306 put_futures.push(self.spawn_chunk_put(
307 content.clone(),
308 proof.clone(),
309 fb_peer,
310 fb_addrs,
311 ));
312 }
313 }
314 }
315 }
316
317 Err(Error::InsufficientPeers(format!(
318 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
319 failures.join("; ")
320 )))
321 }
322
323 fn spawn_chunk_put<'a>(
325 &'a self,
326 content: Bytes,
327 proof: Vec<u8>,
328 peer_id: &'a PeerId,
329 addrs: &'a [MultiAddr],
330 ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
331 let peer_id_owned = *peer_id;
332 async move {
333 let result = self
334 .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
335 .await;
336 (peer_id_owned, result)
337 }
338 }
339
/// Sends a chunk together with its payment proof to a single peer and waits
/// for that peer's store response.
///
/// Both `Success` and `AlreadyExists` responses count as success and yield the
/// address reported by the peer. Whether the exchange succeeded is recorded
/// via `record_peer_outcome` before returning (no round-trip time is recorded
/// for PUTs).
///
/// # Errors
///
/// * `Error::Protocol` if the request cannot be encoded or the peer answers
///   with an error response.
/// * `Error::Payment` if the peer answers `PaymentRequired`.
/// * `Error::Network` / `Error::Timeout` as produced by the send-failure and
///   timeout mappers given to `send_and_await_chunk_response`.
pub async fn chunk_put_with_proof(
    &self,
    content: Bytes,
    proof: Vec<u8>,
    target_peer: &PeerId,
    peer_addrs: &[MultiAddr],
) -> Result<XorName> {
    let address = compute_address(&content);
    let node = self.network().node();
    // Merkle proofs wait for the configured time; any other proof uses
    // `STORE_RESPONSE_TIMEOUT`.
    let timeout =
        store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
    let timeout_secs = timeout.as_secs();

    let request_id = self.next_request_id();
    let request = ChunkPutRequest::with_payment(address, content, proof);
    let message = ChunkMessage {
        request_id,
        body: ChunkMessageBody::PutRequest(request),
    };
    let message_bytes = message
        .encode()
        .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;

    let addr_hex = hex::encode(address);

    let result = send_and_await_chunk_response(
        node,
        target_peer,
        message_bytes,
        request_id,
        timeout,
        peer_addrs,
        // Response classifier: `Some(_)` for a PUT response, `None` for any
        // other message body.
        |body| match body {
            ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
                debug!("Chunk stored at {}", hex::encode(addr));
                Some(Ok(addr))
            }
            ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
                address: addr,
            }) => {
                debug!("Chunk already exists at {}", hex::encode(addr));
                Some(Ok(addr))
            }
            ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
                Some(Err(Error::Payment(format!("Payment required: {message}"))))
            }
            ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
                Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
            )),
            _ => None,
        },
        |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
        || {
            Error::Timeout(format!(
                "Timeout waiting for store response after {timeout_secs}s"
            ))
        },
    )
    .await;

    // Report whether the peer served the request, regardless of the outcome.
    record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;

    result
}
419
420 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
445 self.chunk_get_from_closest_peers(address, self.config().close_group_size)
446 .await
447 }
448
/// Fetches a chunk from the `peer_count` peers closest to `address`, using
/// the local cache first and retrying the network sweep once.
///
/// Flow:
/// 1. A cached copy is returned if its content still hashes to `address`;
///    a mismatching entry is evicted.
/// 2. One sequential sweep over the closest peers is made. A chunk found
///    there is cached and returned.
/// 3. If at least `CLOSE_GROUP_MAJORITY` peers were queried and all of them
///    answered NotFound, the result is `Ok(None)` with no retry.
/// 4. Otherwise the sweep is repeated once after a one-second pause.
///
/// As written, every path returns `Ok`: errors from either sweep are logged
/// and surface as `Ok(None)`, so `Ok(None)` covers both "not stored" and
/// "could not be retrieved".
pub async fn chunk_get_from_closest_peers(
    &self,
    address: &XorName,
    peer_count: usize,
) -> Result<Option<DataChunk>> {
    if let Some(cached) = self.chunk_cache().get(address) {
        // Re-hash the cached bytes so a corrupted entry is never served.
        let computed = compute_address(&cached);
        if computed == *address {
            debug!("Cache hit for chunk {}", hex::encode(address));
            return Ok(Some(DataChunk::new(*address, cached)));
        }
        debug!(
            "Cache corruption detected for {}: evicting",
            hex::encode(address)
        );
        self.chunk_cache().remove(address);
    }

    let addr_hex = hex::encode(address);

    let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
        Ok(outcome) => outcome,
        Err(e) => {
            info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
            // An all-zero outcome is never "authoritative", so the retry
            // path below is taken.
            CloseGroupOutcome {
                chunk: None,
                queried: 0,
                not_found: 0,
                timeout: 0,
                network_err: 0,
                protocol_err: 0,
            }
        }
    };
    if let Some(chunk) = first.chunk {
        self.chunk_cache().put(chunk.address, chunk.content.clone());
        return Ok(Some(chunk));
    }

    if is_authoritative_not_found(first.not_found, first.queried) {
        info!(
            "chunk_get giving up on {addr_hex} (unanimous NotFound): \
             queried={} not_found={} timeout={} network_err={} protocol_err={}",
            first.queried,
            first.not_found,
            first.timeout,
            first.network_err,
            first.protocol_err,
        );
        return Ok(None);
    }

    info!(
        "chunk_get retrying {addr_hex} after reachability failure: \
         queried={} not_found={} timeout={} network_err={} protocol_err={}",
        first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
    );

    // Short pause before the single retry.
    tokio::time::sleep(Duration::from_secs(1)).await;

    let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
        Ok(o) => o,
        Err(e) => {
            info!(
                "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
                 first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
                first.queried,
                first.not_found,
                first.timeout,
                first.network_err,
                first.protocol_err,
            );
            // The retry error is logged only; the caller sees "no chunk".
            return Ok(None);
        }
    };
    if let Some(chunk) = retry.chunk {
        info!("chunk_get retry succeeded for {addr_hex}");
        self.chunk_cache().put(chunk.address, chunk.content.clone());
        return Ok(Some(chunk));
    }

    info!(
        "chunk_get exhausted close group after retry for {addr_hex}: \
         first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
         retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
        first.queried,
        first.not_found,
        first.timeout,
        first.network_err,
        first.protocol_err,
        retry.queried,
        retry.not_found,
        retry.timeout,
        retry.network_err,
        retry.protocol_err,
    );
    Ok(None)
}
585
586 async fn chunk_get_try_closest_peers(
590 &self,
591 address: &XorName,
592 peer_count: usize,
593 ) -> Result<CloseGroupOutcome> {
594 let peers = self.closest_peers(address, peer_count).await?;
595 let addr_hex = hex::encode(address);
596 let queried = peers.len();
597 let mut not_found = 0usize;
598 let mut timeout = 0usize;
599 let mut network_err = 0usize;
600 let mut protocol_err = 0usize;
601
602 for (peer, addrs) in &peers {
603 match self.chunk_get_from_peer(address, peer, addrs).await {
604 Ok(Some(chunk)) => {
605 return Ok(CloseGroupOutcome {
606 chunk: Some(chunk),
607 queried,
608 not_found,
609 timeout,
610 network_err,
611 protocol_err,
612 });
613 }
614 Ok(None) => {
615 not_found += 1;
616 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
617 }
618 Err(Error::Timeout(_)) => {
619 timeout += 1;
620 debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
621 }
622 Err(Error::Network(_)) => {
623 network_err += 1;
624 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
625 }
626 Err(Error::Protocol(ref e)) => {
635 protocol_err += 1;
636 debug!(
637 "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
638 );
639 }
640 Err(e) => return Err(e),
641 }
642 }
643
644 Ok(CloseGroupOutcome {
645 chunk: None,
646 queried,
647 not_found,
648 timeout,
649 network_err,
650 protocol_err,
651 })
652 }
653
654 pub async fn chunk_get_from_close_group(
664 &self,
665 address: &XorName,
666 ) -> Result<Vec<ChunkPeerGetResult>> {
667 self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
668 .await
669 }
670
/// Diagnostic GET: queries each of the `peer_count` closest peers for the
/// chunk and returns one result per peer, sorted by XOR distance.
///
/// Unlike `chunk_get`, this does not stop at the first hit; every peer is
/// asked. Requests run concurrently up to a limit derived from `peer_count`
/// and the configured close-group size. Any chunk that a peer returns is put
/// in the local chunk cache. The whole sweep is bounded by an overall
/// deadline; peers that have not completed by then get a synthetic
/// `Error::Timeout` result.
///
/// # Errors
///
/// Only the closest-peers lookup can fail the call; per-peer failures are
/// reported inside the returned results.
pub async fn chunk_get_from_closest_peer_group(
    &self,
    address: &XorName,
    peer_count: usize,
) -> Result<Vec<ChunkPeerGetResult>> {
    let peers = self.closest_peers(address, peer_count).await?;
    let targets = chunk_peer_get_targets(peers, address);
    let concurrency_limit =
        diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
    let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
    let overall_timeout =
        diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);

    // `completed[i]` flips to true once target `i` has produced a result.
    let mut completed = vec![false; targets.len()];
    let mut results = Vec::with_capacity(targets.len());
    let mut get_results = stream::iter(targets.iter().cloned())
        .map(|target| async move {
            let chunk_result = self
                .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
                .await;

            if let Ok(Some(chunk)) = &chunk_result {
                self.chunk_cache().put(chunk.address, chunk.content.clone());
            }

            (
                target.index,
                ChunkPeerGetResult {
                    peer_id: target.peer_id,
                    peer_addrs: target.peer_addrs,
                    xor_distance: target.xor_distance,
                    chunk_result,
                },
            )
        })
        .buffer_unordered(concurrency_limit);

    // Drain results as they complete (completion order, not target order).
    let collect_results = async {
        while let Some((index, result)) = get_results.next().await {
            completed[index] = true;
            results.push(result);
        }
    };

    if tokio::time::timeout(overall_timeout, collect_results)
        .await
        .is_err()
    {
        // Deadline hit: report every unfinished target as timed out.
        for target in &targets {
            if !completed[target.index] {
                results.push(timed_out_chunk_peer_get_result(
                    target,
                    address,
                    overall_timeout,
                ));
            }
        }
    }

    sort_chunk_peer_get_results(&mut results);
    Ok(results)
}
743
/// Requests a chunk from one specific peer and validates the reply.
///
/// Returns `Ok(Some(chunk))` when the peer returns content whose reported
/// address equals `address` and whose hash matches it, and `Ok(None)` when the
/// peer answers NotFound. The exchange is then reported via
/// `record_peer_outcome`: any `Ok` result (including NotFound) counts as a
/// success and carries the measured round-trip time in milliseconds.
///
/// # Errors
///
/// * `Error::Protocol` if the request cannot be encoded or the peer answers
///   with an error response.
/// * `Error::InvalidData` if the reply's address differs from the requested
///   one or the content does not hash to that address.
/// * `Error::Network` / `Error::Timeout` as produced by the send-failure and
///   timeout mappers given to `send_and_await_chunk_response`.
async fn chunk_get_from_peer(
    &self,
    address: &XorName,
    peer: &PeerId,
    peer_addrs: &[MultiAddr],
) -> Result<Option<DataChunk>> {
    let node = self.network().node();
    let request_id = self.next_request_id();
    let request = ChunkGetRequest::new(*address);
    let message = ChunkMessage {
        request_id,
        body: ChunkMessageBody::GetRequest(request),
    };
    let message_bytes = message
        .encode()
        .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;

    let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
    let addr_hex = hex::encode(address);
    let timeout_secs = self.config().chunk_get_timeout_secs;

    let start = Instant::now();
    let result = send_and_await_chunk_response(
        node,
        peer,
        message_bytes,
        request_id,
        timeout,
        peer_addrs,
        // Response classifier: `Some(_)` for a GET response, `None` for any
        // other message body.
        |body| match body {
            ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
                address: addr,
                content,
            }) => {
                // The peer must answer for the address that was asked for.
                if addr != *address {
                    return Some(Err(Error::InvalidData(format!(
                        "Mismatched chunk address: expected {addr_hex}, got {}",
                        hex::encode(addr)
                    ))));
                }

                // Verify the content hashes to the address before accepting it.
                let computed = compute_address(&content);
                if computed != addr {
                    return Some(Err(Error::InvalidData(format!(
                        "Invalid chunk content: expected hash {addr_hex}, got {}",
                        hex::encode(computed)
                    ))));
                }

                debug!(
                    "Retrieved chunk {} ({} bytes) from peer {peer}",
                    hex::encode(addr),
                    content.len()
                );
                Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
            }
            ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
            ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
                Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
            )),
            _ => None,
        },
        |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
        || {
            Error::Timeout(format!(
                "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
            ))
        },
    )
    .await;

    // Round-trip time is only recorded when the exchange succeeded.
    let success = result.is_ok();
    let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
    record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;

    result
}
822
823 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
829 self.chunk_get(address).await.map(|opt| opt.is_some())
830 }
831
832 pub async fn finalize_chunk(
850 &self,
851 prepared: PreparedChunk,
852 tx_hash_map: &HashMap<QuoteHash, TxHash>,
853 ) -> Result<XorName> {
854 let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
855 let chunk = paid.pop().ok_or_else(|| {
859 Error::Payment(
860 "finalize_batch_payment returned no paid chunks for a single \
861 prepared chunk — internal invariant violated"
862 .into(),
863 )
864 })?;
865 self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
866 .await
867 }
868}
869
#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Merkle store timeout (seconds) passed to `store_response_timeout_for_proof`.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// Proof tag byte used to exercise the non-Merkle fallback path.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;
    /// Byte length of the XOR-distance and peer-id arrays built in these tests.
    const TEST_XORNAME_BYTE_LEN: usize = 32;
    /// Index of the last distance byte, the only one the tests vary.
    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;

    /// Builds a NotFound result whose distance is all zeros except the last byte.
    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
        let peer_id = PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]);
        let mut xor_distance = [0u8; TEST_XORNAME_BYTE_LEN];
        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;

        ChunkPeerGetResult {
            peer_id,
            peer_addrs: Vec::new(),
            xor_distance,
            chunk_result: Ok(None),
        }
    }

    #[test]
    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
        // Unanimous NotFound from a majority-sized (or larger) sample is authoritative.
        assert!(is_authoritative_not_found(7, 7));
        assert!(is_authoritative_not_found(
            CLOSE_GROUP_MAJORITY,
            CLOSE_GROUP_MAJORITY
        ));

        // Unanimous, but the sample is smaller than a majority.
        assert!(!is_authoritative_not_found(1, 1));
        assert!(!is_authoritative_not_found(3, 3));
        assert!(!is_authoritative_not_found(
            CLOSE_GROUP_MAJORITY - 1,
            CLOSE_GROUP_MAJORITY - 1
        ));

        // Well sampled, but not every peer said NotFound.
        assert!(!is_authoritative_not_found(4, 7));
        assert!(!is_authoritative_not_found(6, 7));

        // Nobody said NotFound at all.
        assert!(!is_authoritative_not_found(0, 7));

        // Nothing was queried.
        assert!(!is_authoritative_not_found(0, 0));
    }

    #[test]
    fn chunk_get_outcome_classifies_each_result_kind() {
        let found = Ok(Some(DataChunk::new([0u8; 32], Bytes::from_static(b"x"))));
        assert_eq!(
            chunk_get_outcome(&found),
            Outcome::Success,
            "found-chunk must be Success",
        );

        let missing = Ok(None);
        assert_eq!(
            chunk_get_outcome(&missing),
            Outcome::Timeout,
            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
        );

        let timed_out = Err(Error::Timeout("t".into()));
        assert_eq!(chunk_get_outcome(&timed_out), Outcome::Timeout);

        let unreachable = Err(Error::Network("n".into()));
        assert_eq!(chunk_get_outcome(&unreachable), Outcome::NetworkError);

        // Every other error kind is an application error.
        let protocol = Err(Error::Protocol("p".into()));
        assert_eq!(chunk_get_outcome(&protocol), Outcome::ApplicationError);
    }

    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        let proof = [PROOF_TAG_SINGLE_NODE];

        assert_eq!(
            store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        let proof = [UNKNOWN_PROOF_TAG];

        assert_eq!(
            store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let proof = [PROOF_TAG_MERKLE];
        let expected = Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(
            store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
            expected
        );
    }

    #[test]
    fn chunk_peer_get_results_sort_by_xor_distance() {
        // Deliberately out of order: distance tails 3, 1, 2.
        let mut results = vec![
            chunk_peer_get_result(3, 3),
            chunk_peer_get_result(1, 1),
            chunk_peer_get_result(2, 2),
        ];

        sort_chunk_peer_get_results(&mut results);

        let mut ordered_distances = Vec::with_capacity(results.len());
        for result in &results {
            ordered_distances.push(result.xor_distance[TEST_DISTANCE_TAIL_INDEX]);
        }
        assert_eq!(ordered_distances, vec![1, 2, 3]);
    }

    #[test]
    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
        const TARGET_COUNT: usize = 7;
        const CONCURRENCY_LIMIT: usize = 7;

        let per_peer = Duration::from_secs(PER_PEER_TIMEOUT_SECS);
        let expected = Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING);

        assert_eq!(
            diagnostic_peer_get_overall_timeout(per_peer, TARGET_COUNT, CONCURRENCY_LIMIT),
            expected
        );
    }

    #[test]
    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const TARGET_COUNT: usize = 20;
        const CLOSE_GROUP_SIZE: usize = 7;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;

        let per_peer = Duration::from_secs(PER_PEER_TIMEOUT_SECS);
        let expected = Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING);
        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);

        assert_eq!(
            diagnostic_peer_get_overall_timeout(per_peer, TARGET_COUNT, concurrency_limit),
            expected
        );
    }

    #[test]
    fn default_merkle_store_timeout_satisfies_storer_invariant() {
        use crate::data::client::ClientConfig;
        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
        const MIN_PADDING_SECS: u64 = 30;

        let config = ClientConfig::default();
        let required = STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS;
        assert!(
            config.merkle_store_timeout_secs >= required,
            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
            config.merkle_store_timeout_secs,
            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
            MIN_PADDING_SECS,
        );
    }

    #[test]
    fn non_merkle_put_ignores_merkle_timeout_value() {
        let absurd_merkle_timeout = 9_999;
        for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
            let proof = [tag];
            assert_eq!(
                store_response_timeout_for_proof(&proof, absurd_merkle_timeout),
                STORE_RESPONSE_TIMEOUT,
                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
            );
        }
    }
}