1use crate::data::client::adaptive::Outcome;
7use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
8use crate::data::client::peer_xor_distance;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{QuoteHash, TxHash};
12use ant_protocol::transport::{MultiAddr, PeerId};
13use ant_protocol::{
14 compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
15 ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
16 ProofType, XorName, CLOSE_GROUP_MAJORITY,
17};
18use bytes::Bytes;
19use futures::stream::{self, FuturesUnordered, StreamExt};
20use std::collections::HashMap;
21use std::future::Future;
22use std::time::{Duration, Instant};
23use tracing::{debug, info, warn};
24
25const CHUNK_DATA_TYPE: u32 = 0;
27
28struct CloseGroupOutcome {
40 chunk: Option<DataChunk>,
41 queried: usize,
42 not_found: usize,
43 timeout: usize,
44 network_err: usize,
45 protocol_err: usize,
52}
53
54fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
77 queried >= CLOSE_GROUP_MAJORITY && not_found == queried
78}
79
80const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
82
83const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
85
86pub struct ChunkPeerGetResult {
88 pub peer_id: PeerId,
90 pub peer_addrs: Vec<MultiAddr>,
92 pub xor_distance: [u8; 32],
94 pub chunk_result: Result<Option<DataChunk>>,
96}
97
98#[derive(Clone)]
99struct ChunkPeerGetTarget {
100 index: usize,
101 peer_id: PeerId,
102 peer_addrs: Vec<MultiAddr>,
103 xor_distance: [u8; 32],
104}
105
106fn chunk_peer_get_targets(
107 peers: Vec<(PeerId, Vec<MultiAddr>)>,
108 address: &XorName,
109) -> Vec<ChunkPeerGetTarget> {
110 peers
111 .into_iter()
112 .enumerate()
113 .map(|(index, (peer_id, peer_addrs))| ChunkPeerGetTarget {
114 index,
115 peer_id,
116 peer_addrs,
117 xor_distance: peer_xor_distance(&peer_id, address),
118 })
119 .collect()
120}
121
122fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
123 results.sort_by_key(|result| result.xor_distance);
124}
125
126fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
127 peer_count.min(close_group_size.max(1))
128}
129
130fn diagnostic_peer_get_overall_timeout(
131 per_peer_timeout: Duration,
132 target_count: usize,
133 concurrency_limit: usize,
134) -> Duration {
135 let concurrency_limit = concurrency_limit.max(1);
136 let peer_get_waves = target_count.div_ceil(concurrency_limit);
137 let timeout_waves = peer_get_waves.saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
138 let timeout_waves = u32::try_from(timeout_waves).unwrap_or(u32::MAX);
139
140 per_peer_timeout.saturating_mul(timeout_waves)
141}
142
143fn timed_out_chunk_peer_get_result(
144 target: &ChunkPeerGetTarget,
145 address: &XorName,
146 timeout: Duration,
147) -> ChunkPeerGetResult {
148 let addr_hex = hex::encode(address);
149 let timeout_secs = timeout.as_secs();
150 ChunkPeerGetResult {
151 peer_id: target.peer_id,
152 peer_addrs: target.peer_addrs.clone(),
153 xor_distance: target.xor_distance,
154 chunk_result: Err(Error::Timeout(format!(
155 "Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
156 target.peer_id
157 ))),
158 }
159}
160
161fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
162 match detect_proof_type(proof) {
163 Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
164 _ => STORE_RESPONSE_TIMEOUT,
165 }
166}
167
168impl Client {
169 pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
180 self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
181 .await
182 }
183
184 pub(crate) async fn chunk_get_observed_from_closest_peers(
185 &self,
186 address: &XorName,
187 peer_count: usize,
188 ) -> Result<Option<DataChunk>> {
189 let started = Instant::now();
190 let result = self.chunk_get_from_closest_peers(address, peer_count).await;
191 let latency = started.elapsed();
192 let bytes = result
193 .as_ref()
194 .ok()
195 .and_then(Option::as_ref)
196 .map_or(0, |chunk| chunk.content.len() as u64);
197 self.controller()
198 .fetch
199 .observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
200 result
201 }
202}
203
204pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
221 match result {
222 Ok(Some(_)) => Outcome::Success,
223 Ok(None) => Outcome::Timeout,
224 Err(Error::Timeout(_)) => Outcome::Timeout,
225 Err(Error::Network(_)) => Outcome::NetworkError,
226 Err(_) => Outcome::ApplicationError,
227 }
228}
229
230impl Client {
231 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
242 let address = compute_address(&content);
243 let data_size = u64::try_from(content.len())
244 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
245
246 match self
247 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
248 .await
249 {
250 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
251 Err(Error::AlreadyStored) => {
252 debug!(
253 "Chunk {} already stored on network, skipping payment",
254 hex::encode(address)
255 );
256 Ok(address)
257 }
258 Err(e) => Err(e),
259 }
260 }
261
262 pub(crate) async fn chunk_put_to_close_group(
274 &self,
275 content: Bytes,
276 proof: Vec<u8>,
277 peers: &[(PeerId, Vec<MultiAddr>)],
278 ) -> Result<XorName> {
279 let address = compute_address(&content);
280
281 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
282 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
283
284 let mut put_futures = FuturesUnordered::new();
285 for (peer_id, addrs) in initial_peers {
286 put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
287 }
288
289 let mut success_count = 0usize;
290 let mut failures: Vec<String> = Vec::new();
291 let mut had_non_rejection_failure = false;
301 let mut first_remote_rejection: Option<Error> = None;
302 let mut fallback_iter = fallback_peers.iter();
303
304 while let Some((peer_id, result)) = put_futures.next().await {
305 match result {
306 Ok(_) => {
307 success_count += 1;
308 if success_count >= CLOSE_GROUP_MAJORITY {
309 debug!(
310 "Chunk {} stored on {success_count} peers (majority reached)",
311 hex::encode(address)
312 );
313 return Ok(address);
314 }
315 }
316 Err(e) => {
317 warn!("Failed to store chunk on {peer_id}: {e}");
318 failures.push(format!("{peer_id}: {e}"));
319 if matches!(e, Error::RemotePut { .. }) {
320 if first_remote_rejection.is_none() {
321 first_remote_rejection = Some(e);
322 }
323 } else {
324 had_non_rejection_failure = true;
325 }
326
327 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
328 debug!(
329 "Falling back to peer {fb_peer} for chunk {}",
330 hex::encode(address)
331 );
332 put_futures.push(self.spawn_chunk_put(
333 content.clone(),
334 proof.clone(),
335 fb_peer,
336 fb_addrs,
337 ));
338 }
339 }
340 }
341 }
342
343 if !had_non_rejection_failure {
349 if let Some(remote_rejection) = first_remote_rejection {
350 return Err(remote_rejection);
351 }
352 }
353
354 Err(Error::InsufficientPeers(format!(
355 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
356 failures.join("; ")
357 )))
358 }
359
360 fn spawn_chunk_put<'a>(
362 &'a self,
363 content: Bytes,
364 proof: Vec<u8>,
365 peer_id: &'a PeerId,
366 addrs: &'a [MultiAddr],
367 ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
368 let peer_id_owned = *peer_id;
369 async move {
370 let result = self
371 .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
372 .await;
373 (peer_id_owned, result)
374 }
375 }
376
377 pub async fn chunk_put_with_proof(
386 &self,
387 content: Bytes,
388 proof: Vec<u8>,
389 target_peer: &PeerId,
390 peer_addrs: &[MultiAddr],
391 ) -> Result<XorName> {
392 let address = compute_address(&content);
393 let node = self.network().node();
394 let timeout =
395 store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
396 let timeout_secs = timeout.as_secs();
397
398 let request_id = self.next_request_id();
399 let request = ChunkPutRequest::with_payment(address, content, proof);
403 let message = ChunkMessage {
404 request_id,
405 body: ChunkMessageBody::PutRequest(request),
406 };
407 let message_bytes = message
408 .encode()
409 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
410
411 let addr_hex = hex::encode(address);
412
413 let result = send_and_await_chunk_response(
414 node,
415 target_peer,
416 message_bytes,
417 request_id,
418 timeout,
419 peer_addrs,
420 |body| match body {
421 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
422 debug!("Chunk stored at {}", hex::encode(addr));
423 Some(Ok(addr))
424 }
425 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
426 address: addr,
427 }) => {
428 debug!("Chunk already exists at {}", hex::encode(addr));
429 Some(Ok(addr))
430 }
431 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
432 Some(Err(Error::Payment(format!("Payment required: {message}"))))
433 }
434 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
435 Some(Err(Error::RemotePut {
441 address: addr_hex.clone(),
442 source: e,
443 }))
444 }
445 _ => None,
446 },
447 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
448 || {
449 Error::Timeout(format!(
450 "Timeout waiting for store response after {timeout_secs}s"
451 ))
452 },
453 )
454 .await;
455
456 result
457 }
458
459 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
484 self.chunk_get_from_closest_peers(address, self.config().close_group_size)
485 .await
486 }
487
488 pub async fn chunk_get_from_closest_peers(
499 &self,
500 address: &XorName,
501 peer_count: usize,
502 ) -> Result<Option<DataChunk>> {
503 if let Some(cached) = self.chunk_cache().get(address) {
505 let computed = compute_address(&cached);
506 if computed == *address {
507 debug!("Cache hit for chunk {}", hex::encode(address));
508 return Ok(Some(DataChunk::new(*address, cached)));
509 }
510 debug!(
512 "Cache corruption detected for {}: evicting",
513 hex::encode(address)
514 );
515 self.chunk_cache().remove(address);
516 }
517
518 let addr_hex = hex::encode(address);
519
520 let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
530 Ok(outcome) => outcome,
531 Err(e) => {
532 info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
533 CloseGroupOutcome {
534 chunk: None,
535 queried: 0,
536 not_found: 0,
537 timeout: 0,
538 network_err: 0,
539 protocol_err: 0,
540 }
541 }
542 };
543 if let Some(chunk) = first.chunk {
544 self.chunk_cache().put(chunk.address, chunk.content.clone());
545 return Ok(Some(chunk));
546 }
547
548 if is_authoritative_not_found(first.not_found, first.queried) {
553 info!(
554 "chunk_get giving up on {addr_hex} (unanimous NotFound): \
555 queried={} not_found={} timeout={} network_err={} protocol_err={}",
556 first.queried,
557 first.not_found,
558 first.timeout,
559 first.network_err,
560 first.protocol_err,
561 );
562 return Ok(None);
563 }
564
565 info!(
572 "chunk_get retrying {addr_hex} after reachability failure: \
573 queried={} not_found={} timeout={} network_err={} protocol_err={}",
574 first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
575 );
576
577 tokio::time::sleep(Duration::from_secs(1)).await;
582
583 let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
587 Ok(o) => o,
588 Err(e) => {
589 info!(
590 "chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
591 first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
592 first.queried,
593 first.not_found,
594 first.timeout,
595 first.network_err,
596 first.protocol_err,
597 );
598 return Ok(None);
599 }
600 };
601 if let Some(chunk) = retry.chunk {
602 info!("chunk_get retry succeeded for {addr_hex}");
603 self.chunk_cache().put(chunk.address, chunk.content.clone());
604 return Ok(Some(chunk));
605 }
606
607 info!(
608 "chunk_get exhausted close group after retry for {addr_hex}: \
609 first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
610 retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
611 first.queried,
612 first.not_found,
613 first.timeout,
614 first.network_err,
615 first.protocol_err,
616 retry.queried,
617 retry.not_found,
618 retry.timeout,
619 retry.network_err,
620 retry.protocol_err,
621 );
622 Ok(None)
623 }
624
625 async fn chunk_get_try_closest_peers(
629 &self,
630 address: &XorName,
631 peer_count: usize,
632 ) -> Result<CloseGroupOutcome> {
633 let peers = self.closest_peers(address, peer_count).await?;
634 let addr_hex = hex::encode(address);
635 let queried = peers.len();
636 let mut not_found = 0usize;
637 let mut timeout = 0usize;
638 let mut network_err = 0usize;
639 let mut protocol_err = 0usize;
640
641 for (peer, addrs) in &peers {
642 match self.chunk_get_from_peer(address, peer, addrs).await {
643 Ok(Some(chunk)) => {
644 return Ok(CloseGroupOutcome {
645 chunk: Some(chunk),
646 queried,
647 not_found,
648 timeout,
649 network_err,
650 protocol_err,
651 });
652 }
653 Ok(None) => {
654 not_found += 1;
655 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
656 }
657 Err(Error::Timeout(_)) => {
658 timeout += 1;
659 debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
660 }
661 Err(Error::Network(_)) => {
662 network_err += 1;
663 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
664 }
665 Err(Error::Protocol(ref e)) => {
674 protocol_err += 1;
675 debug!(
676 "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
677 );
678 }
679 Err(e) => return Err(e),
680 }
681 }
682
683 Ok(CloseGroupOutcome {
684 chunk: None,
685 queried,
686 not_found,
687 timeout,
688 network_err,
689 protocol_err,
690 })
691 }
692
693 pub async fn chunk_get_from_close_group(
703 &self,
704 address: &XorName,
705 ) -> Result<Vec<ChunkPeerGetResult>> {
706 self.chunk_get_from_closest_peer_group(address, self.config().close_group_size)
707 .await
708 }
709
710 pub async fn chunk_get_from_closest_peer_group(
721 &self,
722 address: &XorName,
723 peer_count: usize,
724 ) -> Result<Vec<ChunkPeerGetResult>> {
725 let peers = self.closest_peers(address, peer_count).await?;
726 let targets = chunk_peer_get_targets(peers, address);
727 let concurrency_limit =
728 diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
729 let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
730 let overall_timeout =
731 diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
732
733 let mut completed = vec![false; targets.len()];
734 let mut results = Vec::with_capacity(targets.len());
735 let mut get_results = stream::iter(targets.iter().cloned())
736 .map(|target| async move {
737 let chunk_result = self
738 .chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
739 .await;
740
741 if let Ok(Some(chunk)) = &chunk_result {
742 self.chunk_cache().put(chunk.address, chunk.content.clone());
743 }
744
745 (
746 target.index,
747 ChunkPeerGetResult {
748 peer_id: target.peer_id,
749 peer_addrs: target.peer_addrs,
750 xor_distance: target.xor_distance,
751 chunk_result,
752 },
753 )
754 })
755 .buffer_unordered(concurrency_limit);
756
757 let collect_results = async {
758 while let Some((index, result)) = get_results.next().await {
759 completed[index] = true;
760 results.push(result);
761 }
762 };
763
764 if tokio::time::timeout(overall_timeout, collect_results)
765 .await
766 .is_err()
767 {
768 for target in &targets {
769 if !completed[target.index] {
770 results.push(timed_out_chunk_peer_get_result(
771 target,
772 address,
773 overall_timeout,
774 ));
775 }
776 }
777 }
778
779 sort_chunk_peer_get_results(&mut results);
780 Ok(results)
781 }
782
783 async fn chunk_get_from_peer(
785 &self,
786 address: &XorName,
787 peer: &PeerId,
788 peer_addrs: &[MultiAddr],
789 ) -> Result<Option<DataChunk>> {
790 let node = self.network().node();
791 let request_id = self.next_request_id();
792 let request = ChunkGetRequest::new(*address);
793 let message = ChunkMessage {
794 request_id,
795 body: ChunkMessageBody::GetRequest(request),
796 };
797 let message_bytes = message
798 .encode()
799 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
800
801 let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
802 let addr_hex = hex::encode(address);
803 let timeout_secs = self.config().chunk_get_timeout_secs;
804
805 let result = send_and_await_chunk_response(
806 node,
807 peer,
808 message_bytes,
809 request_id,
810 timeout,
811 peer_addrs,
812 |body| match body {
813 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
814 address: addr,
815 content,
816 }) => {
817 if addr != *address {
818 return Some(Err(Error::InvalidData(format!(
819 "Mismatched chunk address: expected {addr_hex}, got {}",
820 hex::encode(addr)
821 ))));
822 }
823
824 let computed = compute_address(&content);
825 if computed != addr {
826 return Some(Err(Error::InvalidData(format!(
827 "Invalid chunk content: expected hash {addr_hex}, got {}",
828 hex::encode(computed)
829 ))));
830 }
831
832 debug!(
833 "Retrieved chunk {} ({} bytes) from peer {peer}",
834 hex::encode(addr),
835 content.len()
836 );
837 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
838 }
839 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
840 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
841 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
842 )),
843 _ => None,
844 },
845 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
846 || {
847 Error::Timeout(format!(
848 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
849 ))
850 },
851 )
852 .await;
853
854 result
855 }
856
857 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
863 self.chunk_get(address).await.map(|opt| opt.is_some())
864 }
865
866 pub async fn finalize_chunk(
884 &self,
885 prepared: PreparedChunk,
886 tx_hash_map: &HashMap<QuoteHash, TxHash>,
887 ) -> Result<XorName> {
888 let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
889 let chunk = paid.pop().ok_or_else(|| {
893 Error::Payment(
894 "finalize_batch_payment returned no paid chunks for a single \
895 prepared chunk — internal invariant violated"
896 .into(),
897 )
898 })?;
899 self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
900 .await
901 }
902}
903
904#[cfg(test)]
905mod tests {
906 use super::*;
907 use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
908
909 const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
911 const UNKNOWN_PROOF_TAG: u8 = 0xff;
913 const TEST_XORNAME_BYTE_LEN: usize = 32;
915 const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;
917
918 fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
919 let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
920 xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
921
922 ChunkPeerGetResult {
923 peer_id: PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]),
924 peer_addrs: Vec::new(),
925 xor_distance,
926 chunk_result: Ok(None),
927 }
928 }
929
930 #[test]
931 fn authoritative_not_found_requires_unanimous_well_sampled_response() {
932 assert!(is_authoritative_not_found(7, 7));
935 assert!(is_authoritative_not_found(
938 CLOSE_GROUP_MAJORITY,
939 CLOSE_GROUP_MAJORITY
940 ));
941
942 assert!(!is_authoritative_not_found(1, 1));
947 assert!(!is_authoritative_not_found(3, 3));
948 assert!(!is_authoritative_not_found(
949 CLOSE_GROUP_MAJORITY - 1,
950 CLOSE_GROUP_MAJORITY - 1
951 ));
952
953 assert!(!is_authoritative_not_found(4, 7));
956 assert!(!is_authoritative_not_found(6, 7));
957
958 assert!(!is_authoritative_not_found(0, 7));
960
961 assert!(!is_authoritative_not_found(0, 0));
964 }
965
966 #[test]
967 fn chunk_get_outcome_classifies_each_result_kind() {
968 let chunk = DataChunk::new([0u8; 32], Bytes::from_static(b"x"));
971 assert_eq!(
972 chunk_get_outcome(&Ok(Some(chunk))),
973 Outcome::Success,
974 "found-chunk must be Success",
975 );
976
977 assert_eq!(
982 chunk_get_outcome(&Ok(None)),
983 Outcome::Timeout,
984 "Ok(None) must be Timeout — that's the controller's load-shedding signal",
985 );
986
987 assert_eq!(
989 chunk_get_outcome(&Err(Error::Timeout("t".into()))),
990 Outcome::Timeout,
991 );
992 assert_eq!(
993 chunk_get_outcome(&Err(Error::Network("n".into()))),
994 Outcome::NetworkError,
995 );
996
997 assert_eq!(
1000 chunk_get_outcome(&Err(Error::Protocol("p".into()))),
1001 Outcome::ApplicationError,
1002 );
1003 }
1004
1005 #[test]
1006 fn single_node_proof_uses_store_response_timeout() {
1007 let timeout =
1008 store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);
1009
1010 assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1011 }
1012
1013 #[test]
1014 fn unknown_proof_uses_store_response_timeout() {
1015 let timeout =
1016 store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);
1017
1018 assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
1019 }
1020
1021 #[test]
1022 fn merkle_proof_uses_configured_store_timeout() {
1023 let timeout =
1024 store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);
1025
1026 assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
1027 }
1028
1029 #[test]
1030 fn chunk_peer_get_results_sort_by_xor_distance() {
1031 let mut results = vec![
1032 chunk_peer_get_result(3, 3),
1033 chunk_peer_get_result(1, 1),
1034 chunk_peer_get_result(2, 2),
1035 ];
1036
1037 sort_chunk_peer_get_results(&mut results);
1038
1039 let ordered_distances = results
1040 .iter()
1041 .map(|result| result.xor_distance[TEST_DISTANCE_TAIL_INDEX])
1042 .collect::<Vec<_>>();
1043 assert_eq!(ordered_distances, vec![1, 2, 3]);
1044 }
1045
1046 #[test]
1047 fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
1048 const PER_PEER_TIMEOUT_SECS: u64 = 10;
1049 const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
1050 const TARGET_COUNT: usize = 7;
1051 const CONCURRENCY_LIMIT: usize = 7;
1052
1053 let timeout = diagnostic_peer_get_overall_timeout(
1054 Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1055 TARGET_COUNT,
1056 CONCURRENCY_LIMIT,
1057 );
1058
1059 assert_eq!(
1060 timeout,
1061 Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1062 );
1063 }
1064
1065 #[test]
1066 fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
1067 const PER_PEER_TIMEOUT_SECS: u64 = 10;
1068 const TARGET_COUNT: usize = 20;
1069 const CLOSE_GROUP_SIZE: usize = 7;
1070 const EXPECTED_WAVES_WITH_PADDING: u64 = 4;
1071
1072 let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
1073 let timeout = diagnostic_peer_get_overall_timeout(
1074 Duration::from_secs(PER_PEER_TIMEOUT_SECS),
1075 TARGET_COUNT,
1076 concurrency_limit,
1077 );
1078
1079 assert_eq!(
1080 timeout,
1081 Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING)
1082 );
1083 }
1084
1085 #[test]
1092 fn default_merkle_store_timeout_satisfies_storer_invariant() {
1093 use crate::data::client::ClientConfig;
1094 const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
1095 const MIN_PADDING_SECS: u64 = 30;
1096 let config = ClientConfig::default();
1097 assert!(
1098 config.merkle_store_timeout_secs
1099 >= STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS,
1100 "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
1101 config.merkle_store_timeout_secs,
1102 STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
1103 MIN_PADDING_SECS,
1104 );
1105 }
1106
1107 #[test]
1116 fn non_merkle_put_ignores_merkle_timeout_value() {
1117 let absurd_merkle_timeout = 9_999;
1118 for tag in [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
1119 let timeout = store_response_timeout_for_proof(&[tag], absurd_merkle_timeout);
1120 assert_eq!(
1121 timeout, STORE_RESPONSE_TIMEOUT,
1122 "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
1123 );
1124 }
1125 }
1126}