1use crate::data::client::adaptive::{observe_op, Outcome};
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13 Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14 MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19 compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20 MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Minimum chunk count at which [`PaymentMode::Auto`] switches to merkle batch payment.
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
/// Selects how chunk uploads are paid for.
///
/// Serialized with `snake_case` variant names; [`PaymentMode::Auto`] is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PaymentMode {
    /// Use merkle batch payment once the chunk count reaches
    /// [`DEFAULT_MERKLE_THRESHOLD`].
    #[default]
    Auto,
    /// Use merkle batch payment whenever there are at least 2 chunks.
    Merkle,
    /// Never use merkle batch payment.
    Single,
}
46
/// Outcome of paying for a batch of chunks with a merkle payment.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MerkleBatchPaymentResult {
    /// Serialized merkle payment proof for each paid chunk, keyed by chunk address.
    pub proofs: HashMap<[u8; 32], Vec<u8>>,
    /// Number of chunks covered by this result.
    pub chunk_count: usize,
    /// Storage cost paid, stored as the string form of the payment amount.
    /// NOTE(review): unit is presumably atto tokens, per the field name — confirm.
    pub storage_cost_atto: String,
    /// Gas cost reported for the payment (wei, per the field name).
    pub gas_cost_wei: u128,
    /// Timestamp (seconds since the Unix epoch) the merkle payment was prepared with.
    /// For multi-batch payments this is the oldest sub-batch timestamp.
    /// Deserializes to `0` when the field is absent.
    #[serde(default)]
    pub merkle_payment_timestamp: u64,
}
68
/// A merkle batch that has been built and quoted but not yet paid for.
///
/// Produced by `Client::prepare_merkle_batch_external` and consumed by
/// [`finalize_merkle_batch`] once the winning pool hash is known.
pub struct PreparedMerkleBatch {
    /// Depth of the merkle tree built over the batch addresses.
    pub depth: u8,
    /// One commitment per candidate pool, in the same order as the private `candidate_pools`.
    pub pool_commitments: Vec<PoolCommitment>,
    /// Timestamp (seconds since the Unix epoch) used for reward candidates and quotes.
    pub merkle_payment_timestamp: u64,
    // Full candidate pools; the winning one is embedded into every proof.
    candidate_pools: Vec<MerklePaymentCandidatePool>,
    // Merkle tree over `addresses`, used to generate per-address proofs.
    tree: MerkleTree,
    // Chunk addresses in leaf order (index `i` here is leaf `i` of `tree`).
    addresses: Vec<[u8; 32]>,
}
87
/// Result of the pre-payment storage check: which chunks can be skipped and
/// which still have to be paid for and uploaded.
#[derive(Debug, Clone, Default)]
pub(crate) struct MerkleUploadPlan {
    /// Addresses the network reported as already stored.
    pub already_stored: Vec<[u8; 32]>,
    /// Addresses that still need payment and upload.
    pub to_upload: Vec<[u8; 32]>,
    // Sum of the sizes of the chunks listed in `to_upload`.
    to_upload_total_bytes: u64,
}

impl MerkleUploadPlan {
    /// Average size in bytes (integer division) of the chunks still to upload.
    ///
    /// Returns `0` when there is nothing to upload.
    #[must_use]
    pub fn to_upload_avg_size(&self) -> u64 {
        match self.to_upload.len() as u64 {
            0 => 0,
            pending => self.to_upload_total_bytes / pending,
        }
    }
}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("PreparedMerkleBatch")
114 .field("depth", &self.depth)
115 .field("pool_commitments", &self.pool_commitments.len())
116 .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117 .field("candidate_pools", &self.candidate_pools.len())
118 .field("addresses", &self.addresses.len())
119 .finish()
120 }
121}
122
123pub(crate) fn chunk_contents_for_upload_addresses(
128 chunk_contents: Vec<Bytes>,
129 addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131 if addresses.is_empty() {
132 return Ok(Vec::new());
133 }
134
135 let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136 for address in addresses {
137 *needed_by_address.entry(*address).or_default() += 1;
138 }
139
140 let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141 HashMap::with_capacity(needed_by_address.len());
142 let mut remaining = addresses.len();
143 for chunk in chunk_contents {
144 let address = compute_address(&chunk);
145 if let Some(needed) = needed_by_address.get_mut(&address) {
146 if *needed > 0 {
147 chunks_by_address
148 .entry(address)
149 .or_default()
150 .push_back(chunk);
151 *needed -= 1;
152 remaining -= 1;
153 if remaining == 0 {
154 break;
155 }
156 }
157 }
158 }
159
160 for (address, needed) in &needed_by_address {
161 if *needed == 0 {
162 continue;
163 }
164
165 if chunks_by_address.contains_key(address) {
166 return Err(Error::InvalidData(format!(
167 "missing duplicate chunk content for merkle address {}",
168 hex::encode(address)
169 )));
170 }
171
172 return Err(Error::InvalidData(format!(
173 "missing chunk content for merkle address {}",
174 hex::encode(address)
175 )));
176 }
177
178 let mut selected = Vec::with_capacity(addresses.len());
179 for address in addresses {
180 let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181 Error::InvalidData(format!(
182 "missing chunk content for merkle address {}",
183 hex::encode(address)
184 ))
185 })?;
186 let chunk = chunks.pop_front().ok_or_else(|| {
187 Error::InvalidData(format!(
188 "missing duplicate chunk content for merkle address {}",
189 hex::encode(address)
190 ))
191 })?;
192 selected.push(chunk);
193 }
194
195 Ok(selected)
196}
197
198fn preflight_stored_status<T>(result: Result<T>) -> Result<bool> {
217 match result {
218 Ok(_) => Ok(false),
219 Err(Error::AlreadyStored) => Ok(true),
220 Err(e) if matches!(classify_error(&e), Outcome::Timeout | Outcome::NetworkError) => {
221 Ok(false)
222 }
223 Err(e) => Err(e),
224 }
225}
226
227#[must_use]
230pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
231 match mode {
232 PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
233 PaymentMode::Merkle => chunk_count >= 2,
234 PaymentMode::Single => false,
235 }
236}
237
238impl Client {
    /// Method form of the free function [`should_use_merkle`]; it delegates
    /// to it directly and does not read any client state.
    #[must_use]
    pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
        should_use_merkle(chunk_count, mode)
    }
