1use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13 Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14 MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19 compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20 MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
31pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
34#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum PaymentMode {
38 #[default]
40 Auto,
41 Merkle,
43 Single,
45}
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
52pub struct MerkleBatchPaymentResult {
53 pub proofs: HashMap<[u8; 32], Vec<u8>>,
55 pub chunk_count: usize,
57 pub storage_cost_atto: String,
59 pub gas_cost_wei: u128,
61 #[serde(default)]
66 pub merkle_payment_timestamp: u64,
67}
68
69pub struct PreparedMerkleBatch {
74 pub depth: u8,
76 pub pool_commitments: Vec<PoolCommitment>,
78 pub merkle_payment_timestamp: u64,
80 candidate_pools: Vec<MerklePaymentCandidatePool>,
82 tree: MerkleTree,
84 addresses: Vec<[u8; 32]>,
86}
87
88#[derive(Debug, Clone, Default)]
90pub(crate) struct MerkleUploadPlan {
91 pub already_stored: Vec<[u8; 32]>,
93 pub to_upload: Vec<[u8; 32]>,
95 to_upload_total_bytes: u64,
97}
98
99impl MerkleUploadPlan {
100 #[must_use]
102 pub fn to_upload_avg_size(&self) -> u64 {
103 if self.to_upload.is_empty() {
104 return 0;
105 }
106
107 self.to_upload_total_bytes / self.to_upload.len() as u64
108 }
109}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("PreparedMerkleBatch")
114 .field("depth", &self.depth)
115 .field("pool_commitments", &self.pool_commitments.len())
116 .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117 .field("candidate_pools", &self.candidate_pools.len())
118 .field("addresses", &self.addresses.len())
119 .finish()
120 }
121}
122
123pub(crate) fn chunk_contents_for_upload_addresses(
128 chunk_contents: Vec<Bytes>,
129 addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131 if addresses.is_empty() {
132 return Ok(Vec::new());
133 }
134
135 let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136 for address in addresses {
137 *needed_by_address.entry(*address).or_default() += 1;
138 }
139
140 let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141 HashMap::with_capacity(needed_by_address.len());
142 let mut remaining = addresses.len();
143 for chunk in chunk_contents {
144 let address = compute_address(&chunk);
145 if let Some(needed) = needed_by_address.get_mut(&address) {
146 if *needed > 0 {
147 chunks_by_address
148 .entry(address)
149 .or_default()
150 .push_back(chunk);
151 *needed -= 1;
152 remaining -= 1;
153 if remaining == 0 {
154 break;
155 }
156 }
157 }
158 }
159
160 for (address, needed) in &needed_by_address {
161 if *needed == 0 {
162 continue;
163 }
164
165 if chunks_by_address.contains_key(address) {
166 return Err(Error::InvalidData(format!(
167 "missing duplicate chunk content for merkle address {}",
168 hex::encode(address)
169 )));
170 }
171
172 return Err(Error::InvalidData(format!(
173 "missing chunk content for merkle address {}",
174 hex::encode(address)
175 )));
176 }
177
178 let mut selected = Vec::with_capacity(addresses.len());
179 for address in addresses {
180 let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181 Error::InvalidData(format!(
182 "missing chunk content for merkle address {}",
183 hex::encode(address)
184 ))
185 })?;
186 let chunk = chunks.pop_front().ok_or_else(|| {
187 Error::InvalidData(format!(
188 "missing duplicate chunk content for merkle address {}",
189 hex::encode(address)
190 ))
191 })?;
192 selected.push(chunk);
193 }
194
195 Ok(selected)
196}
197
198#[must_use]
201pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
202 match mode {
203 PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
204 PaymentMode::Merkle => chunk_count >= 2,
205 PaymentMode::Single => false,
206 }
207}
208
209impl Client {
210 #[must_use]
212 pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
213 should_use_merkle(chunk_count, mode)
214 }
215
216 pub async fn pay_for_merkle_batch(
230 &self,
231 addresses: &[[u8; 32]],
232 data_type: u32,
233 data_size: u64,
234 ) -> Result<MerkleBatchPaymentResult> {
235 let chunk_count = addresses.len();
236 if chunk_count < 2 {
237 return Err(Error::Payment(
238 "Merkle batch payment requires at least 2 chunks".to_string(),
239 ));
240 }
241
242 if chunk_count > MAX_LEAVES {
243 return self
244 .pay_for_merkle_multi_batch(addresses, data_type, data_size)
245 .await;
246 }
247
248 self.pay_for_merkle_single_batch(addresses, data_type, data_size)
249 .await
250 }
251
252 pub(crate) async fn plan_merkle_upload(
261 &self,
262 chunks: Vec<([u8; 32], u64)>,
263 data_type: u32,
264 progress: Option<&mpsc::Sender<UploadEvent>>,
265 ) -> Result<MerkleUploadPlan> {
266 let total_chunks = chunks.len();
267 if total_chunks == 0 {
268 return Ok(MerkleUploadPlan::default());
269 }
270
271 info!("Checking {total_chunks} merkle chunks for existing storage before payment");
272
273 let quote_limiter = self.controller().quote.clone();
274 let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
275 let mut check_stream = stream::iter(chunks.into_iter().enumerate())
276 .map(|(index, (address, data_size))| {
277 let limiter = quote_limiter.clone();
278 async move {
279 let result = observe_op(
280 &limiter,
281 || async move {
282 self.chunk_already_stored_for_merkle(&address, data_type, data_size)
283 .await
284 },
285 classify_error,
286 )
287 .await;
288 (index, address, data_size, result)
289 }
290 })
291 .buffer_unordered(quote_concurrency);
292
293 let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
294 let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
295 let mut checked = 0usize;
296
297 while let Some((index, address, data_size, result)) = check_stream.next().await {
298 let is_already_stored = result?;
299 checked += 1;
300
301 if let Some(tx) = progress {
302 let _ = tx.try_send(UploadEvent::ChunkQuoted {
303 quoted: checked,
304 total: total_chunks,
305 });
306 }
307
308 if is_already_stored {
309 debug!(
310 "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
311 hex::encode(address)
312 );
313 already_stored.push((index, address));
314 if let Some(tx) = progress {
315 let _ = tx.try_send(UploadEvent::ChunkStored {
316 stored: already_stored.len(),
317 total: total_chunks,
318 });
319 }
320 } else {
321 debug!(
322 "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
323 hex::encode(address)
324 );
325 to_upload.push((index, address, data_size));
326 }
327 }
328
329 already_stored.sort_by_key(|(index, _)| *index);
330 to_upload.sort_by_key(|(index, _, _)| *index);
331
332 let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
333 acc.saturating_add(*data_size)
334 });
335
336 let already_stored = already_stored
337 .into_iter()
338 .map(|(_, address)| address)
339 .collect::<Vec<_>>();
340 let to_upload = to_upload
341 .into_iter()
342 .map(|(_, address, _)| address)
343 .collect::<Vec<_>>();
344
345 info!(
346 "Merkle preflight complete: {} already stored, {} need upload",
347 already_stored.len(),
348 to_upload.len()
349 );
350
351 Ok(MerkleUploadPlan {
352 already_stored,
353 to_upload,
354 to_upload_total_bytes,
355 })
356 }
357
358 async fn chunk_already_stored_for_merkle(
359 &self,
360 address: &[u8; 32],
361 data_type: u32,
362 data_size: u64,
363 ) -> Result<bool> {
364 match self.get_store_quotes(address, data_size, data_type).await {
365 Ok(_) => Ok(false),
366 Err(Error::AlreadyStored) => Ok(true),
367 Err(e) => Err(e),
368 }
369 }
370
371 pub async fn prepare_merkle_batch_external(
377 &self,
378 addresses: &[[u8; 32]],
379 data_type: u32,
380 data_size: u64,
381 ) -> Result<PreparedMerkleBatch> {
382 let chunk_count = addresses.len();
383 let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
384
385 debug!("Building merkle tree for {chunk_count} chunks");
386
387 let tree = MerkleTree::from_xornames(xornames)
389 .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
390
391 let depth = tree.depth();
392 let merkle_payment_timestamp = std::time::SystemTime::now()
393 .duration_since(std::time::UNIX_EPOCH)
394 .map_err(|e| Error::Payment(format!("System time error: {e}")))?
395 .as_secs();
396
397 debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
398
399 let midpoint_proofs = tree
401 .reward_candidates(merkle_payment_timestamp)
402 .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
403
404 debug!(
405 "Collecting candidate pools from {} midpoints (concurrent)",
406 midpoint_proofs.len()
407 );
408
409 let candidate_pools = self
411 .build_candidate_pools(
412 &midpoint_proofs,
413 data_type,
414 data_size,
415 merkle_payment_timestamp,
416 )
417 .await?;
418
419 let pool_commitments: Vec<PoolCommitment> = candidate_pools
421 .iter()
422 .map(MerklePaymentCandidatePool::to_commitment)
423 .collect();
424
425 Ok(PreparedMerkleBatch {
426 depth,
427 pool_commitments,
428 merkle_payment_timestamp,
429 candidate_pools,
430 tree,
431 addresses: addresses.to_vec(),
432 })
433 }
434
435 async fn pay_for_merkle_single_batch(
437 &self,
438 addresses: &[[u8; 32]],
439 data_type: u32,
440 data_size: u64,
441 ) -> Result<MerkleBatchPaymentResult> {
442 let wallet = self.require_wallet()?;
443 let prepared = self
444 .prepare_merkle_batch_external(addresses, data_type, data_size)
445 .await?;
446
447 info!(
448 "Submitting merkle batch payment on-chain (depth={})",
449 prepared.depth
450 );
451 let (winner_pool_hash, amount, gas_info) = wallet
452 .pay_for_merkle_tree(
453 prepared.depth,
454 prepared.pool_commitments.clone(),
455 prepared.merkle_payment_timestamp,
456 )
457 .await
458 .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
459
460 info!(
461 "Merkle payment succeeded: winner pool {}",
462 hex::encode(winner_pool_hash)
463 );
464
465 let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
466 result.storage_cost_atto = amount.to_string();
467 result.gas_cost_wei = gas_info.gas_cost_wei;
468 Ok(result)
469 }
470
471 async fn pay_for_merkle_multi_batch(
473 &self,
474 addresses: &[[u8; 32]],
475 data_type: u32,
476 data_size: u64,
477 ) -> Result<MerkleBatchPaymentResult> {
478 let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
479 let total_sub_batches = sub_batches.len();
480 let mut all_proofs = HashMap::with_capacity(addresses.len());
481 let mut total_storage = Amount::ZERO;
482 let mut total_gas: u128 = 0;
483 let mut oldest_ts: u64 = 0;
487
488 for (i, chunk) in sub_batches.into_iter().enumerate() {
489 match self
490 .pay_for_merkle_single_batch(chunk, data_type, data_size)
491 .await
492 {
493 Ok(sub_result) => {
494 if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
495 total_storage += cost;
496 }
497 total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
498 if oldest_ts == 0
499 || (sub_result.merkle_payment_timestamp > 0
500 && sub_result.merkle_payment_timestamp < oldest_ts)
501 {
502 oldest_ts = sub_result.merkle_payment_timestamp;
503 }
504 all_proofs.extend(sub_result.proofs);
505 }
506 Err(e) => {
507 if all_proofs.is_empty() {
508 return Err(e);
510 }
511 warn!(
513 "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
514 Returning {} proofs from prior sub-batches",
515 i + 1,
516 all_proofs.len()
517 );
518 return Ok(MerkleBatchPaymentResult {
519 chunk_count: all_proofs.len(),
520 proofs: all_proofs,
521 storage_cost_atto: total_storage.to_string(),
522 gas_cost_wei: total_gas,
523 merkle_payment_timestamp: oldest_ts,
524 });
525 }
526 }
527 }
528
529 Ok(MerkleBatchPaymentResult {
530 chunk_count: addresses.len(),
531 proofs: all_proofs,
532 storage_cost_atto: total_storage.to_string(),
533 gas_cost_wei: total_gas,
534 merkle_payment_timestamp: oldest_ts,
535 })
536 }
537
538 async fn build_candidate_pools(
540 &self,
541 midpoint_proofs: &[MidpointProof],
542 data_type: u32,
543 data_size: u64,
544 merkle_payment_timestamp: u64,
545 ) -> Result<Vec<MerklePaymentCandidatePool>> {
546 let mut pool_futures = FuturesUnordered::new();
547
548 for midpoint_proof in midpoint_proofs {
549 let pool_address = midpoint_proof.address();
550 let mp = midpoint_proof.clone();
551 pool_futures.push(async move {
552 let candidate_nodes = self
553 .get_merkle_candidate_pool(
554 &pool_address.0,
555 data_type,
556 data_size,
557 merkle_payment_timestamp,
558 )
559 .await?;
560 Ok::<_, Error>(MerklePaymentCandidatePool {
561 midpoint_proof: mp,
562 candidate_nodes,
563 })
564 });
565 }
566
567 let mut pools = Vec::with_capacity(midpoint_proofs.len());
568 while let Some(result) = pool_futures.next().await {
569 pools.push(result?);
570 }
571
572 Ok(pools)
573 }
574
575 #[allow(clippy::too_many_lines)]
577 async fn get_merkle_candidate_pool(
578 &self,
579 address: &[u8; 32],
580 data_type: u32,
581 data_size: u64,
582 merkle_payment_timestamp: u64,
583 ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
584 let node = self.network().node();
585 let timeout = Duration::from_secs(self.config().quote_timeout_secs);
586
587 let query_count = CANDIDATES_PER_POOL * 2;
589 let mut remote_peers = self
590 .network()
591 .find_closest_peers(address, query_count)
592 .await?;
593
594 if remote_peers.len() < CANDIDATES_PER_POOL {
598 let connected = self.network().connected_peers().await;
599 for peer in connected {
600 if !remote_peers.iter().any(|(id, _)| *id == peer) {
601 remote_peers.push((peer, vec![]));
602 }
603 }
604 }
605
606 if remote_peers.len() < CANDIDATES_PER_POOL {
607 return Err(Error::InsufficientPeers(format!(
608 "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
609 Use --no-merkle or a larger network.",
610 remote_peers.len()
611 )));
612 }
613
614 let mut candidate_futures = FuturesUnordered::new();
615
616 for (peer_id, peer_addrs) in &remote_peers {
617 let request_id = self.next_request_id();
618 let request = MerkleCandidateQuoteRequest {
619 address: *address,
620 data_type,
621 data_size,
622 merkle_payment_timestamp,
623 };
624 let message = ChunkMessage {
625 request_id,
626 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
627 };
628
629 let message_bytes = match message.encode() {
630 Ok(bytes) => bytes,
631 Err(e) => {
632 warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
633 continue;
634 }
635 };
636
637 let peer_id_clone = *peer_id;
638 let addrs_clone = peer_addrs.clone();
639 let node_clone = node.clone();
640
641 let fut = async move {
642 let result = send_and_await_chunk_response(
643 &node_clone,
644 &peer_id_clone,
645 message_bytes,
646 request_id,
647 timeout,
648 &addrs_clone,
649 |body| match body {
650 ChunkMessageBody::MerkleCandidateQuoteResponse(
651 MerkleCandidateQuoteResponse::Success { candidate_node },
652 ) => {
653 match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
654 &candidate_node,
655 ) {
656 Ok(node) => Some(Ok(node)),
657 Err(e) => Some(Err(Error::Serialization(format!(
658 "Failed to deserialize candidate node from {peer_id_clone}: {e}"
659 )))),
660 }
661 }
662 ChunkMessageBody::MerkleCandidateQuoteResponse(
663 MerkleCandidateQuoteResponse::Error(e),
664 ) => Some(Err(Error::Protocol(format!(
665 "Merkle quote error from {peer_id_clone}: {e}"
666 )))),
667 _ => None,
668 },
669 |e| {
670 Error::Network(format!(
671 "Failed to send merkle candidate request to {peer_id_clone}: {e}"
672 ))
673 },
674 || {
675 Error::Timeout(format!(
676 "Timeout waiting for merkle candidate from {peer_id_clone}"
677 ))
678 },
679 )
680 .await;
681
682 (peer_id_clone, result)
683 };
684
685 candidate_futures.push(fut);
686 }
687
688 self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
689 .await
690 }
691
692 async fn collect_validated_candidates(
703 &self,
704 futures: &mut FuturesUnordered<
705 impl std::future::Future<
706 Output = (
707 PeerId,
708 std::result::Result<MerklePaymentCandidateNode, Error>,
709 ),
710 >,
711 >,
712 target_address: &[u8; 32],
713 merkle_payment_timestamp: u64,
714 ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
715 let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
716 let mut failures: Vec<String> = Vec::new();
717
718 while let Some((peer_id, result)) = futures.next().await {
719 match result {
720 Ok(candidate) => {
721 if !verify_merkle_candidate_signature(&candidate) {
722 warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
723 failures.push(format!("{peer_id}: invalid signature"));
724 continue;
725 }
726 if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
727 warn!("Timestamp mismatch from merkle candidate {peer_id}");
728 failures.push(format!("{peer_id}: timestamp mismatch"));
729 continue;
730 }
731 valid.push((peer_id, candidate));
732 }
733 Err(e) => {
734 debug!("Failed to get merkle candidate from {peer_id}: {e}");
735 failures.push(format!("{peer_id}: {e}"));
736 }
737 }
738 }
739
740 if valid.len() < CANDIDATES_PER_POOL {
741 return Err(Error::InsufficientPeers(format!(
742 "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
743 valid.len(),
744 failures.join("; ")
745 )));
746 }
747
748 let target_peer = PeerId::from_bytes(*target_address);
749 valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
750
751 let candidates: Vec<MerklePaymentCandidateNode> = valid
752 .into_iter()
753 .take(CANDIDATES_PER_POOL)
754 .map(|(_, candidate)| candidate)
755 .collect();
756
757 candidates
758 .try_into()
759 .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
760 }
761
762 pub(crate) async fn merkle_upload_chunks(
783 &self,
784 chunk_contents: Vec<Bytes>,
785 addresses: Vec<[u8; 32]>,
786 batch_result: &MerkleBatchPaymentResult,
787 progress: Option<&mpsc::Sender<UploadEvent>>,
788 stored_offset: usize,
789 total_chunks: usize,
790 ) -> Result<MerkleStoreOutcome> {
791 let store_limiter = self.controller().store.clone();
792 let batch_size = chunk_contents.len();
795 if batch_size != addresses.len() {
796 return Err(Error::InvalidData(format!(
797 "merkle upload has {batch_size} chunk contents but {} addresses",
798 addresses.len()
799 )));
800 }
801 let store_concurrency = store_limiter.current().min(batch_size.max(1));
802
803 let chunks: Vec<([u8; 32], Bytes)> = addresses.into_iter().zip(chunk_contents).collect();
804
805 let store_one = |addr: [u8; 32], content: Bytes| {
810 let limiter = store_limiter.clone();
811 let proof_bytes = batch_result.proofs.get(&addr).cloned();
812 async move {
813 let started = std::time::Instant::now();
814 let proof = proof_bytes.ok_or_else(|| {
815 Error::Payment(format!(
816 "Missing merkle proof for chunk {}",
817 hex::encode(addr)
818 ))
819 })?;
820 let peers = self.close_group_peers(&addr).await?;
821 observe_op(
822 &limiter,
823 || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
824 classify_error,
825 )
826 .await
827 .map(|_| started)
828 }
829 };
830
831 merkle_store_with_retry(
832 chunks,
833 store_concurrency,
834 MERKLE_STORE_MAX_ATTEMPTS,
835 MERKLE_RETRY_BACKOFF,
836 progress,
837 stored_offset,
838 total_chunks,
839 store_one,
840 )
841 .await
842 }
843}
844
845const MERKLE_STORE_MAX_ATTEMPTS: usize = 4;
857
858const MERKLE_RETRY_BACKOFF: Duration = Duration::from_secs(30);
864
865const MERKLE_RETRY_JITTER: f64 = 0.1;
868
869#[derive(Debug, Default)]
872pub(crate) struct MerkleStoreOutcome {
873 pub stored: usize,
876 pub failed: usize,
878 pub stats: crate::data::client::batch::WaveAggregateStats,
880}
881
882#[allow(clippy::too_many_arguments)]
894async fn merkle_store_with_retry<F, Fut>(
895 chunks: Vec<([u8; 32], Bytes)>,
896 store_concurrency: usize,
897 max_attempts: usize,
898 backoff: Duration,
899 progress: Option<&mpsc::Sender<UploadEvent>>,
900 stored_offset: usize,
901 total: usize,
902 store_one: F,
903) -> Result<MerkleStoreOutcome>
904where
905 F: Fn([u8; 32], Bytes) -> Fut,
906 Fut: std::future::Future<Output = Result<std::time::Instant>>,
907{
908 let attempts = max_attempts.max(1);
909 let mut outcome = MerkleStoreOutcome {
910 stored: stored_offset,
911 ..MerkleStoreOutcome::default()
912 };
913 let mut pending = chunks;
914
915 for attempt in 0..attempts {
916 let concurrency = store_concurrency.min(pending.len().max(1)).max(1);
917 let mut next_failed: Vec<([u8; 32], Bytes)> = Vec::new();
918
919 let mut upload_stream = stream::iter(pending.into_iter().map(|(addr, content)| {
920 let fut = store_one(addr, content.clone());
921 async move { (addr, content, fut.await) }
922 }))
923 .buffer_unordered(concurrency);
924
925 while let Some((addr, content, result)) = upload_stream.next().await {
926 outcome.stats.chunk_attempts_total =
927 outcome.stats.chunk_attempts_total.saturating_add(1);
928 match result {
929 Ok(started) => {
930 let duration_ms =
931 u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
932 outcome.stats.store_durations_ms.push(duration_ms);
933 let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
934 outcome.stats.retries_histogram[idx] =
935 outcome.stats.retries_histogram[idx].saturating_add(1);
936 outcome.stored += 1;
937 if let Some(tx) = progress {
938 let _ = tx.try_send(UploadEvent::ChunkStored {
939 stored: outcome.stored,
940 total,
941 });
942 }
943 }
944 Err(Error::InsufficientPeers(_)) => next_failed.push((addr, content)),
945 Err(e) => return Err(e),
946 }
947 }
948
949 if next_failed.is_empty() {
950 break;
951 }
952
953 if attempt + 1 < attempts {
954 warn!(
955 failed = next_failed.len(),
956 attempt = attempt + 1,
957 "merkle chunks short of quorum, retrying after backoff"
958 );
959 pending = next_failed;
960 if backoff > Duration::ZERO {
961 let wait = {
966 let mut rng = rand::thread_rng();
967 let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
968 backoff.mul_f64(factor)
969 };
970 tokio::time::sleep(wait).await;
971 }
972 } else {
973 outcome.failed = next_failed.len();
974 break;
975 }
976 }
977
978 Ok(outcome)
979}
980
981pub fn finalize_merkle_batch(
986 prepared: PreparedMerkleBatch,
987 winner_pool_hash: [u8; 32],
988) -> Result<MerkleBatchPaymentResult> {
989 let chunk_count = prepared.addresses.len();
990 let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
991
992 let winner_pool = prepared
994 .candidate_pools
995 .iter()
996 .find(|pool| pool.hash() == winner_pool_hash)
997 .ok_or_else(|| {
998 Error::Payment(format!(
999 "Winner pool {} not found in candidate pools",
1000 hex::encode(winner_pool_hash)
1001 ))
1002 })?;
1003
1004 info!("Generating merkle proofs for {chunk_count} chunks");
1006 let mut proofs = HashMap::with_capacity(chunk_count);
1007
1008 for (i, xorname) in xornames.iter().enumerate() {
1009 let address_proof = prepared
1010 .tree
1011 .generate_address_proof(i, *xorname)
1012 .map_err(|e| {
1013 Error::Payment(format!(
1014 "Failed to generate address proof for chunk {i}: {e}"
1015 ))
1016 })?;
1017
1018 let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1019
1020 let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1021 .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1022
1023 proofs.insert(prepared.addresses[i], tagged_bytes);
1024 }
1025
1026 info!("Merkle batch payment complete: {chunk_count} proofs generated");
1027
1028 Ok(MerkleBatchPaymentResult {
1029 proofs,
1030 chunk_count,
1031 storage_cost_atto: "0".to_string(),
1032 gas_cost_wei: 0,
1033 merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1034 })
1035}
1036
1037#[cfg(test)]
1039mod send_assertions {
1040 use super::*;
1041 use crate::data::client::Client;
1042
1043 fn _assert_send<T: Send>(_: &T) {}
1044
1045 #[allow(
1046 dead_code,
1047 unreachable_code,
1048 unused_variables,
1049 clippy::diverging_sub_expression
1050 )]
1051 async fn _merkle_upload_chunks_is_send(client: &Client) {
1052 let batch_result: MerkleBatchPaymentResult = todo!();
1053 let fut = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
1054 _assert_send(&fut);
1055 }
1056}
1057
1058#[cfg(test)]
1059#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1060mod tests {
1061 use super::*;
1062 use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1063
1064 #[test]
1069 fn test_auto_below_threshold() {
1070 assert!(!should_use_merkle(1, PaymentMode::Auto));
1071 assert!(!should_use_merkle(10, PaymentMode::Auto));
1072 assert!(!should_use_merkle(63, PaymentMode::Auto));
1073 }
1074
1075 #[test]
1076 fn test_auto_at_and_above_threshold() {
1077 assert!(should_use_merkle(64, PaymentMode::Auto));
1078 assert!(should_use_merkle(65, PaymentMode::Auto));
1079 assert!(should_use_merkle(1000, PaymentMode::Auto));
1080 }
1081
1082 #[test]
1083 fn test_merkle_mode_forces_at_2() {
1084 assert!(!should_use_merkle(1, PaymentMode::Merkle));
1085 assert!(should_use_merkle(2, PaymentMode::Merkle));
1086 assert!(should_use_merkle(3, PaymentMode::Merkle));
1087 }
1088
1089 #[test]
1090 fn test_single_mode_always_false() {
1091 assert!(!should_use_merkle(0, PaymentMode::Single));
1092 assert!(!should_use_merkle(64, PaymentMode::Single));
1093 assert!(!should_use_merkle(1000, PaymentMode::Single));
1094 }
1095
1096 #[test]
1097 fn test_default_mode_is_auto() {
1098 assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1099 }
1100
1101 #[test]
1102 fn test_threshold_value() {
1103 assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1104 }
1105
1106 #[test]
1107 fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1108 let first = Bytes::from_static(b"first");
1109 let second = Bytes::from_static(b"second");
1110 let first_addr = compute_address(&first);
1111 let second_addr = compute_address(&second);
1112
1113 let selected = chunk_contents_for_upload_addresses(
1114 vec![first.clone(), second.clone()],
1115 &[second_addr, first_addr],
1116 )
1117 .unwrap();
1118
1119 assert_eq!(selected, vec![second, first]);
1120 }
1121
1122 #[test]
1123 fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1124 let repeated = Bytes::from_static(b"same-content");
1125 let other = Bytes::from_static(b"other-content");
1126 let repeated_addr = compute_address(&repeated);
1127
1128 let selected = chunk_contents_for_upload_addresses(
1129 vec![repeated.clone(), other, repeated.clone()],
1130 &[repeated_addr, repeated_addr],
1131 )
1132 .unwrap();
1133
1134 assert_eq!(selected, vec![repeated.clone(), repeated]);
1135 }
1136
1137 #[test]
1138 fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1139 let requested = Bytes::from_static(b"requested-content");
1140 let unrequested = Bytes::from_static(b"unrequested-content");
1141 let requested_addr = compute_address(&requested);
1142
1143 let selected = chunk_contents_for_upload_addresses(
1144 vec![
1145 unrequested.clone(),
1146 requested.clone(),
1147 unrequested.clone(),
1148 unrequested,
1149 ],
1150 &[requested_addr],
1151 )
1152 .unwrap();
1153
1154 assert_eq!(selected, vec![requested]);
1155 }
1156
1157 #[test]
1158 fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1159 let present = Bytes::from_static(b"present-content");
1160 let missing = Bytes::from_static(b"missing-content");
1161 let missing_addr = compute_address(&missing);
1162
1163 let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1164
1165 assert!(matches!(result, Err(Error::InvalidData(_))));
1166 }
1167
1168 fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1173 (0..count)
1174 .map(|i| {
1175 let xn = XorName::from_content(&i.to_le_bytes());
1176 xn.0
1177 })
1178 .collect()
1179 }
1180
1181 #[test]
1182 fn test_tree_depth_for_known_sizes() {
1183 let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1184 for (count, expected_depth) in cases {
1185 let addrs = make_test_addresses(count);
1186 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1187 let tree = MerkleTree::from_xornames(xornames).unwrap();
1188 assert_eq!(
1189 tree.depth(),
1190 expected_depth,
1191 "depth mismatch for {count} leaves"
1192 );
1193 }
1194 }
1195
1196 #[test]
1197 fn test_proof_generation_and_verification_for_all_leaves() {
1198 let addrs = make_test_addresses(16);
1199 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1200 let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1201
1202 for (i, xn) in xornames.iter().enumerate() {
1203 let proof = tree.generate_address_proof(i, *xn).unwrap();
1204 assert!(proof.verify(), "proof for leaf {i} should verify");
1205 assert_eq!(proof.depth(), tree.depth() as usize);
1206 }
1207 }
1208
1209 #[test]
1210 fn test_proof_fails_for_wrong_address() {
1211 let addrs = make_test_addresses(8);
1212 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1213 let tree = MerkleTree::from_xornames(xornames).unwrap();
1214
1215 let wrong = XorName::from_content(b"wrong");
1216 let proof = tree.generate_address_proof(0, wrong).unwrap();
1217 assert!(!proof.verify(), "proof with wrong address should fail");
1218 }
1219
1220 #[test]
1221 fn test_tree_too_few_leaves() {
1222 let xornames = vec![XorName::from_content(b"only_one")];
1223 let result = MerkleTree::from_xornames(xornames);
1224 assert!(result.is_err());
1225 }
1226
1227 #[test]
1228 fn test_tree_at_max_leaves() {
1229 let addrs = make_test_addresses(MAX_LEAVES);
1230 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1231 let tree = MerkleTree::from_xornames(xornames).unwrap();
1232 assert_eq!(tree.leaf_count(), MAX_LEAVES);
1233 }
1234
1235 #[test]
1240 fn test_merkle_proof_serialize_deserialize_roundtrip() {
1241 use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
1242 use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};
1243
1244 let addrs = make_test_addresses(4);
1245 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1246 let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1247
1248 let timestamp = std::time::SystemTime::now()
1249 .duration_since(std::time::UNIX_EPOCH)
1250 .unwrap()
1251 .as_secs();
1252
1253 let candidates = tree.reward_candidates(timestamp).unwrap();
1254 let midpoint = candidates.first().unwrap().clone();
1255
1256 #[allow(clippy::cast_possible_truncation)]
1258 let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
1259 std::array::from_fn(|i| MerklePaymentCandidateNode {
1260 pub_key: vec![i as u8; 32],
1261 price: Amount::from(1024u64),
1262 reward_address: RewardsAddress::new([i as u8; 20]),
1263 merkle_payment_timestamp: timestamp,
1264 signature: vec![i as u8; 64],
1265 });
1266
1267 let pool = MerklePaymentCandidatePool {
1268 midpoint_proof: midpoint,
1269 candidate_nodes,
1270 };
1271
1272 let address_proof = tree.generate_address_proof(0, xornames[0]).unwrap();
1273 let merkle_proof = MerklePaymentProof::new(xornames[0], address_proof, pool);
1274
1275 let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
1276 assert_eq!(
1277 tagged.first().copied(),
1278 Some(0x02),
1279 "tag should be PROOF_TAG_MERKLE"
1280 );
1281
1282 let deserialized = deserialize_merkle_proof(&tagged).unwrap();
1283 assert_eq!(deserialized.address, merkle_proof.address);
1284 assert_eq!(
1285 deserialized.winner_pool.candidate_nodes.len(),
1286 CANDIDATES_PER_POOL
1287 );
1288 }
1289
1290 #[test]
1295 fn test_candidate_wrong_timestamp_rejected() {
1296 let candidate = MerklePaymentCandidateNode {
1298 pub_key: vec![0u8; 32],
1299 price: ant_protocol::evm::Amount::ZERO,
1300 reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
1301 merkle_payment_timestamp: 1000,
1302 signature: vec![0u8; 64],
1303 };
1304
1305 assert_ne!(candidate.merkle_payment_timestamp, 2000);
1307 }
1308
1309 fn make_dummy_candidate_nodes(
1314 timestamp: u64,
1315 ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1316 std::array::from_fn(|i| MerklePaymentCandidateNode {
1317 pub_key: vec![i as u8; 32],
1318 price: Amount::from(1024u64),
1319 reward_address: RewardsAddress::new([i as u8; 20]),
1320 merkle_payment_timestamp: timestamp,
1321 signature: vec![i as u8; 64],
1322 })
1323 }
1324
1325 fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1326 let addrs = make_test_addresses(count);
1327 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1328 let tree = MerkleTree::from_xornames(xornames).unwrap();
1329
1330 let timestamp = std::time::SystemTime::now()
1331 .duration_since(std::time::UNIX_EPOCH)
1332 .unwrap()
1333 .as_secs();
1334
1335 let midpoints = tree.reward_candidates(timestamp).unwrap();
1336
1337 let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1338 .into_iter()
1339 .map(|mp| MerklePaymentCandidatePool {
1340 midpoint_proof: mp,
1341 candidate_nodes: make_dummy_candidate_nodes(timestamp),
1342 })
1343 .collect();
1344
1345 let pool_commitments = candidate_pools
1346 .iter()
1347 .map(MerklePaymentCandidatePool::to_commitment)
1348 .collect();
1349
1350 PreparedMerkleBatch {
1351 depth: tree.depth(),
1352 pool_commitments,
1353 merkle_payment_timestamp: timestamp,
1354 candidate_pools,
1355 tree,
1356 addresses: addrs,
1357 }
1358 }
1359
1360 #[test]
1361 fn test_finalize_merkle_batch_with_valid_winner() {
1362 let prepared = make_prepared_merkle_batch(4);
1363 let winner_hash = prepared.candidate_pools[0].hash();
1364
1365 let result = finalize_merkle_batch(prepared, winner_hash);
1366 assert!(
1367 result.is_ok(),
1368 "should succeed with valid winner: {result:?}"
1369 );
1370
1371 let batch = result.unwrap();
1372 assert_eq!(batch.chunk_count, 4);
1373 assert_eq!(batch.proofs.len(), 4);
1374
1375 for proof_bytes in batch.proofs.values() {
1377 assert!(!proof_bytes.is_empty());
1378 }
1379 }
1380
1381 #[test]
1382 fn test_finalize_merkle_batch_with_invalid_winner() {
1383 let prepared = make_prepared_merkle_batch(4);
1384 let bad_hash = [0xFF; 32];
1385
1386 let result = finalize_merkle_batch(prepared, bad_hash);
1387 assert!(result.is_err());
1388 let err = result.unwrap_err().to_string();
1389 assert!(err.contains("not found in candidate pools"), "got: {err}");
1390 }
1391
1392 #[test]
1393 fn test_finalize_merkle_batch_proofs_are_deserializable() {
1394 use ant_protocol::payment::deserialize_merkle_proof;
1395
1396 let prepared = make_prepared_merkle_batch(8);
1397 let winner_hash = prepared.candidate_pools[0].hash();
1398
1399 let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();
1400
1401 for (addr, proof_bytes) in &batch.proofs {
1402 let proof = deserialize_merkle_proof(proof_bytes);
1403 assert!(
1404 proof.is_ok(),
1405 "proof for {} should deserialize: {:?}",
1406 hex::encode(addr),
1407 proof.err()
1408 );
1409 }
1410 }
1411
1412 #[test]
1417 fn test_batch_split_calculation() {
1418 let addrs = make_test_addresses(MAX_LEAVES);
1420 assert_eq!(addrs.chunks(MAX_LEAVES).count(), 1);
1421
1422 let addrs = make_test_addresses(MAX_LEAVES + 1);
1424 assert_eq!(addrs.chunks(MAX_LEAVES).count(), 2);
1425
1426 let addrs = make_test_addresses(3 * MAX_LEAVES);
1428 assert_eq!(addrs.chunks(MAX_LEAVES).count(), 3);
1429 }
1430
1431 use std::sync::{Arc, Mutex};
1436
1437 fn make_chunks(count: usize) -> Vec<([u8; 32], Bytes)> {
1439 make_test_addresses(count)
1440 .into_iter()
1441 .map(|addr| (addr, Bytes::from_static(b"chunk")))
1442 .collect()
1443 }
1444
1445 #[tokio::test]
1449 async fn store_with_retry_collects_failures_instead_of_aborting() {
1450 let chunks = make_chunks(6);
1451 let failing: std::collections::HashSet<[u8; 32]> =
1452 chunks.iter().take(2).map(|(a, _)| *a).collect();
1453 let failing_for_closure = failing.clone();
1454
1455 let store_one = move |addr: [u8; 32], _content: Bytes| {
1456 let fail = failing_for_closure.contains(&addr);
1457 async move {
1458 if fail {
1459 Err(Error::InsufficientPeers("test shortfall".into()))
1460 } else {
1461 Ok(std::time::Instant::now())
1462 }
1463 }
1464 };
1465
1466 let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1467 .await
1468 .expect("quorum shortfalls must not abort the batch");
1469
1470 assert_eq!(outcome.stored, 4);
1471 assert_eq!(outcome.failed, 2);
1472 assert_eq!(outcome.stats.retries_histogram[0], 4);
1474 assert_eq!(outcome.stats.chunk_attempts_total, 6);
1475 }
1476
1477 #[tokio::test]
1479 async fn store_with_retry_propagates_non_quorum_errors() {
1480 let chunks = make_chunks(3);
1481 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1482 Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1483 };
1484
1485 let result =
1486 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, 3, store_one).await;
1487 assert!(matches!(result, Err(Error::Payment(_))));
1488 }
1489
1490 #[tokio::test]
1492 async fn store_with_retry_retries_only_the_failed_set() {
1493 let chunks = make_chunks(5);
1494 let total = chunks.len();
1495 let failing: std::collections::HashSet<[u8; 32]> =
1496 chunks.iter().take(2).map(|(a, _)| *a).collect();
1497 let failing_for_closure = failing.clone();
1498
1499 let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
1501 let calls_for_closure = calls.clone();
1502
1503 let store_one = move |addr: [u8; 32], _content: Bytes| {
1504 let calls = calls_for_closure.clone();
1505 let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
1507 let fail = failing_for_closure.contains(&addr) && already_seen == 0;
1508 calls.lock().unwrap().push(addr);
1509 async move {
1510 if fail {
1511 Err(Error::InsufficientPeers("round-1 shortfall".into()))
1512 } else {
1513 Ok(std::time::Instant::now())
1514 }
1515 }
1516 };
1517
1518 let outcome =
1519 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1520 .await
1521 .expect("should converge after retry");
1522
1523 assert_eq!(outcome.stored, total);
1524 assert_eq!(outcome.failed, 0);
1525
1526 let calls = calls.lock().unwrap();
1530 assert_eq!(calls.len(), total + failing.len());
1531 let round_two: std::collections::HashSet<[u8; 32]> =
1532 calls[total..].iter().copied().collect();
1533 assert_eq!(round_two, failing);
1534 }
1535
1536 #[tokio::test]
1539 async fn store_with_retry_counts_retry_success_once_in_histogram() {
1540 let chunks = make_chunks(4);
1541 let total = chunks.len();
1542 let flaky_addr = chunks[0].0;
1543
1544 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
1545 let attempts_for_closure = attempts.clone();
1546
1547 let store_one = move |addr: [u8; 32], _content: Bytes| {
1548 let attempts = attempts_for_closure.clone();
1549 let n = {
1550 let mut m = attempts.lock().unwrap();
1551 let entry = m.entry(addr).or_insert(0);
1552 *entry += 1;
1553 *entry
1554 };
1555 let fail = addr == flaky_addr && n == 1;
1556 async move {
1557 if fail {
1558 Err(Error::InsufficientPeers("transient".into()))
1559 } else {
1560 Ok(std::time::Instant::now())
1561 }
1562 }
1563 };
1564
1565 let outcome =
1566 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1567 .await
1568 .expect("flaky chunk should recover on retry");
1569
1570 assert_eq!(outcome.stored, total);
1571 assert_eq!(outcome.failed, 0);
1572 assert_eq!(outcome.stats.retries_histogram[0], total - 1);
1574 assert_eq!(outcome.stats.retries_histogram[1], 1);
1575 assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
1577 }
1578
1579 #[tokio::test]
1584 async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
1585 let chunks = make_chunks(3);
1586 let total = chunks.len();
1587
1588 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1589 Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
1590 };
1591
1592 let outcome = merkle_store_with_retry(
1593 chunks,
1594 8,
1595 MERKLE_STORE_MAX_ATTEMPTS,
1596 Duration::ZERO,
1597 None,
1598 0,
1599 total,
1600 store_one,
1601 )
1602 .await
1603 .expect("an exhausted retry budget is reported, not propagated as Err");
1604
1605 assert_eq!(outcome.stored, 0);
1606 assert_eq!(outcome.failed, total);
1607 assert_eq!(
1609 outcome.stats.chunk_attempts_total,
1610 total * MERKLE_STORE_MAX_ATTEMPTS
1611 );
1612 assert_eq!(outcome.stats.retries_histogram, [0; 4]);
1614 }
1615}