244
245 pub async fn pay_for_merkle_batch(
259 &self,
260 addresses: &[[u8; 32]],
261 data_type: u32,
262 data_size: u64,
263 ) -> Result<MerkleBatchPaymentResult> {
264 let chunk_count = addresses.len();
265 if chunk_count < 2 {
266 return Err(Error::Payment(
267 "Merkle batch payment requires at least 2 chunks".to_string(),
268 ));
269 }
270
271 if chunk_count > MAX_LEAVES {
272 return self
273 .pay_for_merkle_multi_batch(addresses, data_type, data_size)
274 .await;
275 }
276
277 self.pay_for_merkle_single_batch(addresses, data_type, data_size)
278 .await
279 }
280
281 pub(crate) async fn plan_merkle_upload(
290 &self,
291 chunks: Vec<([u8; 32], u64)>,
292 data_type: u32,
293 progress: Option<&mpsc::Sender<UploadEvent>>,
294 ) -> Result<MerkleUploadPlan> {
295 let total_chunks = chunks.len();
296 if total_chunks == 0 {
297 return Ok(MerkleUploadPlan::default());
298 }
299
300 info!("Checking {total_chunks} merkle chunks for existing storage before payment");
301
302 let quote_limiter = self.controller().quote.clone();
303 let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
304 let mut check_stream = stream::iter(chunks.into_iter().enumerate())
305 .map(|(index, (address, data_size))| {
306 let limiter = quote_limiter.clone();
307 async move {
308 let result = observe_op(
309 &limiter,
310 || async move {
311 self.chunk_already_stored_for_merkle(&address, data_type, data_size)
312 .await
313 },
314 classify_error,
315 )
316 .await;
317 (index, address, data_size, result)
318 }
319 })
320 .buffer_unordered(quote_concurrency);
321
322 let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
323 let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
324 let mut checked = 0usize;
325
326 while let Some((index, address, data_size, result)) = check_stream.next().await {
327 let is_already_stored = result?;
328 checked += 1;
329
330 if let Some(tx) = progress {
331 let _ = tx.try_send(UploadEvent::ChunkQuoted {
332 quoted: checked,
333 total: total_chunks,
334 });
335 }
336
337 if is_already_stored {
338 debug!(
339 "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
340 hex::encode(address)
341 );
342 already_stored.push((index, address));
343 if let Some(tx) = progress {
344 let _ = tx.try_send(UploadEvent::ChunkStored {
345 stored: already_stored.len(),
346 total: total_chunks,
347 });
348 }
349 } else {
350 debug!(
351 "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
352 hex::encode(address)
353 );
354 to_upload.push((index, address, data_size));
355 }
356 }
357
358 already_stored.sort_by_key(|(index, _)| *index);
359 to_upload.sort_by_key(|(index, _, _)| *index);
360
361 let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
362 acc.saturating_add(*data_size)
363 });
364
365 let already_stored = already_stored
366 .into_iter()
367 .map(|(_, address)| address)
368 .collect::<Vec<_>>();
369 let to_upload = to_upload
370 .into_iter()
371 .map(|(_, address, _)| address)
372 .collect::<Vec<_>>();
373
374 info!(
375 "Merkle preflight complete: {} already stored, {} need upload",
376 already_stored.len(),
377 to_upload.len()
378 );
379
380 Ok(MerkleUploadPlan {
381 already_stored,
382 to_upload,
383 to_upload_total_bytes,
384 })
385 }
386
387 async fn chunk_already_stored_for_merkle(
388 &self,
389 address: &[u8; 32],
390 data_type: u32,
391 data_size: u64,
392 ) -> Result<bool> {
393 let result = self.get_store_quotes(address, data_size, data_type).await;
394 if let Err(e) = &result {
395 if matches!(classify_error(e), Outcome::Timeout | Outcome::NetworkError) {
396 debug!(
397 "Merkle preflight: could not determine stored status for {} ({e}); \
398 treating as not stored and queuing for upload",
399 hex::encode(address)
400 );
401 }
402 }
403 preflight_stored_status(result)
404 }
405
406 pub async fn prepare_merkle_batch_external(
412 &self,
413 addresses: &[[u8; 32]],
414 data_type: u32,
415 data_size: u64,
416 ) -> Result<PreparedMerkleBatch> {
417 let chunk_count = addresses.len();
418 let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
419
420 debug!("Building merkle tree for {chunk_count} chunks");
421
422 let tree = MerkleTree::from_xornames(xornames)
424 .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
425
426 let depth = tree.depth();
427 let merkle_payment_timestamp = std::time::SystemTime::now()
428 .duration_since(std::time::UNIX_EPOCH)
429 .map_err(|e| Error::Payment(format!("System time error: {e}")))?
430 .as_secs();
431
432 debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
433
434 let midpoint_proofs = tree
436 .reward_candidates(merkle_payment_timestamp)
437 .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
438
439 debug!(
440 "Collecting candidate pools from {} midpoints (concurrent)",
441 midpoint_proofs.len()
442 );
443
444 let candidate_pools = self
446 .build_candidate_pools(
447 &midpoint_proofs,
448 data_type,
449 data_size,
450 merkle_payment_timestamp,
451 )
452 .await?;
453
454 let pool_commitments: Vec<PoolCommitment> = candidate_pools
456 .iter()
457 .map(MerklePaymentCandidatePool::to_commitment)
458 .collect();
459
460 Ok(PreparedMerkleBatch {
461 depth,
462 pool_commitments,
463 merkle_payment_timestamp,
464 candidate_pools,
465 tree,
466 addresses: addresses.to_vec(),
467 })
468 }
469
470 async fn pay_for_merkle_single_batch(
472 &self,
473 addresses: &[[u8; 32]],
474 data_type: u32,
475 data_size: u64,
476 ) -> Result<MerkleBatchPaymentResult> {
477 let wallet = self.require_wallet()?;
478 let prepared = self
479 .prepare_merkle_batch_external(addresses, data_type, data_size)
480 .await?;
481
482 info!(
483 "Submitting merkle batch payment on-chain (depth={})",
484 prepared.depth
485 );
486 let (winner_pool_hash, amount, gas_info) = wallet
487 .pay_for_merkle_tree(
488 prepared.depth,
489 prepared.pool_commitments.clone(),
490 prepared.merkle_payment_timestamp,
491 )
492 .await
493 .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
494
495 info!(
496 "Merkle payment succeeded: winner pool {}",
497 hex::encode(winner_pool_hash)
498 );
499
500 let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
501 result.storage_cost_atto = amount.to_string();
502 result.gas_cost_wei = gas_info.gas_cost_wei;
503 Ok(result)
504 }
505
506 async fn pay_for_merkle_multi_batch(
508 &self,
509 addresses: &[[u8; 32]],
510 data_type: u32,
511 data_size: u64,
512 ) -> Result<MerkleBatchPaymentResult> {
513 let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
514 let total_sub_batches = sub_batches.len();
515 let mut all_proofs = HashMap::with_capacity(addresses.len());
516 let mut total_storage = Amount::ZERO;
517 let mut total_gas: u128 = 0;
518 let mut oldest_ts: u64 = 0;
522
523 for (i, chunk) in sub_batches.into_iter().enumerate() {
524 match self
525 .pay_for_merkle_single_batch(chunk, data_type, data_size)
526 .await
527 {
528 Ok(sub_result) => {
529 if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
530 total_storage += cost;
531 }
532 total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
533 if oldest_ts == 0
534 || (sub_result.merkle_payment_timestamp > 0
535 && sub_result.merkle_payment_timestamp < oldest_ts)
536 {
537 oldest_ts = sub_result.merkle_payment_timestamp;
538 }
539 all_proofs.extend(sub_result.proofs);
540 }
541 Err(e) => {
542 if all_proofs.is_empty() {
543 return Err(e);
545 }
546 warn!(
548 "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
549 Returning {} proofs from prior sub-batches",
550 i + 1,
551 all_proofs.len()
552 );
553 return Ok(MerkleBatchPaymentResult {
554 chunk_count: all_proofs.len(),
555 proofs: all_proofs,
556 storage_cost_atto: total_storage.to_string(),
557 gas_cost_wei: total_gas,
558 merkle_payment_timestamp: oldest_ts,
559 });
560 }
561 }
562 }
563
564 Ok(MerkleBatchPaymentResult {
565 chunk_count: addresses.len(),
566 proofs: all_proofs,
567 storage_cost_atto: total_storage.to_string(),
568 gas_cost_wei: total_gas,
569 merkle_payment_timestamp: oldest_ts,
570 })
571 }
572
573 async fn build_candidate_pools(
575 &self,
576 midpoint_proofs: &[MidpointProof],
577 data_type: u32,
578 data_size: u64,
579 merkle_payment_timestamp: u64,
580 ) -> Result<Vec<MerklePaymentCandidatePool>> {
581 let mut pool_futures = FuturesUnordered::new();
582
583 for midpoint_proof in midpoint_proofs {
584 let pool_address = midpoint_proof.address();
585 let mp = midpoint_proof.clone();
586 pool_futures.push(async move {
587 let candidate_nodes = self
588 .get_merkle_candidate_pool(
589 &pool_address.0,
590 data_type,
591 data_size,
592 merkle_payment_timestamp,
593 )
594 .await?;
595 Ok::<_, Error>(MerklePaymentCandidatePool {
596 midpoint_proof: mp,
597 candidate_nodes,
598 })
599 });
600 }
601
602 let mut pools = Vec::with_capacity(midpoint_proofs.len());
603 while let Some(result) = pool_futures.next().await {
604 pools.push(result?);
605 }
606
607 Ok(pools)
608 }
609
    /// Requests merkle candidate quotes from peers near `address` and returns
    /// exactly `CANDIDATES_PER_POOL` validated candidates.
    ///
    /// Twice as many closest peers as needed are asked for. If the lookup
    /// still yields fewer than `CANDIDATES_PER_POOL`, currently connected
    /// peers not already in the list are added (with no known addresses).
    /// All requests are sent concurrently; validation and final selection are
    /// done by `collect_validated_candidates`.
    ///
    /// # Errors
    ///
    /// * Propagates errors from the closest-peers lookup.
    /// * [`Error::InsufficientPeers`] when fewer than `CANDIDATES_PER_POOL`
    ///   peers are available to query.
    /// * Whatever `collect_validated_candidates` returns when too few valid
    ///   candidates come back.
    #[allow(clippy::too_many_lines)]
    async fn get_merkle_candidate_pool(
        &self,
        address: &[u8; 32],
        data_type: u32,
        data_size: u64,
        merkle_payment_timestamp: u64,
    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
        let node = self.network().node();
        let timeout = Duration::from_secs(self.config().quote_timeout_secs);

        // Ask for twice as many peers as the pool needs.
        // NOTE(review): presumably headroom for peers that fail or time out — confirm.
        let query_count = CANDIDATES_PER_POOL * 2;
        let mut remote_peers = self
            .network()
            .find_closest_peers(address, query_count)
            .await?;

        // Fallback: top up from connected peers when the lookup came up short.
        if remote_peers.len() < CANDIDATES_PER_POOL {
            let connected = self.network().connected_peers().await;
            for peer in connected {
                if !remote_peers.iter().any(|(id, _)| *id == peer) {
                    remote_peers.push((peer, vec![]));
                }
            }
        }

        if remote_peers.len() < CANDIDATES_PER_POOL {
            return Err(Error::InsufficientPeers(format!(
                "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
                 Use --no-merkle or a larger network.",
                remote_peers.len()
            )));
        }

        let mut candidate_futures = FuturesUnordered::new();

        for (peer_id, peer_addrs) in &remote_peers {
            let request_id = self.next_request_id();
            let request = MerkleCandidateQuoteRequest {
                address: *address,
                data_type,
                data_size,
                merkle_payment_timestamp,
            };
            let message = ChunkMessage {
                request_id,
                body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
            };

            // An encoding failure only drops this peer, not the whole pool.
            let message_bytes = match message.encode() {
                Ok(bytes) => bytes,
                Err(e) => {
                    warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
                    continue;
                }
            };

            // Owned copies so the future does not borrow from `remote_peers`.
            let peer_id_clone = *peer_id;
            let addrs_clone = peer_addrs.clone();
            let node_clone = node.clone();

            let fut = async move {
                let result = send_and_await_chunk_response(
                    &node_clone,
                    &peer_id_clone,
                    message_bytes,
                    request_id,
                    timeout,
                    &addrs_clone,
                    // Response matcher: `None` means "not the message we are
                    // waiting for"; `Some(..)` completes the request.
                    |body| match body {
                        ChunkMessageBody::MerkleCandidateQuoteResponse(
                            MerkleCandidateQuoteResponse::Success { candidate_node },
                        ) => {
                            match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
                                &candidate_node,
                            ) {
                                Ok(node) => Some(Ok(node)),
                                Err(e) => Some(Err(Error::Serialization(format!(
                                    "Failed to deserialize candidate node from {peer_id_clone}: {e}"
                                )))),
                            }
                        }
                        ChunkMessageBody::MerkleCandidateQuoteResponse(
                            MerkleCandidateQuoteResponse::Error(e),
                        ) => Some(Err(Error::Protocol(format!(
                            "Merkle quote error from {peer_id_clone}: {e}"
                        )))),
                        _ => None,
                    },
                    // Maps a send failure to this crate's error type.
                    |e| {
                        Error::Network(format!(
                            "Failed to send merkle candidate request to {peer_id_clone}: {e}"
                        ))
                    },
                    // Builds the error used when no response arrives in time.
                    || {
                        Error::Timeout(format!(
                            "Timeout waiting for merkle candidate from {peer_id_clone}"
                        ))
                    },
                )
                .await;

                (peer_id_clone, result)
            };

            candidate_futures.push(fut);
        }

        self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
            .await
    }
726
727 async fn collect_validated_candidates(
738 &self,
739 futures: &mut FuturesUnordered<
740 impl std::future::Future<
741 Output = (
742 PeerId,
743 std::result::Result<MerklePaymentCandidateNode, Error>,
744 ),
745 >,
746 >,
747 target_address: &[u8; 32],
748 merkle_payment_timestamp: u64,
749 ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
750 let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
751 let mut failures: Vec<String> = Vec::new();
752
753 while let Some((peer_id, result)) = futures.next().await {
754 match result {
755 Ok(candidate) => {
756 if !verify_merkle_candidate_signature(&candidate) {
757 warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
758 failures.push(format!("{peer_id}: invalid signature"));
759 continue;
760 }
761 if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
762 warn!("Timestamp mismatch from merkle candidate {peer_id}");
763 failures.push(format!("{peer_id}: timestamp mismatch"));
764 continue;
765 }
766 valid.push((peer_id, candidate));
767 }
768 Err(e) => {
769 debug!("Failed to get merkle candidate from {peer_id}: {e}");
770 failures.push(format!("{peer_id}: {e}"));
771 }
772 }
773 }
774
775 if valid.len() < CANDIDATES_PER_POOL {
776 return Err(Error::InsufficientPeers(format!(
777 "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
778 valid.len(),
779 failures.join("; ")
780 )));
781 }
782
783 let target_peer = PeerId::from_bytes(*target_address);
784 valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
785
786 let candidates: Vec<MerklePaymentCandidateNode> = valid
787 .into_iter()
788 .take(CANDIDATES_PER_POOL)
789 .map(|(_, candidate)| candidate)
790 .collect();
791
792 candidates
793 .try_into()
794 .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
795 }
796
    /// Uploads paid chunks to their close groups, attaching each chunk's
    /// merkle proof from `batch_result`.
    ///
    /// `chunk_contents[i]` is stored at `addresses[i]`; the two lists are
    /// paired by position. Retry and progress reporting are delegated to
    /// [`merkle_store_with_retry`], using [`MERKLE_STORE_MAX_ATTEMPTS`] and
    /// [`MERKLE_RETRY_BACKOFF`]. `stored_offset` and `total_chunks` are passed
    /// through unchanged for progress accounting.
    ///
    /// # Errors
    ///
    /// * [`Error::InvalidData`] when the two input lists differ in length.
    /// * Any error [`merkle_store_with_retry`] treats as fatal, including
    ///   [`Error::Payment`] for a chunk with no proof in `batch_result`.
    pub(crate) async fn merkle_upload_chunks(
        &self,
        chunk_contents: Vec<Bytes>,
        addresses: Vec<[u8; 32]>,
        batch_result: &MerkleBatchPaymentResult,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_offset: usize,
        total_chunks: usize,
    ) -> Result<MerkleStoreOutcome> {
        let store_limiter = self.controller().store.clone();
        let batch_size = chunk_contents.len();
        // The zip below would silently drop the surplus of the longer list.
        if batch_size != addresses.len() {
            return Err(Error::InvalidData(format!(
                "merkle upload has {batch_size} chunk contents but {} addresses",
                addresses.len()
            )));
        }
        let store_concurrency = store_limiter.current().min(batch_size.max(1));

        let chunks: Vec<([u8; 32], Bytes)> = addresses.into_iter().zip(chunk_contents).collect();

        // Builds the future that stores one chunk. The proof is cloned out of
        // `batch_result` before the async block is created.
        let store_one = |addr: [u8; 32], content: Bytes| {
            let limiter = store_limiter.clone();
            let proof_bytes = batch_result.proofs.get(&addr).cloned();
            async move {
                // Taken before the peer lookup, so the duration reported by
                // the caller covers lookup plus store.
                let started = std::time::Instant::now();
                let proof = proof_bytes.ok_or_else(|| {
                    Error::Payment(format!(
                        "Missing merkle proof for chunk {}",
                        hex::encode(addr)
                    ))
                })?;
                let peers = self.close_group_peers(&addr).await?;
                observe_op(
                    &limiter,
                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
                    classify_error,
                )
                .await
                .map(|_| started)
            }
        };

        merkle_store_with_retry(
            chunks,
            store_concurrency,
            MERKLE_STORE_MAX_ATTEMPTS,
            MERKLE_RETRY_BACKOFF,
            progress,
            stored_offset,
            total_chunks,
            store_one,
        )
        .await
    }
878}
879
/// Maximum number of store attempts per chunk that `Client::merkle_upload_chunks`
/// passes to [`merkle_store_with_retry`] (first try included).
pub(crate) const MERKLE_STORE_MAX_ATTEMPTS: usize = 4;

/// Base wait between retry rounds that `Client::merkle_upload_chunks` passes to
/// [`merkle_store_with_retry`]; jitter is applied on top.
pub(crate) const MERKLE_RETRY_BACKOFF: Duration = Duration::from_secs(30);

/// Relative jitter applied to the retry backoff: the wait is multiplied by a
/// random factor in `1.0 ± MERKLE_RETRY_JITTER`.
const MERKLE_RETRY_JITTER: f64 = 0.1;
903
/// Summary of a merkle store run produced by [`merkle_store_with_retry`].
#[derive(Debug, Default)]
pub(crate) struct MerkleStoreOutcome {
    /// Running stored count: starts at the caller's `stored_offset` and is
    /// incremented once per chunk stored in this run.
    pub stored: usize,
    /// Number of chunks still failing after the final attempt.
    pub failed: usize,
    /// Address and last error message of each chunk counted in `failed`.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Aggregate attempt, duration, and retry statistics for the run.
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
921
/// Stores `chunks` through `store_one`, retrying chunks that fail with
/// [`Error::InsufficientPeers`] in later rounds.
///
/// * `store_concurrency` bounds the stores in flight (at least 1 is used).
/// * `max_attempts` is the number of rounds (at least 1 is used).
/// * `backoff` is the base wait between rounds; a random factor of
///   `1.0 ± MERKLE_RETRY_JITTER` is applied, and a zero backoff skips the wait.
/// * `progress` receives a best-effort `ChunkStored` event per stored chunk,
///   with `stored` counted from `stored_offset` and `total` passed through.
/// * `store_one` stores one chunk and returns the `Instant` from which the
///   store duration is measured.
///
/// Chunks still failing after the last round are reported in the outcome's
/// `failed` / `failed_addresses` rather than as an error.
///
/// # Errors
///
/// Any error other than [`Error::InsufficientPeers`] aborts the run
/// immediately and is returned as is.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn merkle_store_with_retry<F, Fut>(
    chunks: Vec<([u8; 32], Bytes)>,
    store_concurrency: usize,
    max_attempts: usize,
    backoff: Duration,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    stored_offset: usize,
    total: usize,
    store_one: F,
) -> Result<MerkleStoreOutcome>
where
    F: Fn([u8; 32], Bytes) -> Fut,
    Fut: std::future::Future<Output = Result<std::time::Instant>>,
{
    let attempts = max_attempts.max(1);
    let mut outcome = MerkleStoreOutcome {
        stored: stored_offset,
        ..MerkleStoreOutcome::default()
    };
    let mut pending = chunks;

    for attempt in 0..attempts {
        // Clamp to the work left, but never below one in-flight store.
        let concurrency = store_concurrency.min(pending.len().max(1)).max(1);
        let mut next_failed: Vec<([u8; 32], Bytes, String)> = Vec::new();

        // Keep the content next to each future so a failed chunk can be
        // re-queued for the next round.
        let mut upload_stream = stream::iter(pending.into_iter().map(|(addr, content)| {
            let fut = store_one(addr, content.clone());
            async move { (addr, content, fut.await) }
        }))
        .buffer_unordered(concurrency);

        while let Some((addr, content, result)) = upload_stream.next().await {
            outcome.stats.chunk_attempts_total =
                outcome.stats.chunk_attempts_total.saturating_add(1);
            match result {
                Ok(started) => {
                    let duration_ms =
                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
                    outcome.stats.store_durations_ms.push(duration_ms);
                    // Bucket by round; rounds beyond the histogram's size
                    // share its last bucket.
                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
                    outcome.stats.retries_histogram[idx] =
                        outcome.stats.retries_histogram[idx].saturating_add(1);
                    outcome.stored += 1;
                    if let Some(tx) = progress {
                        // Best effort: a full or closed channel is ignored.
                        let _ = tx.try_send(UploadEvent::ChunkStored {
                            stored: outcome.stored,
                            total,
                        });
                    }
                }
                // Retryable: remember the chunk and its error for the next round.
                Err(e @ Error::InsufficientPeers(_)) => {
                    next_failed.push((addr, content, e.to_string()));
                }
                // Anything else is fatal for the whole run.
                Err(e) => return Err(e),
            }
        }

        if next_failed.is_empty() {
            break;
        }

        if attempt + 1 < attempts {
            warn!(
                failed = next_failed.len(),
                attempt = attempt + 1,
                "merkle chunks short of quorum, retrying after backoff"
            );
            pending = next_failed
                .into_iter()
                .map(|(addr, content, _msg)| (addr, content))
                .collect();
            if backoff > Duration::ZERO {
                let wait = {
                    // The RNG handle lives only inside this block, so it is
                    // dropped before the `.await` below.
                    // NOTE(review): presumably to keep this future `Send` — confirm.
                    let mut rng = rand::thread_rng();
                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
                    backoff.mul_f64(factor)
                };
                tokio::time::sleep(wait).await;
            }
        } else {
            // Out of rounds: report what is left instead of failing the run.
            outcome.failed = next_failed.len();
            outcome.failed_addresses = next_failed
                .into_iter()
                .map(|(addr, _content, msg)| (addr, msg))
                .collect();
            break;
        }
    }

    Ok(outcome)
}
1032
1033pub fn finalize_merkle_batch(
1038 prepared: PreparedMerkleBatch,
1039 winner_pool_hash: [u8; 32],
1040) -> Result<MerkleBatchPaymentResult> {
1041 let chunk_count = prepared.addresses.len();
1042 let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
1043
1044 let winner_pool = prepared
1046 .candidate_pools
1047 .iter()
1048 .find(|pool| pool.hash() == winner_pool_hash)
1049 .ok_or_else(|| {
1050 Error::Payment(format!(
1051 "Winner pool {} not found in candidate pools",
1052 hex::encode(winner_pool_hash)
1053 ))
1054 })?;
1055
1056 info!("Generating merkle proofs for {chunk_count} chunks");
1058 let mut proofs = HashMap::with_capacity(chunk_count);
1059
1060 for (i, xorname) in xornames.iter().enumerate() {
1061 let address_proof = prepared
1062 .tree
1063 .generate_address_proof(i, *xorname)
1064 .map_err(|e| {
1065 Error::Payment(format!(
1066 "Failed to generate address proof for chunk {i}: {e}"
1067 ))
1068 })?;
1069
1070 let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1071
1072 let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1073 .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1074
1075 proofs.insert(prepared.addresses[i], tagged_bytes);
1076 }
1077
1078 info!("Merkle batch payment complete: {chunk_count} proofs generated");
1079
1080 Ok(MerkleBatchPaymentResult {
1081 proofs,
1082 chunk_count,
1083 storage_cost_atto: "0".to_string(),
1084 gas_cost_wei: 0,
1085 merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1086 })
1087}
1088
/// Compile-time check that the future returned by
/// `Client::merkle_upload_chunks` is `Send`.
///
/// Nothing here is meant to run: the async function only has to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    // Accepts only references to `Send` values; fails to compile otherwise.
    fn _assert_send<T: Send>(_: &T) {}

    // The lints are silenced because `todo!()` diverges, which makes the rest
    // of the body unreachable and the function itself unused.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        // Placeholder value: only its type matters for the check.
        let batch_result: MerkleBatchPaymentResult = todo!();
        let fut = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _assert_send(&fut);
    }
}
1109
1110#[cfg(test)]
1111#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1112mod tests {
1113 use super::*;
1114 use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1115
1116 #[test]
1121 fn test_auto_below_threshold() {
1122 assert!(!should_use_merkle(1, PaymentMode::Auto));
1123 assert!(!should_use_merkle(10, PaymentMode::Auto));
1124 assert!(!should_use_merkle(63, PaymentMode::Auto));
1125 }
1126
1127 #[test]
1128 fn test_auto_at_and_above_threshold() {
1129 assert!(should_use_merkle(64, PaymentMode::Auto));
1130 assert!(should_use_merkle(65, PaymentMode::Auto));
1131 assert!(should_use_merkle(1000, PaymentMode::Auto));
1132 }
1133
1134 #[test]
1135 fn test_merkle_mode_forces_at_2() {
1136 assert!(!should_use_merkle(1, PaymentMode::Merkle));
1137 assert!(should_use_merkle(2, PaymentMode::Merkle));
1138 assert!(should_use_merkle(3, PaymentMode::Merkle));
1139 }
1140
1141 #[test]
1142 fn test_single_mode_always_false() {
1143 assert!(!should_use_merkle(0, PaymentMode::Single));
1144 assert!(!should_use_merkle(64, PaymentMode::Single));
1145 assert!(!should_use_merkle(1000, PaymentMode::Single));
1146 }
1147
1148 #[test]
1149 fn test_default_mode_is_auto() {
1150 assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1151 }
1152
1153 #[test]
1154 fn test_threshold_value() {
1155 assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1156 }
1157
1158 #[test]
1163 fn test_preflight_quotes_gathered_means_not_stored() {
1164 assert!(matches!(preflight_stored_status(Ok(())), Ok(false)));
1165 }
1166
1167 #[test]
1168 fn test_preflight_already_stored_is_stored() {
1169 let r: Result<()> = Err(Error::AlreadyStored);
1170 assert!(matches!(preflight_stored_status(r), Ok(true)));
1171 }
1172
1173 #[test]
1177 fn test_preflight_transient_quote_failure_does_not_abort() {
1178 let insufficient: Result<()> =
1180 Err(Error::InsufficientPeers("Got 5 quotes, need 7".to_string()));
1181 assert!(
1182 matches!(preflight_stored_status(insufficient), Ok(false)),
1183 "insufficient-peers during preflight must degrade to not-stored, not error"
1184 );
1185
1186 let timeout: Result<()> = Err(Error::Timeout("Timeout waiting for quote".to_string()));
1187 assert!(matches!(preflight_stored_status(timeout), Ok(false)));
1188
1189 let network: Result<()> = Err(Error::Network("connection reset".to_string()));
1190 assert!(matches!(preflight_stored_status(network), Ok(false)));
1191 }
1192
1193 #[test]
1196 fn test_preflight_application_error_propagates() {
1197 let payment: Result<()> = Err(Error::Payment("bad payment".to_string()));
1198 assert!(matches!(
1199 preflight_stored_status(payment),
1200 Err(Error::Payment(_))
1201 ));
1202 }
1203
1204 #[test]
1205 fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1206 let first = Bytes::from_static(b"first");
1207 let second = Bytes::from_static(b"second");
1208 let first_addr = compute_address(&first);
1209 let second_addr = compute_address(&second);
1210
1211 let selected = chunk_contents_for_upload_addresses(
1212 vec![first.clone(), second.clone()],
1213 &[second_addr, first_addr],
1214 )
1215 .unwrap();
1216
1217 assert_eq!(selected, vec![second, first]);
1218 }
1219
1220 #[test]
1221 fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1222 let repeated = Bytes::from_static(b"same-content");
1223 let other = Bytes::from_static(b"other-content");
1224 let repeated_addr = compute_address(&repeated);
1225
1226 let selected = chunk_contents_for_upload_addresses(
1227 vec![repeated.clone(), other, repeated.clone()],
1228 &[repeated_addr, repeated_addr],
1229 )
1230 .unwrap();
1231
1232 assert_eq!(selected, vec![repeated.clone(), repeated]);
1233 }
1234
1235 #[test]
1236 fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1237 let requested = Bytes::from_static(b"requested-content");
1238 let unrequested = Bytes::from_static(b"unrequested-content");
1239 let requested_addr = compute_address(&requested);
1240
1241 let selected = chunk_contents_for_upload_addresses(
1242 vec![
1243 unrequested.clone(),
1244 requested.clone(),
1245 unrequested.clone(),
1246 unrequested,
1247 ],
1248 &[requested_addr],
1249 )
1250 .unwrap();
1251
1252 assert_eq!(selected, vec![requested]);
1253 }
1254
1255 #[test]
1256 fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1257 let present = Bytes::from_static(b"present-content");
1258 let missing = Bytes::from_static(b"missing-content");
1259 let missing_addr = compute_address(&missing);
1260
1261 let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1262
1263 assert!(matches!(result, Err(Error::InvalidData(_))));
1264 }
1265
1266 fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1271 (0..count)
1272 .map(|i| {
1273 let xn = XorName::from_content(&i.to_le_bytes());
1274 xn.0
1275 })
1276 .collect()
1277 }
1278
1279 #[test]
1280 fn test_tree_depth_for_known_sizes() {
1281 let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1282 for (count, expected_depth) in cases {
1283 let addrs = make_test_addresses(count);
1284 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1285 let tree = MerkleTree::from_xornames(xornames).unwrap();
1286 assert_eq!(
1287 tree.depth(),
1288 expected_depth,
1289 "depth mismatch for {count} leaves"
1290 );
1291 }
1292 }
1293
/// For a 16-leaf tree, every leaf's address proof must verify and report the
/// same depth as the tree.
#[test]
fn test_proof_generation_and_verification_for_all_leaves() {
    let leaves: Vec<XorName> = make_test_addresses(16)
        .into_iter()
        .map(XorName)
        .collect();
    let tree = MerkleTree::from_xornames(leaves.clone()).unwrap();
    // The tree depth does not change across iterations; widen it once.
    let tree_depth = usize::from(tree.depth());

    for (i, leaf) in leaves.iter().copied().enumerate() {
        let proof = tree.generate_address_proof(i, leaf).unwrap();
        assert!(proof.verify(), "proof for leaf {i} should verify");
        assert_eq!(proof.depth(), tree_depth);
    }
}
1306
/// A proof generated for leaf index 0 but with an address that is not that
/// leaf must fail verification.
#[test]
fn test_proof_fails_for_wrong_address() {
    let leaves: Vec<XorName> = make_test_addresses(8)
        .into_iter()
        .map(XorName)
        .collect();
    let tree = MerkleTree::from_xornames(leaves).unwrap();

    let foreign_address = XorName::from_content(b"wrong");
    let mismatched_proof = tree.generate_address_proof(0, foreign_address).unwrap();

    assert!(
        !mismatched_proof.verify(),
        "proof with wrong address should fail"
    );
}
1317
/// Building a tree from a single leaf is rejected.
#[test]
fn test_tree_too_few_leaves() {
    let single_leaf = vec![XorName::from_content(b"only_one")];
    assert!(MerkleTree::from_xornames(single_leaf).is_err());
}
1324
/// A tree with exactly `MAX_LEAVES` leaves builds and reports that leaf count.
#[test]
fn test_tree_at_max_leaves() {
    let leaves: Vec<XorName> = make_test_addresses(MAX_LEAVES)
        .into_iter()
        .map(XorName)
        .collect();
    let leaf_count = MerkleTree::from_xornames(leaves).unwrap().leaf_count();
    assert_eq!(leaf_count, MAX_LEAVES);
}
1332
/// Serializes a `MerklePaymentProof` built from a 4-leaf tree and dummy
/// candidate nodes, then deserializes it again.
///
/// Checks that the first serialized byte is `0x02`, and that the address and
/// the number of candidate nodes survive the round trip.
#[test]
fn test_merkle_proof_serialize_deserialize_roundtrip() {
    use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
    use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};

    let leaves: Vec<XorName> = make_test_addresses(4)
        .into_iter()
        .map(XorName)
        .collect();
    let first_leaf = leaves[0];
    let tree = MerkleTree::from_xornames(leaves).unwrap();

    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // Only the first midpoint proof is needed to assemble a pool.
    let midpoint_proof = tree
        .reward_candidates(timestamp)
        .unwrap()
        .first()
        .unwrap()
        .clone();

    // Each node is filled with its own index as a byte so the nodes differ.
    #[allow(clippy::cast_possible_truncation)]
    let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
        std::array::from_fn(|i| {
            let fill = i as u8;
            MerklePaymentCandidateNode {
                pub_key: vec![fill; 32],
                price: Amount::from(1024u64),
                reward_address: RewardsAddress::new([fill; 20]),
                merkle_payment_timestamp: timestamp,
                signature: vec![fill; 64],
            }
        });

    let winner_pool = MerklePaymentCandidatePool {
        midpoint_proof,
        candidate_nodes,
    };

    let address_proof = tree.generate_address_proof(0, first_leaf).unwrap();
    let merkle_proof = MerklePaymentProof::new(first_leaf, address_proof, winner_pool);

    let encoded = serialize_merkle_proof(&merkle_proof).unwrap();
    assert_eq!(
        encoded.first().copied(),
        Some(0x02),
        "tag should be PROOF_TAG_MERKLE"
    );

    let decoded = deserialize_merkle_proof(&encoded).unwrap();
    assert_eq!(decoded.address, merkle_proof.address);
    assert_eq!(
        decoded.winner_pool.candidate_nodes.len(),
        CANDIDATES_PER_POOL
    );
}
1387
/// Builds a candidate node stamped with timestamp 1000 and asserts that this
/// differs from 2000.
///
/// NOTE(review): this only compares the field that was just set against a
/// literal; no verification routine is invoked here, so the "rejected" in the
/// name is not exercised by this test — confirm whether a verifier call was
/// intended.
#[test]
fn test_candidate_wrong_timestamp_rejected() {
    let node_timestamp = 1000;
    let mismatched_timestamp = 2000;

    let candidate = MerklePaymentCandidateNode {
        pub_key: vec![0u8; 32],
        price: ant_protocol::evm::Amount::ZERO,
        reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
        merkle_payment_timestamp: node_timestamp,
        signature: vec![0u8; 64],
    };

    assert_ne!(candidate.merkle_payment_timestamp, mismatched_timestamp);
}
1406
1407 fn make_dummy_candidate_nodes(
1412 timestamp: u64,
1413 ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1414 std::array::from_fn(|i| MerklePaymentCandidateNode {
1415 pub_key: vec![i as u8; 32],
1416 price: Amount::from(1024u64),
1417 reward_address: RewardsAddress::new([i as u8; 20]),
1418 merkle_payment_timestamp: timestamp,
1419 signature: vec![i as u8; 64],
1420 })
1421 }
1422
1423 fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1424 let addrs = make_test_addresses(count);
1425 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1426 let tree = MerkleTree::from_xornames(xornames).unwrap();
1427
1428 let timestamp = std::time::SystemTime::now()
1429 .duration_since(std::time::UNIX_EPOCH)
1430 .unwrap()
1431 .as_secs();
1432
1433 let midpoints = tree.reward_candidates(timestamp).unwrap();
1434
1435 let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1436 .into_iter()
1437 .map(|mp| MerklePaymentCandidatePool {
1438 midpoint_proof: mp,
1439 candidate_nodes: make_dummy_candidate_nodes(timestamp),
1440 })
1441 .collect();
1442
1443 let pool_commitments = candidate_pools
1444 .iter()
1445 .map(MerklePaymentCandidatePool::to_commitment)
1446 .collect();
1447
1448 PreparedMerkleBatch {
1449 depth: tree.depth(),
1450 pool_commitments,
1451 merkle_payment_timestamp: timestamp,
1452 candidate_pools,
1453 tree,
1454 addresses: addrs,
1455 }
1456 }
1457
/// Finalizing with the hash of the first candidate pool succeeds and yields
/// one non-empty proof per chunk.
#[test]
fn test_finalize_merkle_batch_with_valid_winner() {
    const CHUNKS: usize = 4;

    let prepared = make_prepared_merkle_batch(CHUNKS);
    let winner_hash = prepared.candidate_pools[0].hash();

    let result = finalize_merkle_batch(prepared, winner_hash);
    assert!(
        result.is_ok(),
        "should succeed with valid winner: {result:?}"
    );

    let batch = result.unwrap();
    assert_eq!(batch.chunk_count, CHUNKS);
    assert_eq!(batch.proofs.len(), CHUNKS);
    assert!(batch
        .proofs
        .values()
        .all(|proof_bytes| !proof_bytes.is_empty()));
}
1478
/// Finalizing with a hash that matches no candidate pool fails, and the error
/// text mentions that the pool was not found.
#[test]
fn test_finalize_merkle_batch_with_invalid_winner() {
    let prepared = make_prepared_merkle_batch(4);

    // `unwrap_err` panics if finalization unexpectedly succeeds.
    let err = finalize_merkle_batch(prepared, [0xFF; 32])
        .unwrap_err()
        .to_string();

    assert!(err.contains("not found in candidate pools"), "got: {err}");
}
1489
/// Every proof emitted by `finalize_merkle_batch` for an 8-chunk batch must
/// be accepted by `deserialize_merkle_proof`.
#[test]
fn test_finalize_merkle_batch_proofs_are_deserializable() {
    use ant_protocol::payment::deserialize_merkle_proof;

    let prepared = make_prepared_merkle_batch(8);
    let winner_hash = prepared.candidate_pools[0].hash();
    let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();

    for (addr, proof_bytes) in batch.proofs.iter() {
        let decoded = deserialize_merkle_proof(proof_bytes);
        assert!(
            decoded.is_ok(),
            "proof for {} should deserialize: {:?}",
            hex::encode(addr),
            decoded.err()
        );
    }
}
1509
/// Splitting an address list into `MAX_LEAVES`-sized chunks gives the
/// expected number of batches at, just above, and at three times the limit.
#[test]
fn test_batch_split_calculation() {
    let cases = [(MAX_LEAVES, 1), (MAX_LEAVES + 1, 2), (3 * MAX_LEAVES, 3)];

    for (address_count, expected_batches) in cases {
        let addrs = make_test_addresses(address_count);
        assert_eq!(addrs.chunks(MAX_LEAVES).count(), expected_batches);
    }
}
1528
1529 use std::sync::{Arc, Mutex};
1534
1535 fn make_chunks(count: usize) -> Vec<([u8; 32], Bytes)> {
1537 make_test_addresses(count)
1538 .into_iter()
1539 .map(|addr| (addr, Bytes::from_static(b"chunk")))
1540 .collect()
1541 }
1542
1543 #[tokio::test]
1547 async fn store_with_retry_collects_failures_instead_of_aborting() {
1548 let chunks = make_chunks(6);
1549 let failing: std::collections::HashSet<[u8; 32]> =
1550 chunks.iter().take(2).map(|(a, _)| *a).collect();
1551 let failing_for_closure = failing.clone();
1552
1553 let store_one = move |addr: [u8; 32], _content: Bytes| {
1554 let fail = failing_for_closure.contains(&addr);
1555 async move {
1556 if fail {
1557 Err(Error::InsufficientPeers("test shortfall".into()))
1558 } else {
1559 Ok(std::time::Instant::now())
1560 }
1561 }
1562 };
1563
1564 let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1565 .await
1566 .expect("quorum shortfalls must not abort the batch");
1567
1568 assert_eq!(outcome.stored, 4);
1569 assert_eq!(outcome.failed, 2);
1570 assert_eq!(outcome.stats.retries_histogram[0], 4);
1572 assert_eq!(outcome.stats.chunk_attempts_total, 6);
1573 }
1574
1575 #[tokio::test]
1577 async fn store_with_retry_propagates_non_quorum_errors() {
1578 let chunks = make_chunks(3);
1579 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1580 Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1581 };
1582
1583 let result =
1584 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, 3, store_one).await;
1585 assert!(matches!(result, Err(Error::Payment(_))));
1586 }
1587
1588 #[tokio::test]
1590 async fn store_with_retry_retries_only_the_failed_set() {
1591 let chunks = make_chunks(5);
1592 let total = chunks.len();
1593 let failing: std::collections::HashSet<[u8; 32]> =
1594 chunks.iter().take(2).map(|(a, _)| *a).collect();
1595 let failing_for_closure = failing.clone();
1596
1597 let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
1599 let calls_for_closure = calls.clone();
1600
1601 let store_one = move |addr: [u8; 32], _content: Bytes| {
1602 let calls = calls_for_closure.clone();
1603 let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
1605 let fail = failing_for_closure.contains(&addr) && already_seen == 0;
1606 calls.lock().unwrap().push(addr);
1607 async move {
1608 if fail {
1609 Err(Error::InsufficientPeers("round-1 shortfall".into()))
1610 } else {
1611 Ok(std::time::Instant::now())
1612 }
1613 }
1614 };
1615
1616 let outcome =
1617 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1618 .await
1619 .expect("should converge after retry");
1620
1621 assert_eq!(outcome.stored, total);
1622 assert_eq!(outcome.failed, 0);
1623
1624 let calls = calls.lock().unwrap();
1628 assert_eq!(calls.len(), total + failing.len());
1629 let round_two: std::collections::HashSet<[u8; 32]> =
1630 calls[total..].iter().copied().collect();
1631 assert_eq!(round_two, failing);
1632 }
1633
1634 #[tokio::test]
1637 async fn store_with_retry_counts_retry_success_once_in_histogram() {
1638 let chunks = make_chunks(4);
1639 let total = chunks.len();
1640 let flaky_addr = chunks[0].0;
1641
1642 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
1643 let attempts_for_closure = attempts.clone();
1644
1645 let store_one = move |addr: [u8; 32], _content: Bytes| {
1646 let attempts = attempts_for_closure.clone();
1647 let n = {
1648 let mut m = attempts.lock().unwrap();
1649 let entry = m.entry(addr).or_insert(0);
1650 *entry += 1;
1651 *entry
1652 };
1653 let fail = addr == flaky_addr && n == 1;
1654 async move {
1655 if fail {
1656 Err(Error::InsufficientPeers("transient".into()))
1657 } else {
1658 Ok(std::time::Instant::now())
1659 }
1660 }
1661 };
1662
1663 let outcome =
1664 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1665 .await
1666 .expect("flaky chunk should recover on retry");
1667
1668 assert_eq!(outcome.stored, total);
1669 assert_eq!(outcome.failed, 0);
1670 assert_eq!(outcome.stats.retries_histogram[0], total - 1);
1672 assert_eq!(outcome.stats.retries_histogram[1], 1);
1673 assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
1675 }
1676
1677 #[tokio::test]
1682 async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
1683 let chunks = make_chunks(3);
1684 let total = chunks.len();
1685
1686 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1687 Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
1688 };
1689
1690 let outcome = merkle_store_with_retry(
1691 chunks,
1692 8,
1693 MERKLE_STORE_MAX_ATTEMPTS,
1694 Duration::ZERO,
1695 None,
1696 0,
1697 total,
1698 store_one,
1699 )
1700 .await
1701 .expect("an exhausted retry budget is reported, not propagated as Err");
1702
1703 assert_eq!(outcome.stored, 0);
1704 assert_eq!(outcome.failed, total);
1705 assert_eq!(
1707 outcome.stats.chunk_attempts_total,
1708 total * MERKLE_STORE_MAX_ATTEMPTS
1709 );
1710 assert_eq!(outcome.stats.retries_histogram, [0; 4]);
1712 }
1713
1714 #[tokio::test]
1719 async fn store_with_retry_records_failed_addresses_when_exhausted() {
1720 let chunks = make_chunks(6);
1721 let failing: std::collections::HashSet<[u8; 32]> =
1722 chunks.iter().take(2).map(|(a, _)| *a).collect();
1723 let failing_for_closure = failing.clone();
1724
1725 let store_one = move |addr: [u8; 32], _content: Bytes| {
1726 let fail = failing_for_closure.contains(&addr);
1727 async move {
1728 if fail {
1729 Err(Error::InsufficientPeers("permanent shortfall".into()))
1730 } else {
1731 Ok(std::time::Instant::now())
1732 }
1733 }
1734 };
1735
1736 let outcome = merkle_store_with_retry(
1737 chunks,
1738 8,
1739 MERKLE_STORE_MAX_ATTEMPTS,
1740 Duration::ZERO,
1741 None,
1742 0,
1743 6,
1744 store_one,
1745 )
1746 .await
1747 .expect("quorum shortfalls must not abort the batch");
1748
1749 assert_eq!(outcome.stored, 4);
1750 assert_eq!(outcome.failed, 2);
1751 assert_eq!(outcome.failed_addresses.len(), 2);
1753 let reported: std::collections::HashSet<[u8; 32]> =
1754 outcome.failed_addresses.iter().map(|(a, _)| *a).collect();
1755 assert_eq!(reported, failing);
1756 for (_, msg) in &outcome.failed_addresses {
1758 assert!(msg.contains("permanent shortfall"));
1759 }
1760 }
1761
1762 #[tokio::test]
1765 async fn store_with_retry_failed_addresses_empty_on_full_success() {
1766 let chunks = make_chunks(4);
1767 let total = chunks.len();
1768 let store_one =
1769 |_addr: [u8; 32], _content: Bytes| async move { Ok(std::time::Instant::now()) };
1770
1771 let outcome = merkle_store_with_retry(
1772 chunks,
1773 8,
1774 MERKLE_STORE_MAX_ATTEMPTS,
1775 Duration::ZERO,
1776 None,
1777 0,
1778 total,
1779 store_one,
1780 )
1781 .await
1782 .expect("all chunks store");
1783
1784 assert_eq!(outcome.stored, total);
1785 assert_eq!(outcome.failed, 0);
1786 assert!(outcome.failed_addresses.is_empty());
1787 }
1788}