1use crate::data::client::adaptive::{observe_op, Outcome};
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13 Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14 MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19 compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20 MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Chunk count at or above which [`PaymentMode::Auto`] selects merkle batch
/// payment (see [`should_use_merkle`]).
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
/// How a batch of chunks should be paid for.
///
/// Variants are (de)serialized with snake_case names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PaymentMode {
    /// Use merkle batch payment once the chunk count reaches
    /// [`DEFAULT_MERKLE_THRESHOLD`]. This is the default mode.
    #[default]
    Auto,
    /// Use merkle batch payment whenever there are at least 2 chunks.
    Merkle,
    /// Never use merkle batch payment.
    Single,
}
46
/// Result of paying for a batch of chunks with a merkle payment.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MerkleBatchPaymentResult {
    /// Serialized merkle payment proof per chunk, keyed by the 32-byte chunk address.
    pub proofs: HashMap<[u8; 32], Vec<u8>>,
    /// Number of chunks covered by this result.
    pub chunk_count: usize,
    /// Storage cost as a decimal string (`"0"` until a payer fills it in).
    pub storage_cost_atto: String,
    /// Gas cost of the payment (`0` until a payer fills it in).
    pub gas_cost_wei: u128,
    /// Timestamp (seconds since the UNIX epoch) the merkle payment was prepared with.
    /// Falls back to `0` when the field is absent from deserialized data.
    #[serde(default)]
    pub merkle_payment_timestamp: u64,
}
68
/// A merkle batch that has been built and quoted but not yet paid for.
///
/// Produced by `Client::prepare_merkle_batch_external` and consumed by
/// [`finalize_merkle_batch`] once the winning pool hash is known.
pub struct PreparedMerkleBatch {
    /// Depth of the merkle tree built over the batch addresses.
    pub depth: u8,
    /// One commitment per collected candidate pool.
    pub pool_commitments: Vec<PoolCommitment>,
    /// Timestamp (seconds since the UNIX epoch) taken when the batch was prepared.
    pub merkle_payment_timestamp: u64,
    /// Candidate pools the commitments were derived from; searched for the winner
    /// during finalization.
    candidate_pools: Vec<MerklePaymentCandidatePool>,
    /// Merkle tree over the batch addresses, used to generate per-chunk proofs.
    tree: MerkleTree,
    /// Chunk addresses in the order they were given when the tree was built.
    addresses: Vec<[u8; 32]>,
}
87
/// Split of a chunk batch into "already on the network" and "still to upload",
/// as computed before any merkle payment is made.
#[derive(Debug, Clone, Default)]
pub(crate) struct MerkleUploadPlan {
    /// Addresses whose chunks were found to be stored already.
    pub already_stored: Vec<[u8; 32]>,
    /// Addresses whose chunks still need to be paid for and uploaded.
    pub to_upload: Vec<[u8; 32]>,
    /// Combined size in bytes of the chunks listed in `to_upload`.
    to_upload_total_bytes: u64,
}

impl MerkleUploadPlan {
    /// Mean size in bytes (integer division) of the chunks awaiting upload.
    ///
    /// Returns `0` when nothing needs to be uploaded.
    #[must_use]
    pub fn to_upload_avg_size(&self) -> u64 {
        let pending = self.to_upload.len() as u64;
        // `checked_div` yields `None` only for a zero divisor, i.e. an empty plan.
        self.to_upload_total_bytes
            .checked_div(pending)
            .unwrap_or(0)
    }
}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("PreparedMerkleBatch")
114 .field("depth", &self.depth)
115 .field("pool_commitments", &self.pool_commitments.len())
116 .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117 .field("candidate_pools", &self.candidate_pools.len())
118 .field("addresses", &self.addresses.len())
119 .finish()
120 }
121}
122
123pub(crate) fn chunk_contents_for_upload_addresses(
128 chunk_contents: Vec<Bytes>,
129 addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131 if addresses.is_empty() {
132 return Ok(Vec::new());
133 }
134
135 let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136 for address in addresses {
137 *needed_by_address.entry(*address).or_default() += 1;
138 }
139
140 let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141 HashMap::with_capacity(needed_by_address.len());
142 let mut remaining = addresses.len();
143 for chunk in chunk_contents {
144 let address = compute_address(&chunk);
145 if let Some(needed) = needed_by_address.get_mut(&address) {
146 if *needed > 0 {
147 chunks_by_address
148 .entry(address)
149 .or_default()
150 .push_back(chunk);
151 *needed -= 1;
152 remaining -= 1;
153 if remaining == 0 {
154 break;
155 }
156 }
157 }
158 }
159
160 for (address, needed) in &needed_by_address {
161 if *needed == 0 {
162 continue;
163 }
164
165 if chunks_by_address.contains_key(address) {
166 return Err(Error::InvalidData(format!(
167 "missing duplicate chunk content for merkle address {}",
168 hex::encode(address)
169 )));
170 }
171
172 return Err(Error::InvalidData(format!(
173 "missing chunk content for merkle address {}",
174 hex::encode(address)
175 )));
176 }
177
178 let mut selected = Vec::with_capacity(addresses.len());
179 for address in addresses {
180 let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181 Error::InvalidData(format!(
182 "missing chunk content for merkle address {}",
183 hex::encode(address)
184 ))
185 })?;
186 let chunk = chunks.pop_front().ok_or_else(|| {
187 Error::InvalidData(format!(
188 "missing duplicate chunk content for merkle address {}",
189 hex::encode(address)
190 ))
191 })?;
192 selected.push(chunk);
193 }
194
195 Ok(selected)
196}
197
198fn preflight_stored_status<T>(result: Result<T>) -> Result<bool> {
217 match result {
218 Ok(_) => Ok(false),
219 Err(Error::AlreadyStored) => Ok(true),
220 Err(e) if matches!(classify_error(&e), Outcome::Timeout | Outcome::NetworkError) => {
221 Ok(false)
222 }
223 Err(e) => Err(e),
224 }
225}
226
227#[must_use]
230pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
231 match mode {
232 PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
233 PaymentMode::Merkle => chunk_count >= 2,
234 PaymentMode::Single => false,
235 }
236}
237
238impl Client {
    /// Method form of the free function [`should_use_merkle`]; the client's own
    /// state is not consulted.
    #[must_use]
    pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
        should_use_merkle(chunk_count, mode)
    }
244
245 pub async fn pay_for_merkle_batch(
259 &self,
260 addresses: &[[u8; 32]],
261 data_type: u32,
262 data_size: u64,
263 ) -> Result<MerkleBatchPaymentResult> {
264 let chunk_count = addresses.len();
265 if chunk_count < 2 {
266 return Err(Error::Payment(
267 "Merkle batch payment requires at least 2 chunks".to_string(),
268 ));
269 }
270
271 if chunk_count > MAX_LEAVES {
272 return self
273 .pay_for_merkle_multi_batch(addresses, data_type, data_size)
274 .await;
275 }
276
277 self.pay_for_merkle_single_batch(addresses, data_type, data_size)
278 .await
279 }
280
281 pub(crate) async fn plan_merkle_upload(
290 &self,
291 chunks: Vec<([u8; 32], u64)>,
292 data_type: u32,
293 progress: Option<&mpsc::Sender<UploadEvent>>,
294 ) -> Result<MerkleUploadPlan> {
295 let total_chunks = chunks.len();
296 if total_chunks == 0 {
297 return Ok(MerkleUploadPlan::default());
298 }
299
300 info!("Checking {total_chunks} merkle chunks for existing storage before payment");
301
302 let quote_limiter = self.controller().quote.clone();
303 let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
304 let mut check_stream = stream::iter(chunks.into_iter().enumerate())
305 .map(|(index, (address, data_size))| {
306 let limiter = quote_limiter.clone();
307 async move {
308 let result = observe_op(
309 &limiter,
310 || async move {
311 self.chunk_already_stored_for_merkle(&address, data_type, data_size)
312 .await
313 },
314 classify_error,
315 )
316 .await;
317 (index, address, data_size, result)
318 }
319 })
320 .buffer_unordered(quote_concurrency);
321
322 let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
323 let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
324 let mut checked = 0usize;
325
326 while let Some((index, address, data_size, result)) = check_stream.next().await {
327 let is_already_stored = result?;
328 checked += 1;
329
330 if let Some(tx) = progress {
331 let _ = tx.try_send(UploadEvent::ChunkQuoted {
332 quoted: checked,
333 total: total_chunks,
334 });
335 }
336
337 if is_already_stored {
338 debug!(
339 "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
340 hex::encode(address)
341 );
342 already_stored.push((index, address));
343 if let Some(tx) = progress {
344 let _ = tx.try_send(UploadEvent::ChunkStored {
345 stored: already_stored.len(),
346 total: total_chunks,
347 });
348 }
349 } else {
350 debug!(
351 "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
352 hex::encode(address)
353 );
354 to_upload.push((index, address, data_size));
355 }
356 }
357
358 already_stored.sort_by_key(|(index, _)| *index);
359 to_upload.sort_by_key(|(index, _, _)| *index);
360
361 let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
362 acc.saturating_add(*data_size)
363 });
364
365 let already_stored = already_stored
366 .into_iter()
367 .map(|(_, address)| address)
368 .collect::<Vec<_>>();
369 let to_upload = to_upload
370 .into_iter()
371 .map(|(_, address, _)| address)
372 .collect::<Vec<_>>();
373
374 info!(
375 "Merkle preflight complete: {} already stored, {} need upload",
376 already_stored.len(),
377 to_upload.len()
378 );
379
380 Ok(MerkleUploadPlan {
381 already_stored,
382 to_upload,
383 to_upload_total_bytes,
384 })
385 }
386
387 async fn chunk_already_stored_for_merkle(
388 &self,
389 address: &[u8; 32],
390 data_type: u32,
391 data_size: u64,
392 ) -> Result<bool> {
393 let result = self
394 .get_store_quotes_with_fault_tolerance(address, data_size, data_type)
395 .await;
396 if let Err(e) = &result {
397 if matches!(classify_error(e), Outcome::Timeout | Outcome::NetworkError) {
398 debug!(
399 "Merkle preflight: could not determine stored status for {} ({e}); \
400 treating as not stored and queuing for upload",
401 hex::encode(address)
402 );
403 }
404 }
405 preflight_stored_status(result)
406 }
407
408 pub async fn prepare_merkle_batch_external(
414 &self,
415 addresses: &[[u8; 32]],
416 data_type: u32,
417 data_size: u64,
418 ) -> Result<PreparedMerkleBatch> {
419 let chunk_count = addresses.len();
420 let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
421
422 debug!("Building merkle tree for {chunk_count} chunks");
423
424 let tree = MerkleTree::from_xornames(xornames)
426 .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
427
428 let depth = tree.depth();
429 let merkle_payment_timestamp = std::time::SystemTime::now()
430 .duration_since(std::time::UNIX_EPOCH)
431 .map_err(|e| Error::Payment(format!("System time error: {e}")))?
432 .as_secs();
433
434 debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
435
436 let midpoint_proofs = tree
438 .reward_candidates(merkle_payment_timestamp)
439 .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
440
441 debug!(
442 "Collecting candidate pools from {} midpoints (concurrent)",
443 midpoint_proofs.len()
444 );
445
446 let candidate_pools = self
448 .build_candidate_pools(
449 &midpoint_proofs,
450 data_type,
451 data_size,
452 merkle_payment_timestamp,
453 )
454 .await?;
455
456 let pool_commitments: Vec<PoolCommitment> = candidate_pools
458 .iter()
459 .map(MerklePaymentCandidatePool::to_commitment)
460 .collect();
461
462 Ok(PreparedMerkleBatch {
463 depth,
464 pool_commitments,
465 merkle_payment_timestamp,
466 candidate_pools,
467 tree,
468 addresses: addresses.to_vec(),
469 })
470 }
471
472 async fn pay_for_merkle_single_batch(
474 &self,
475 addresses: &[[u8; 32]],
476 data_type: u32,
477 data_size: u64,
478 ) -> Result<MerkleBatchPaymentResult> {
479 let wallet = self.require_wallet()?;
480 let prepared = self
481 .prepare_merkle_batch_external(addresses, data_type, data_size)
482 .await?;
483
484 info!(
485 "Submitting merkle batch payment on-chain (depth={})",
486 prepared.depth
487 );
488 let (winner_pool_hash, amount, gas_info) = wallet
489 .pay_for_merkle_tree(
490 prepared.depth,
491 prepared.pool_commitments.clone(),
492 prepared.merkle_payment_timestamp,
493 )
494 .await
495 .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
496
497 info!(
498 "Merkle payment succeeded: winner pool {}",
499 hex::encode(winner_pool_hash)
500 );
501
502 let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
503 result.storage_cost_atto = amount.to_string();
504 result.gas_cost_wei = gas_info.gas_cost_wei;
505 Ok(result)
506 }
507
508 async fn pay_for_merkle_multi_batch(
510 &self,
511 addresses: &[[u8; 32]],
512 data_type: u32,
513 data_size: u64,
514 ) -> Result<MerkleBatchPaymentResult> {
515 let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
516 let total_sub_batches = sub_batches.len();
517 let mut all_proofs = HashMap::with_capacity(addresses.len());
518 let mut total_storage = Amount::ZERO;
519 let mut total_gas: u128 = 0;
520 let mut oldest_ts: u64 = 0;
524
525 for (i, chunk) in sub_batches.into_iter().enumerate() {
526 match self
527 .pay_for_merkle_single_batch(chunk, data_type, data_size)
528 .await
529 {
530 Ok(sub_result) => {
531 if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
532 total_storage += cost;
533 }
534 total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
535 if oldest_ts == 0
536 || (sub_result.merkle_payment_timestamp > 0
537 && sub_result.merkle_payment_timestamp < oldest_ts)
538 {
539 oldest_ts = sub_result.merkle_payment_timestamp;
540 }
541 all_proofs.extend(sub_result.proofs);
542 }
543 Err(e) => {
544 if all_proofs.is_empty() {
545 return Err(e);
547 }
548 warn!(
550 "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
551 Returning {} proofs from prior sub-batches",
552 i + 1,
553 all_proofs.len()
554 );
555 return Ok(MerkleBatchPaymentResult {
556 chunk_count: all_proofs.len(),
557 proofs: all_proofs,
558 storage_cost_atto: total_storage.to_string(),
559 gas_cost_wei: total_gas,
560 merkle_payment_timestamp: oldest_ts,
561 });
562 }
563 }
564 }
565
566 Ok(MerkleBatchPaymentResult {
567 chunk_count: addresses.len(),
568 proofs: all_proofs,
569 storage_cost_atto: total_storage.to_string(),
570 gas_cost_wei: total_gas,
571 merkle_payment_timestamp: oldest_ts,
572 })
573 }
574
575 async fn build_candidate_pools(
577 &self,
578 midpoint_proofs: &[MidpointProof],
579 data_type: u32,
580 data_size: u64,
581 merkle_payment_timestamp: u64,
582 ) -> Result<Vec<MerklePaymentCandidatePool>> {
583 let mut pool_futures = FuturesUnordered::new();
584
585 for midpoint_proof in midpoint_proofs {
586 let pool_address = midpoint_proof.address();
587 let mp = midpoint_proof.clone();
588 pool_futures.push(async move {
589 let candidate_nodes = self
590 .get_merkle_candidate_pool(
591 &pool_address.0,
592 data_type,
593 data_size,
594 merkle_payment_timestamp,
595 )
596 .await?;
597 Ok::<_, Error>(MerklePaymentCandidatePool {
598 midpoint_proof: mp,
599 candidate_nodes,
600 })
601 });
602 }
603
604 let mut pools = Vec::with_capacity(midpoint_proofs.len());
605 while let Some(result) = pool_futures.next().await {
606 pools.push(result?);
607 }
608
609 Ok(pools)
610 }
611
    /// Gathers the `CANDIDATES_PER_POOL` merkle candidate nodes for the pool at
    /// `address`.
    ///
    /// Looks up twice as many close peers as are needed, tops the list up with
    /// currently connected peers if the lookup returned too few, sends every
    /// peer a `MerkleCandidateQuoteRequest` concurrently and leaves validation
    /// and selection of the responses to `collect_validated_candidates`.
    ///
    /// # Errors
    ///
    /// Propagates peer-lookup errors, returns [`Error::InsufficientPeers`] when
    /// fewer than `CANDIDATES_PER_POOL` peers are known, and otherwise returns
    /// whatever `collect_validated_candidates` reports.
    #[allow(clippy::too_many_lines)]
    async fn get_merkle_candidate_pool(
        &self,
        address: &[u8; 32],
        data_type: u32,
        data_size: u64,
        merkle_payment_timestamp: u64,
    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
        let node = self.network().node();
        let timeout = Duration::from_secs(self.config().quote_timeout_secs);

        // Over-query so that some peers may fail or be rejected without leaving
        // the pool short.
        let query_count = CANDIDATES_PER_POOL * 2;
        let mut remote_peers = self
            .network()
            .find_closest_peers(address, query_count)
            .await?;

        if remote_peers.len() < CANDIDATES_PER_POOL {
            // Too few peers from the lookup: add connected peers that are not
            // already listed, with no known addresses attached.
            let connected = self.network().connected_peers().await;
            for peer in connected {
                if !remote_peers.iter().any(|(id, _)| *id == peer) {
                    remote_peers.push((peer, vec![]));
                }
            }
        }

        if remote_peers.len() < CANDIDATES_PER_POOL {
            return Err(Error::InsufficientPeers(format!(
                "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
                 Use --no-merkle or a larger network.",
                remote_peers.len()
            )));
        }

        let mut candidate_futures = FuturesUnordered::new();

        for (peer_id, peer_addrs) in &remote_peers {
            let request_id = self.next_request_id();
            let request = MerkleCandidateQuoteRequest {
                address: *address,
                data_type,
                data_size,
                merkle_payment_timestamp,
            };
            let message = ChunkMessage {
                request_id,
                body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
            };

            // A peer whose request cannot be encoded is skipped, not fatal.
            let message_bytes = match message.encode() {
                Ok(bytes) => bytes,
                Err(e) => {
                    warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
                    continue;
                }
            };

            // Owned copies so the future does not borrow from `remote_peers`.
            let peer_id_clone = *peer_id;
            let addrs_clone = peer_addrs.clone();
            let node_clone = node.clone();

            let fut = async move {
                let result = send_and_await_chunk_response(
                    &node_clone,
                    &peer_id_clone,
                    message_bytes,
                    request_id,
                    timeout,
                    &addrs_clone,
                    // Response handler: `Some(..)` settles the request, `None`
                    // is returned for any other message body.
                    |body| match body {
                        ChunkMessageBody::MerkleCandidateQuoteResponse(
                            MerkleCandidateQuoteResponse::Success { candidate_node },
                        ) => {
                            match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
                                &candidate_node,
                            ) {
                                Ok(node) => Some(Ok(node)),
                                Err(e) => Some(Err(Error::Serialization(format!(
                                    "Failed to deserialize candidate node from {peer_id_clone}: {e}"
                                )))),
                            }
                        }
                        ChunkMessageBody::MerkleCandidateQuoteResponse(
                            MerkleCandidateQuoteResponse::Error(e),
                        ) => Some(Err(Error::Protocol(format!(
                            "Merkle quote error from {peer_id_clone}: {e}"
                        )))),
                        _ => None,
                    },
                    // Maps a send failure to a network error.
                    |e| {
                        Error::Network(format!(
                            "Failed to send merkle candidate request to {peer_id_clone}: {e}"
                        ))
                    },
                    // Builds the error used when no response arrives in time.
                    || {
                        Error::Timeout(format!(
                            "Timeout waiting for merkle candidate from {peer_id_clone}"
                        ))
                    },
                )
                .await;

                // Pair the outcome with the peer so failures can be attributed.
                (peer_id_clone, result)
            };

            candidate_futures.push(fut);
        }

        self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
            .await
    }
728
729 async fn collect_validated_candidates(
740 &self,
741 futures: &mut FuturesUnordered<
742 impl std::future::Future<
743 Output = (
744 PeerId,
745 std::result::Result<MerklePaymentCandidateNode, Error>,
746 ),
747 >,
748 >,
749 target_address: &[u8; 32],
750 merkle_payment_timestamp: u64,
751 ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
752 let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
753 let mut failures: Vec<String> = Vec::new();
754
755 while let Some((peer_id, result)) = futures.next().await {
756 match result {
757 Ok(candidate) => {
758 if !verify_merkle_candidate_signature(&candidate) {
759 warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
760 failures.push(format!("{peer_id}: invalid signature"));
761 continue;
762 }
763 if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
764 warn!("Timestamp mismatch from merkle candidate {peer_id}");
765 failures.push(format!("{peer_id}: timestamp mismatch"));
766 continue;
767 }
768 valid.push((peer_id, candidate));
769 }
770 Err(e) => {
771 debug!("Failed to get merkle candidate from {peer_id}: {e}");
772 failures.push(format!("{peer_id}: {e}"));
773 }
774 }
775 }
776
777 if valid.len() < CANDIDATES_PER_POOL {
778 return Err(Error::InsufficientPeers(format!(
779 "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
780 valid.len(),
781 failures.join("; ")
782 )));
783 }
784
785 let target_peer = PeerId::from_bytes(*target_address);
786 valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
787
788 let candidates: Vec<MerklePaymentCandidateNode> = valid
789 .into_iter()
790 .take(CANDIDATES_PER_POOL)
791 .map(|(_, candidate)| candidate)
792 .collect();
793
794 candidates
795 .try_into()
796 .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
797 }
798
    /// Uploads already-paid chunks using the proofs in `batch_result`.
    ///
    /// `chunk_contents[i]` is stored under `addresses[i]`. Each store looks up
    /// the chunk's proof, resolves the close group for the address and puts the
    /// chunk to it, observed by the store limiter. Retries are handled by
    /// [`merkle_store_with_retry`] with [`MERKLE_STORE_MAX_ATTEMPTS`] and
    /// [`MERKLE_RETRY_BACKOFF`].
    ///
    /// `stored_offset` and `total_chunks` are forwarded to
    /// [`merkle_store_with_retry`] as the starting stored count and the total
    /// used in progress events.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidData`] when the two input vectors differ in
    /// length, and the fatal error recorded by the retry loop if there is one
    /// (the partial outcome is discarded in that case).
    pub(crate) async fn merkle_upload_chunks(
        &self,
        chunk_contents: Vec<Bytes>,
        addresses: Vec<[u8; 32]>,
        batch_result: &MerkleBatchPaymentResult,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_offset: usize,
        total_chunks: usize,
    ) -> Result<MerkleStoreOutcome> {
        let store_limiter = self.controller().store.clone();
        let batch_size = chunk_contents.len();
        // Contents and addresses are paired positionally below, so they must
        // line up one-to-one.
        if batch_size != addresses.len() {
            return Err(Error::InvalidData(format!(
                "merkle upload has {batch_size} chunk contents but {} addresses",
                addresses.len()
            )));
        }
        let store_concurrency = store_limiter.current().min(batch_size.max(1));

        let chunks: Vec<([u8; 32], Bytes)> = addresses.into_iter().zip(chunk_contents).collect();

        // Stores one chunk and, on success, returns the instant the attempt
        // started so the caller can measure its duration.
        let store_one = |addr: [u8; 32], content: Bytes| {
            let limiter = store_limiter.clone();
            // Cloned up front so the future owns its proof bytes.
            let proof_bytes = batch_result.proofs.get(&addr).cloned();
            async move {
                let started = std::time::Instant::now();
                let proof = proof_bytes.ok_or_else(|| {
                    Error::Payment(format!(
                        "Missing merkle proof for chunk {}",
                        hex::encode(addr)
                    ))
                })?;
                let peers = self.close_group_peers(&addr).await?;
                observe_op(
                    &limiter,
                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
                    classify_error,
                )
                .await
                .map(|_| started)
            }
        };

        let outcome = merkle_store_with_retry(
            chunks,
            store_concurrency,
            MERKLE_STORE_MAX_ATTEMPTS,
            MERKLE_RETRY_BACKOFF,
            progress,
            stored_offset,
            total_chunks,
            store_one,
        )
        .await?;

        // A fatal error turns the whole upload into an `Err`.
        if let Some(e) = outcome.fatal {
            return Err(e);
        }
        Ok(outcome)
    }
890}
891
/// Maximum number of store passes (the first try included) that
/// `Client::merkle_upload_chunks` allows for a batch.
pub(crate) const MERKLE_STORE_MAX_ATTEMPTS: usize = 4;

/// Base wait between two store passes in `Client::merkle_upload_chunks`;
/// jitter is applied on top (see [`MERKLE_RETRY_JITTER`]).
pub(crate) const MERKLE_RETRY_BACKOFF: Duration = Duration::from_secs(30);

/// Relative jitter applied to the retry backoff: the wait is scaled by a random
/// factor in `1.0 - MERKLE_RETRY_JITTER ..= 1.0 + MERKLE_RETRY_JITTER`.
const MERKLE_RETRY_JITTER: f64 = 0.1;
915
/// Result of [`merkle_store_with_retry`].
#[derive(Debug, Default)]
pub(crate) struct MerkleStoreOutcome {
    /// Running stored count: the caller-supplied offset plus the chunks stored
    /// by this call.
    pub stored: usize,
    /// Addresses of the chunks stored by this call, in completion order.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Number of entries in `failed_addresses`.
    pub failed: usize,
    /// Chunks that were not stored, each with the text of its last error.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// The error that aborted the run, if any: the first error other than
    /// `Error::InsufficientPeers` / `Error::RemotePut`.
    pub fatal: Option<Error>,
    /// Attempt counters, store durations and the retries histogram for this call.
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
947
/// Stores `chunks` with bounded concurrency, retrying the retryable failures.
///
/// Runs up to `max_attempts` passes (at least one). In each pass every pending
/// chunk is handed to `store_one`, at most `store_concurrency` (at least one)
/// at a time. `store_one` returns the instant its attempt started, which is
/// used to record the store duration.
///
/// - Success: counted in `stored`, recorded in the stats, and reported through
///   `progress` (best effort) as `ChunkStored { stored, total }`.
/// - `Error::InsufficientPeers` / `Error::RemotePut`: the chunk is queued for
///   the next pass; after the last pass it ends up in `failed_addresses`.
/// - Any other error: recorded as `fatal` and the current pass stops being
///   drained. Only failures observed up to that point are reported in
///   `failed_addresses`; chunks still in flight or not yet started are dropped
///   with the stream and appear in neither list.
///
/// Between passes the function sleeps for `backoff` scaled by a random factor
/// within ±[`MERKLE_RETRY_JITTER`]; a zero `backoff` skips the sleep.
///
/// `stored_offset` seeds the running stored count so progress events continue
/// from earlier work. The body contains no fallible `?`, so the function
/// always returns `Ok`; failures are reported inside the outcome.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn merkle_store_with_retry<F, Fut>(
    chunks: Vec<([u8; 32], Bytes)>,
    store_concurrency: usize,
    max_attempts: usize,
    backoff: Duration,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    stored_offset: usize,
    total: usize,
    store_one: F,
) -> Result<MerkleStoreOutcome>
where
    F: Fn([u8; 32], Bytes) -> Fut,
    Fut: std::future::Future<Output = Result<std::time::Instant>>,
{
    let attempts = max_attempts.max(1);
    let mut outcome = MerkleStoreOutcome {
        stored: stored_offset,
        ..MerkleStoreOutcome::default()
    };
    let mut pending = chunks;

    for attempt in 0..attempts {
        // Never more workers than chunks, and never zero.
        let concurrency = store_concurrency.min(pending.len().max(1)).max(1);
        let mut next_failed: Vec<([u8; 32], Bytes, String)> = Vec::new();

        // The content is cloned into the store future and the original kept, so
        // a failed chunk can be retried.
        let mut upload_stream = stream::iter(pending.into_iter().map(|(addr, content)| {
            let fut = store_one(addr, content.clone());
            async move { (addr, content, fut.await) }
        }))
        .buffer_unordered(concurrency);

        while let Some((addr, content, result)) = upload_stream.next().await {
            outcome.stats.chunk_attempts_total =
                outcome.stats.chunk_attempts_total.saturating_add(1);
            match result {
                Ok(started) => {
                    let duration_ms =
                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
                    outcome.stats.store_durations_ms.push(duration_ms);
                    // Histogram bucket = attempt index, clamped to the last bucket.
                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
                    outcome.stats.retries_histogram[idx] =
                        outcome.stats.retries_histogram[idx].saturating_add(1);
                    outcome.stored += 1;
                    outcome.stored_addresses.push(addr);
                    if let Some(tx) = progress {
                        // Best effort: a full or closed channel is ignored.
                        let _ = tx.try_send(UploadEvent::ChunkStored {
                            stored: outcome.stored,
                            total,
                        });
                    }
                }
                // Retryable: keep the chunk for the next pass.
                Err(e @ (Error::InsufficientPeers(_) | Error::RemotePut { .. })) => {
                    next_failed.push((addr, content, e.to_string()));
                }
                // Anything else aborts the run.
                Err(e) => {
                    next_failed.push((addr, content, e.to_string()));
                    outcome.fatal = Some(e);
                    break;
                }
            }
        }

        if outcome.fatal.is_some() {
            outcome.failed = next_failed.len();
            outcome.failed_addresses = next_failed
                .into_iter()
                .map(|(addr, _content, msg)| (addr, msg))
                .collect();
            return Ok(outcome);
        }

        if next_failed.is_empty() {
            break;
        }

        if attempt + 1 < attempts {
            warn!(
                failed = next_failed.len(),
                attempt = attempt + 1,
                "merkle chunks short of quorum, retrying after backoff"
            );
            pending = next_failed
                .into_iter()
                .map(|(addr, content, _msg)| (addr, content))
                .collect();
            if backoff > Duration::ZERO {
                // The RNG lives only inside this block, so it is not held
                // across the `.await` below.
                let wait = {
                    let mut rng = rand::thread_rng();
                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
                    backoff.mul_f64(factor)
                };
                tokio::time::sleep(wait).await;
            }
        } else {
            // Out of attempts: whatever is still failing is reported as failed.
            outcome.failed = next_failed.len();
            outcome.failed_addresses = next_failed
                .into_iter()
                .map(|(addr, _content, msg)| (addr, msg))
                .collect();
            break;
        }
    }

    Ok(outcome)
}
1091
/// Per-round delays, in seconds, for deferred merkle retries.
// NOTE(review): presumably passed as `round_delays_secs` to
// `merkle_deferred_retry`; the call site is not visible here — confirm.
pub(crate) const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
1101
/// Maps a zero-based deferred retry `round` to a slot in a retries histogram of
/// `hist_len` buckets.
///
/// Round `r` maps to slot `r + 1`, clamped to the last valid index
/// (`hist_len - 1`). For an empty histogram (`hist_len == 0`) the result is `0`.
///
/// The increment saturates, so an extreme `round` clamps to the last slot
/// instead of overflowing (which would panic in debug builds and wrap to slot
/// `0` in release builds).
pub(crate) fn deferred_round_histogram_slot(round: usize, hist_len: usize) -> usize {
    let last_slot = hist_len.saturating_sub(1);
    round.saturating_add(1).min(last_slot)
}
1111
/// Result of [`merkle_deferred_retry`].
#[derive(Debug, Default)]
pub(crate) struct DeferredRetryOutcome {
    /// Running stored count: the caller-supplied offset plus the chunks stored
    /// across all deferred rounds.
    pub stored: usize,
    /// Addresses of the chunks stored during the deferred rounds.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Number of entries in `failed_addresses`.
    pub failed: usize,
    /// Chunks still not stored when the retry ended, each with an error message.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Text of the fatal error that stopped the retry early, if any.
    pub fatal: Option<String>,
    /// Attempt counters, store durations and retries histogram aggregated over
    /// all rounds.
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
1136
/// Retries chunks that an earlier upload could not store, in timed rounds.
///
/// For each entry of `round_delays_secs` (while chunks remain), the function
/// sleeps for that many seconds (skipped for `0`) and then works through the
/// remaining chunks in batches of `batch_size` (at least one):
///
/// 1. `read_bodies` loads the contents for the batch addresses;
/// 2. `concurrency_for` picks the concurrency for the batch;
/// 3. [`merkle_store_with_retry`] makes a single pass (one attempt, no
///    backoff) using `store_one`.
///
/// Chunks that fail in a round are carried into the next round. All successes
/// of a round are added to the histogram slot chosen by
/// [`deferred_round_histogram_slot`].
///
/// `deferred` holds `(address, last error message)` pairs. `stored_offset` and
/// `total` are used for the running stored count and progress events.
///
/// # Errors
///
/// An error from `read_bodies` is returned immediately, discarding the outcome
/// accumulated so far. A fatal store error does not produce `Err`: it ends the
/// retry early with `fatal` set and the unfinished chunks listed as failed.
// NOTE(review): on a fatal error, chunks of the aborted batch that were still
// in flight are not reported by `merkle_store_with_retry`, so they are missing
// from `failed_addresses` here as well — confirm this is intended.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn merkle_deferred_retry<RB, CF, SF, Fut>(
    deferred: Vec<([u8; 32], String)>,
    round_delays_secs: &[u64],
    batch_size: usize,
    read_bodies: RB,
    concurrency_for: CF,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    stored_offset: usize,
    total: usize,
    store_one: SF,
) -> Result<DeferredRetryOutcome>
where
    RB: Fn(&[[u8; 32]]) -> Result<Vec<([u8; 32], Bytes)>>,
    CF: Fn(usize) -> usize,
    SF: Fn([u8; 32], Bytes) -> Fut,
    Fut: std::future::Future<Output = Result<std::time::Instant>>,
{
    let batch_size = batch_size.max(1);
    let mut outcome = DeferredRetryOutcome {
        stored: stored_offset,
        ..DeferredRetryOutcome::default()
    };
    let mut remaining = deferred;
    let rounds = round_delays_secs.len();

    for (round, &delay_secs) in round_delays_secs.iter().enumerate() {
        if remaining.is_empty() {
            break;
        }
        if delay_secs > 0 {
            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
        }
        info!(
            "Deferred merkle retry round {}/{}: {} chunk(s) short of quorum",
            round + 1,
            rounds,
            remaining.len(),
        );

        let slot = deferred_round_histogram_slot(round, outcome.stats.retries_histogram.len());
        // Take this round's input and leave `remaining` empty so it can collect
        // the failures of this round.
        let round_input = std::mem::take(&mut remaining);
        let mut input_iter = round_input.into_iter();

        loop {
            let batch: Vec<([u8; 32], String)> = input_iter.by_ref().take(batch_size).collect();
            if batch.is_empty() {
                break;
            }
            let batch_addrs: Vec<[u8; 32]> = batch.iter().map(|(addr, _)| *addr).collect();
            let chunks = read_bodies(&batch_addrs)?;
            let concurrency = concurrency_for(batch_addrs.len());

            // Single pass per batch; retrying is done by the rounds themselves.
            let batch_outcome = merkle_store_with_retry(
                chunks,
                concurrency,
                1,
                Duration::ZERO,
                progress,
                outcome.stored,
                total,
                &store_one,
            )
            .await?;

            // `batch_outcome.stored` already includes the offset passed in.
            outcome.stored = batch_outcome.stored;
            outcome
                .stored_addresses
                .extend(batch_outcome.stored_addresses);

            outcome.stats.chunk_attempts_total = outcome
                .stats
                .chunk_attempts_total
                .saturating_add(batch_outcome.stats.chunk_attempts_total);
            outcome
                .stats
                .store_durations_ms
                .extend(batch_outcome.stats.store_durations_ms);
            // Every success of the batch is attributed to this round's slot.
            let landed: usize = batch_outcome.stats.retries_histogram.iter().sum();
            outcome.stats.retries_histogram[slot] =
                outcome.stats.retries_histogram[slot].saturating_add(landed);

            if let Some(fatal) = batch_outcome.fatal {
                outcome.fatal = Some(fatal.to_string());
                // Failed = this batch's reported failures, the rest of this
                // round's input, and failures collected earlier in this round.
                let mut failed = batch_outcome.failed_addresses;
                failed.extend(input_iter);
                failed.extend(std::mem::take(&mut remaining));
                outcome.failed = failed.len();
                outcome.failed_addresses = failed;
                return Ok(outcome);
            }

            remaining.extend(batch_outcome.failed_addresses);
        }
    }

    outcome.failed = remaining.len();
    outcome.failed_addresses = remaining;
    Ok(outcome)
}
1269
1270pub fn finalize_merkle_batch(
1275 prepared: PreparedMerkleBatch,
1276 winner_pool_hash: [u8; 32],
1277) -> Result<MerkleBatchPaymentResult> {
1278 let chunk_count = prepared.addresses.len();
1279 let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
1280
1281 let winner_pool = prepared
1283 .candidate_pools
1284 .iter()
1285 .find(|pool| pool.hash() == winner_pool_hash)
1286 .ok_or_else(|| {
1287 Error::Payment(format!(
1288 "Winner pool {} not found in candidate pools",
1289 hex::encode(winner_pool_hash)
1290 ))
1291 })?;
1292
1293 info!("Generating merkle proofs for {chunk_count} chunks");
1295 let mut proofs = HashMap::with_capacity(chunk_count);
1296
1297 for (i, xorname) in xornames.iter().enumerate() {
1298 let address_proof = prepared
1299 .tree
1300 .generate_address_proof(i, *xorname)
1301 .map_err(|e| {
1302 Error::Payment(format!(
1303 "Failed to generate address proof for chunk {i}: {e}"
1304 ))
1305 })?;
1306
1307 let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1308
1309 let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1310 .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1311
1312 proofs.insert(prepared.addresses[i], tagged_bytes);
1313 }
1314
1315 info!("Merkle batch payment complete: {chunk_count} proofs generated");
1316
1317 Ok(MerkleBatchPaymentResult {
1318 proofs,
1319 chunk_count,
1320 storage_cost_atto: "0".to_string(),
1321 gas_cost_wei: 0,
1322 merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1323 })
1324}
1325
/// Compile-time check that the future returned by
/// `Client::merkle_upload_chunks` is `Send`.
///
/// Nothing in this module is meant to run: the check is that it type-checks.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    /// Accepts only references to `Send` values.
    fn _assert_send<T: Send>(_: &T) {}

    // The lints below are silenced because the function is never called and
    // `todo!()` makes everything after it unreachable.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        // `todo!()` stands in for a value that is never needed at runtime.
        let batch_result: MerkleBatchPaymentResult = todo!();
        let fut = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _assert_send(&fut);
    }
}
1346
1347#[cfg(test)]
1348#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1349mod tests {
1350 use super::*;
1351 use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1352
    /// `Auto` does not pick merkle for chunk counts below the threshold of 64.
    #[test]
    fn test_auto_below_threshold() {
        assert!(!should_use_merkle(1, PaymentMode::Auto));
        assert!(!should_use_merkle(10, PaymentMode::Auto));
        assert!(!should_use_merkle(63, PaymentMode::Auto));
    }
1363
    /// `Auto` picks merkle at exactly 64 chunks and for anything larger.
    #[test]
    fn test_auto_at_and_above_threshold() {
        assert!(should_use_merkle(64, PaymentMode::Auto));
        assert!(should_use_merkle(65, PaymentMode::Auto));
        assert!(should_use_merkle(1000, PaymentMode::Auto));
    }
1370
    /// `Merkle` mode needs at least 2 chunks; a single chunk is not enough.
    #[test]
    fn test_merkle_mode_forces_at_2() {
        assert!(!should_use_merkle(1, PaymentMode::Merkle));
        assert!(should_use_merkle(2, PaymentMode::Merkle));
        assert!(should_use_merkle(3, PaymentMode::Merkle));
    }
1377
    /// `Single` mode never picks merkle, whatever the chunk count.
    #[test]
    fn test_single_mode_always_false() {
        assert!(!should_use_merkle(0, PaymentMode::Single));
        assert!(!should_use_merkle(64, PaymentMode::Single));
        assert!(!should_use_merkle(1000, PaymentMode::Single));
    }
1384
    /// The `Default` implementation of `PaymentMode` yields `Auto`.
    #[test]
    fn test_default_mode_is_auto() {
        assert_eq!(PaymentMode::default(), PaymentMode::Auto);
    }
1389
    /// Pins the threshold constant so that a change to it is a deliberate one.
    #[test]
    fn test_threshold_value() {
        assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
    }
1394
    /// A successful quote result means the chunk is not stored yet.
    #[test]
    fn test_preflight_quotes_gathered_means_not_stored() {
        assert!(matches!(preflight_stored_status(Ok(())), Ok(false)));
    }
1403
/// An `Error::AlreadyStored` input to `preflight_stored_status` maps to
/// `Ok(true)`.
#[test]
fn test_preflight_already_stored_is_stored() {
    let status = preflight_stored_status(Err::<(), _>(Error::AlreadyStored));
    assert!(matches!(status, Ok(true)));
}
1409
/// `InsufficientPeers`, `Timeout` and `Network` errors fed to
/// `preflight_stored_status` must each come back as `Ok(false)` rather than
/// as an error.
#[test]
fn test_preflight_transient_quote_failure_does_not_abort() {
    // Shared predicate: did the preflight result degrade to "not stored"?
    let degrades_to_not_stored =
        |quote_result: Result<()>| matches!(preflight_stored_status(quote_result), Ok(false));

    let insufficient = Err(Error::InsufficientPeers("Got 5 quotes, need 7".to_string()));
    assert!(
        degrades_to_not_stored(insufficient),
        "insufficient-peers during preflight must degrade to not-stored, not error"
    );

    let timeout = Err(Error::Timeout("Timeout waiting for quote".to_string()));
    assert!(degrades_to_not_stored(timeout));

    let network = Err(Error::Network("connection reset".to_string()));
    assert!(degrades_to_not_stored(network));
}
1429
/// An `Error::Payment` input to `preflight_stored_status` is returned as
/// `Err(Error::Payment(_))` instead of being mapped to a stored status.
#[test]
fn test_preflight_application_error_propagates() {
    let status =
        preflight_stored_status(Err::<(), _>(Error::Payment("bad payment".to_string())));
    assert!(matches!(status, Err(Error::Payment(_))));
}
1440
/// Requesting addresses in the reverse of the content order must return the
/// contents in the requested order.
#[test]
fn chunk_contents_for_upload_addresses_preserves_requested_order() {
    let first = Bytes::from_static(b"first");
    let second = Bytes::from_static(b"second");

    // Ask for `second` before `first`, the opposite of the supplied order.
    let requested = [compute_address(&second), compute_address(&first)];
    let contents = vec![first.clone(), second.clone()];

    let selected = chunk_contents_for_upload_addresses(contents, &requested).unwrap();

    assert_eq!(selected, vec![second, first]);
}
1456
/// Requesting the same address twice must yield that content twice.
#[test]
fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
    let repeated = Bytes::from_static(b"same-content");
    let other = Bytes::from_static(b"other-content");
    let repeated_addr = compute_address(&repeated);

    let contents = vec![repeated.clone(), other, repeated.clone()];
    let selected =
        chunk_contents_for_upload_addresses(contents, &[repeated_addr; 2]).unwrap();

    let expected = vec![repeated; 2];
    assert_eq!(selected, expected);
}
1471
/// Content that appears several times in the input but is never requested
/// must not show up in the selection.
#[test]
fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
    let requested = Bytes::from_static(b"requested-content");
    let unrequested = Bytes::from_static(b"unrequested-content");
    let wanted = [compute_address(&requested)];

    let contents = vec![
        unrequested.clone(),
        requested.clone(),
        unrequested.clone(),
        unrequested,
    ];
    let selected = chunk_contents_for_upload_addresses(contents, &wanted).unwrap();

    assert_eq!(selected, vec![requested]);
}
1491
/// Requesting an address whose content was not supplied must produce
/// `Error::InvalidData`.
#[test]
fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
    let present = Bytes::from_static(b"present-content");
    let missing_addr = compute_address(&Bytes::from_static(b"missing-content"));

    let outcome = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);

    assert!(matches!(outcome, Err(Error::InvalidData(_))));
}
1502
1503 fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1508 (0..count)
1509 .map(|i| {
1510 let xn = XorName::from_content(&i.to_le_bytes());
1511 xn.0
1512 })
1513 .collect()
1514 }
1515
/// Checks `MerkleTree::depth` against a table of (leaf count, depth) pairs.
#[test]
fn test_tree_depth_for_known_sizes() {
    for (count, expected_depth) in [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)] {
        let leaves: Vec<XorName> = make_test_addresses(count)
            .into_iter()
            .map(XorName)
            .collect();
        let tree = MerkleTree::from_xornames(leaves).unwrap();
        assert_eq!(
            tree.depth(),
            expected_depth,
            "depth mismatch for {count} leaves"
        );
    }
}
1530
/// For a 16-leaf tree, every leaf's address proof must verify and report a
/// depth equal to the tree's depth.
#[test]
fn test_proof_generation_and_verification_for_all_leaves() {
    let leaves: Vec<XorName> = make_test_addresses(16)
        .into_iter()
        .map(XorName)
        .collect();
    let tree = MerkleTree::from_xornames(leaves.clone()).unwrap();
    let tree_depth = tree.depth() as usize;

    for (i, leaf) in leaves.iter().enumerate() {
        let proof = tree.generate_address_proof(i, *leaf).unwrap();
        assert!(proof.verify(), "proof for leaf {i} should verify");
        assert_eq!(proof.depth(), tree_depth);
    }
}
1543
/// A proof generated for leaf index 0 with an address that is not in the tree
/// must fail verification.
#[test]
fn test_proof_fails_for_wrong_address() {
    let leaves: Vec<XorName> = make_test_addresses(8)
        .into_iter()
        .map(XorName)
        .collect();
    let tree = MerkleTree::from_xornames(leaves).unwrap();

    let foreign_address = XorName::from_content(b"wrong");
    let proof = tree.generate_address_proof(0, foreign_address).unwrap();

    assert!(!proof.verify(), "proof with wrong address should fail");
}
1554
/// Building a tree from a single leaf must be rejected.
#[test]
fn test_tree_too_few_leaves() {
    let single_leaf = vec![XorName::from_content(b"only_one")];
    assert!(MerkleTree::from_xornames(single_leaf).is_err());
}
1561
/// A tree built from exactly `MAX_LEAVES` leaves is accepted and reports that
/// leaf count.
#[test]
fn test_tree_at_max_leaves() {
    let leaves: Vec<XorName> = make_test_addresses(MAX_LEAVES)
        .into_iter()
        .map(XorName)
        .collect();
    let tree = MerkleTree::from_xornames(leaves).unwrap();
    assert_eq!(tree.leaf_count(), MAX_LEAVES);
}
1569
/// Serializes a `MerklePaymentProof` and deserializes it again, checking the
/// leading tag byte (0x02), the address, and the candidate-node count.
#[test]
fn test_merkle_proof_serialize_deserialize_roundtrip() {
    use ant_protocol::payment::deserialize_merkle_proof;

    let xornames: Vec<XorName> = make_test_addresses(4)
        .into_iter()
        .map(XorName)
        .collect();
    let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();

    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // Only the first midpoint is needed to assemble a single pool.
    let midpoint = tree
        .reward_candidates(timestamp)
        .unwrap()
        .into_iter()
        .next()
        .unwrap();

    // Same dummy candidates the other fixtures in this module use.
    let pool = MerklePaymentCandidatePool {
        midpoint_proof: midpoint,
        candidate_nodes: make_dummy_candidate_nodes(timestamp),
    };

    let first_leaf = xornames[0];
    let address_proof = tree.generate_address_proof(0, first_leaf).unwrap();
    let merkle_proof = MerklePaymentProof::new(first_leaf, address_proof, pool);

    let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
    assert_eq!(
        tagged.first().copied(),
        Some(0x02),
        "tag should be PROOF_TAG_MERKLE"
    );

    let deserialized = deserialize_merkle_proof(&tagged).unwrap();
    assert_eq!(deserialized.address, merkle_proof.address);
    assert_eq!(
        deserialized.winner_pool.candidate_nodes.len(),
        CANDIDATES_PER_POOL
    );
}
1624
/// Builds a candidate stamped with timestamp 1000 and checks that the stamp
/// differs from 2000.
///
/// NOTE(review): despite the name, no validation or rejection routine is
/// invoked here — the assertion only compares a field against a literal.
/// Consider driving the real candidate-validation path instead.
#[test]
fn test_candidate_wrong_timestamp_rejected() {
    const CANDIDATE_TIMESTAMP: u64 = 1000;
    const OTHER_TIMESTAMP: u64 = 2000;

    let candidate = MerklePaymentCandidateNode {
        pub_key: vec![0u8; 32],
        price: Amount::ZERO,
        reward_address: RewardsAddress::new([0u8; 20]),
        merkle_payment_timestamp: CANDIDATE_TIMESTAMP,
        signature: vec![0u8; 64],
    };

    assert_ne!(candidate.merkle_payment_timestamp, OTHER_TIMESTAMP);
}
1643
1644 fn make_dummy_candidate_nodes(
1649 timestamp: u64,
1650 ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1651 std::array::from_fn(|i| MerklePaymentCandidateNode {
1652 pub_key: vec![i as u8; 32],
1653 price: Amount::from(1024u64),
1654 reward_address: RewardsAddress::new([i as u8; 20]),
1655 merkle_payment_timestamp: timestamp,
1656 signature: vec![i as u8; 64],
1657 })
1658 }
1659
1660 fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1661 let addrs = make_test_addresses(count);
1662 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1663 let tree = MerkleTree::from_xornames(xornames).unwrap();
1664
1665 let timestamp = std::time::SystemTime::now()
1666 .duration_since(std::time::UNIX_EPOCH)
1667 .unwrap()
1668 .as_secs();
1669
1670 let midpoints = tree.reward_candidates(timestamp).unwrap();
1671
1672 let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1673 .into_iter()
1674 .map(|mp| MerklePaymentCandidatePool {
1675 midpoint_proof: mp,
1676 candidate_nodes: make_dummy_candidate_nodes(timestamp),
1677 })
1678 .collect();
1679
1680 let pool_commitments = candidate_pools
1681 .iter()
1682 .map(MerklePaymentCandidatePool::to_commitment)
1683 .collect();
1684
1685 PreparedMerkleBatch {
1686 depth: tree.depth(),
1687 pool_commitments,
1688 merkle_payment_timestamp: timestamp,
1689 candidate_pools,
1690 tree,
1691 addresses: addrs,
1692 }
1693 }
1694
/// Finalizing with the hash of the first candidate pool succeeds and yields
/// one non-empty proof per chunk.
#[test]
fn test_finalize_merkle_batch_with_valid_winner() {
    const LEAVES: usize = 4;

    let prepared = make_prepared_merkle_batch(LEAVES);
    let winner_hash = prepared.candidate_pools[0].hash();

    let result = finalize_merkle_batch(prepared, winner_hash);
    assert!(
        result.is_ok(),
        "should succeed with valid winner: {result:?}"
    );

    let batch = result.unwrap();
    assert_eq!(batch.chunk_count, LEAVES);
    assert_eq!(batch.proofs.len(), LEAVES);
    assert!(batch
        .proofs
        .values()
        .all(|proof_bytes| !proof_bytes.is_empty()));
}
1715
/// Finalizing with a winner hash that matches no pool must fail with a
/// message containing "not found in candidate pools".
#[test]
fn test_finalize_merkle_batch_with_invalid_winner() {
    let prepared = make_prepared_merkle_batch(4);
    let unknown_winner = [0xFF; 32];

    // `unwrap_err` panics if finalization unexpectedly succeeds.
    let err = finalize_merkle_batch(prepared, unknown_winner)
        .unwrap_err()
        .to_string();

    assert!(err.contains("not found in candidate pools"), "got: {err}");
}
1726
/// Every proof produced by finalizing an 8-leaf batch must deserialize with
/// `deserialize_merkle_proof`.
#[test]
fn test_finalize_merkle_batch_proofs_are_deserializable() {
    use ant_protocol::payment::deserialize_merkle_proof;

    let prepared = make_prepared_merkle_batch(8);
    let winner_hash = prepared.candidate_pools[0].hash();
    let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();

    for (addr, proof_bytes) in batch.proofs.iter() {
        let decoded = deserialize_merkle_proof(proof_bytes);
        assert!(
            decoded.is_ok(),
            "proof for {} should deserialize: {:?}",
            hex::encode(addr),
            decoded.err()
        );
    }
}
1746
/// Splitting address lists into `MAX_LEAVES`-sized chunks gives 1, 2 and 3
/// batches for `MAX_LEAVES`, `MAX_LEAVES + 1` and `3 * MAX_LEAVES` addresses.
#[test]
fn test_batch_split_calculation() {
    let cases = [(MAX_LEAVES, 1), (MAX_LEAVES + 1, 2), (3 * MAX_LEAVES, 3)];
    for (address_count, expected_batches) in cases {
        let addrs = make_test_addresses(address_count);
        assert_eq!(addrs.chunks(MAX_LEAVES).count(), expected_batches);
    }
}
1765
1766 use std::sync::{Arc, Mutex};
1771
1772 fn make_chunks(count: usize) -> Vec<([u8; 32], Bytes)> {
1774 make_test_addresses(count)
1775 .into_iter()
1776 .map(|addr| (addr, Bytes::from_static(b"chunk")))
1777 .collect()
1778 }
1779
1780 #[tokio::test]
1784 async fn store_with_retry_collects_failures_instead_of_aborting() {
1785 let chunks = make_chunks(6);
1786 let failing: std::collections::HashSet<[u8; 32]> =
1787 chunks.iter().take(2).map(|(a, _)| *a).collect();
1788 let failing_for_closure = failing.clone();
1789
1790 let store_one = move |addr: [u8; 32], _content: Bytes| {
1791 let fail = failing_for_closure.contains(&addr);
1792 async move {
1793 if fail {
1794 Err(Error::InsufficientPeers("test shortfall".into()))
1795 } else {
1796 Ok(std::time::Instant::now())
1797 }
1798 }
1799 };
1800
1801 let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1802 .await
1803 .expect("quorum shortfalls must not abort the batch");
1804
1805 assert_eq!(outcome.stored, 4);
1806 assert_eq!(outcome.failed, 2);
1807 assert_eq!(outcome.stats.retries_histogram[0], 4);
1809 assert_eq!(outcome.stats.chunk_attempts_total, 6);
1810 }
1811
1812 #[tokio::test]
1817 async fn store_with_retry_treats_remote_put_as_recoverable() {
1818 let chunks = make_chunks(6);
1819 let failing: std::collections::HashSet<[u8; 32]> =
1820 chunks.iter().take(2).map(|(a, _)| *a).collect();
1821 let failing_for_closure = failing.clone();
1822
1823 let store_one = move |addr: [u8; 32], _content: Bytes| {
1824 let fail = failing_for_closure.contains(&addr);
1825 async move {
1826 if fail {
1827 Err(Error::RemotePut {
1828 address: hex::encode(addr),
1829 source: ant_protocol::ProtocolError::StorageFailed(
1830 "insufficient disk space".into(),
1831 ),
1832 })
1833 } else {
1834 Ok(std::time::Instant::now())
1835 }
1836 }
1837 };
1838
1839 let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1840 .await
1841 .expect("remote app-rejections must not abort the batch");
1842
1843 assert_eq!(outcome.stored, 4);
1844 assert_eq!(outcome.failed, 2);
1845 }
1846
1847 #[tokio::test]
1851 async fn store_with_retry_reports_non_quorum_errors_as_fatal() {
1852 let chunks = make_chunks(3);
1853 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1854 Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1855 };
1856
1857 let outcome = merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, 3, store_one)
1858 .await
1859 .expect("fatal is carried in the outcome, not returned as Err");
1860 assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
1861 }
1862
1863 #[tokio::test]
1868 async fn store_with_retry_fatal_preserves_same_pass_successes() {
1869 let chunks = make_chunks(6);
1870 let bad = chunks[5].0;
1871 let store_one = move |addr: [u8; 32], _content: Bytes| async move {
1872 if addr == bad {
1873 Err(Error::Payment("fatal".into()))
1874 } else {
1875 Ok(std::time::Instant::now())
1876 }
1877 };
1878
1879 let outcome = merkle_store_with_retry(chunks, 1, 1, Duration::ZERO, None, 0, 6, store_one)
1880 .await
1881 .expect("fatal carried in outcome, not returned as Err");
1882 assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
1883 assert_eq!(outcome.stored, 5);
1885 assert_eq!(outcome.stored_addresses.len(), 5);
1886 assert!(!outcome.stored_addresses.contains(&bad));
1887 assert!(outcome.failed_addresses.iter().any(|(a, _)| *a == bad));
1889 }
1890
1891 #[tokio::test]
1893 async fn store_with_retry_retries_only_the_failed_set() {
1894 let chunks = make_chunks(5);
1895 let total = chunks.len();
1896 let failing: std::collections::HashSet<[u8; 32]> =
1897 chunks.iter().take(2).map(|(a, _)| *a).collect();
1898 let failing_for_closure = failing.clone();
1899
1900 let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
1902 let calls_for_closure = calls.clone();
1903
1904 let store_one = move |addr: [u8; 32], _content: Bytes| {
1905 let calls = calls_for_closure.clone();
1906 let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
1908 let fail = failing_for_closure.contains(&addr) && already_seen == 0;
1909 calls.lock().unwrap().push(addr);
1910 async move {
1911 if fail {
1912 Err(Error::InsufficientPeers("round-1 shortfall".into()))
1913 } else {
1914 Ok(std::time::Instant::now())
1915 }
1916 }
1917 };
1918
1919 let outcome =
1920 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1921 .await
1922 .expect("should converge after retry");
1923
1924 assert_eq!(outcome.stored, total);
1925 assert_eq!(outcome.failed, 0);
1926
1927 let calls = calls.lock().unwrap();
1931 assert_eq!(calls.len(), total + failing.len());
1932 let round_two: std::collections::HashSet<[u8; 32]> =
1933 calls[total..].iter().copied().collect();
1934 assert_eq!(round_two, failing);
1935 }
1936
1937 #[tokio::test]
1940 async fn store_with_retry_counts_retry_success_once_in_histogram() {
1941 let chunks = make_chunks(4);
1942 let total = chunks.len();
1943 let flaky_addr = chunks[0].0;
1944
1945 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
1946 let attempts_for_closure = attempts.clone();
1947
1948 let store_one = move |addr: [u8; 32], _content: Bytes| {
1949 let attempts = attempts_for_closure.clone();
1950 let n = {
1951 let mut m = attempts.lock().unwrap();
1952 let entry = m.entry(addr).or_insert(0);
1953 *entry += 1;
1954 *entry
1955 };
1956 let fail = addr == flaky_addr && n == 1;
1957 async move {
1958 if fail {
1959 Err(Error::InsufficientPeers("transient".into()))
1960 } else {
1961 Ok(std::time::Instant::now())
1962 }
1963 }
1964 };
1965
1966 let outcome =
1967 merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1968 .await
1969 .expect("flaky chunk should recover on retry");
1970
1971 assert_eq!(outcome.stored, total);
1972 assert_eq!(outcome.failed, 0);
1973 assert_eq!(outcome.stats.retries_histogram[0], total - 1);
1975 assert_eq!(outcome.stats.retries_histogram[1], 1);
1976 assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
1978 }
1979
1980 #[tokio::test]
1985 async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
1986 let chunks = make_chunks(3);
1987 let total = chunks.len();
1988
1989 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1990 Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
1991 };
1992
1993 let outcome = merkle_store_with_retry(
1994 chunks,
1995 8,
1996 MERKLE_STORE_MAX_ATTEMPTS,
1997 Duration::ZERO,
1998 None,
1999 0,
2000 total,
2001 store_one,
2002 )
2003 .await
2004 .expect("an exhausted retry budget is reported, not propagated as Err");
2005
2006 assert_eq!(outcome.stored, 0);
2007 assert_eq!(outcome.failed, total);
2008 assert_eq!(
2010 outcome.stats.chunk_attempts_total,
2011 total * MERKLE_STORE_MAX_ATTEMPTS
2012 );
2013 assert_eq!(outcome.stats.retries_histogram, [0; 4]);
2015 }
2016
2017 #[tokio::test]
2022 async fn store_with_retry_records_failed_addresses_when_exhausted() {
2023 let chunks = make_chunks(6);
2024 let failing: std::collections::HashSet<[u8; 32]> =
2025 chunks.iter().take(2).map(|(a, _)| *a).collect();
2026 let failing_for_closure = failing.clone();
2027
2028 let store_one = move |addr: [u8; 32], _content: Bytes| {
2029 let fail = failing_for_closure.contains(&addr);
2030 async move {
2031 if fail {
2032 Err(Error::InsufficientPeers("permanent shortfall".into()))
2033 } else {
2034 Ok(std::time::Instant::now())
2035 }
2036 }
2037 };
2038
2039 let outcome = merkle_store_with_retry(
2040 chunks,
2041 8,
2042 MERKLE_STORE_MAX_ATTEMPTS,
2043 Duration::ZERO,
2044 None,
2045 0,
2046 6,
2047 store_one,
2048 )
2049 .await
2050 .expect("quorum shortfalls must not abort the batch");
2051
2052 assert_eq!(outcome.stored, 4);
2053 assert_eq!(outcome.failed, 2);
2054 assert_eq!(outcome.failed_addresses.len(), 2);
2056 let reported: std::collections::HashSet<[u8; 32]> =
2057 outcome.failed_addresses.iter().map(|(a, _)| *a).collect();
2058 assert_eq!(reported, failing);
2059 for (_, msg) in &outcome.failed_addresses {
2061 assert!(msg.contains("permanent shortfall"));
2062 }
2063 }
2064
2065 #[tokio::test]
2068 async fn store_with_retry_failed_addresses_empty_on_full_success() {
2069 let chunks = make_chunks(4);
2070 let total = chunks.len();
2071 let store_one =
2072 |_addr: [u8; 32], _content: Bytes| async move { Ok(std::time::Instant::now()) };
2073
2074 let outcome = merkle_store_with_retry(
2075 chunks,
2076 8,
2077 MERKLE_STORE_MAX_ATTEMPTS,
2078 Duration::ZERO,
2079 None,
2080 0,
2081 total,
2082 store_one,
2083 )
2084 .await
2085 .expect("all chunks store");
2086
2087 assert_eq!(outcome.stored, total);
2088 assert_eq!(outcome.failed, 0);
2089 assert!(outcome.failed_addresses.is_empty());
2090 }
2091
/// With a histogram length of 4, rounds 0, 1 and 2 map to slots 1, 2 and 3,
/// and later rounds (3, 9) stay at slot 3.
#[test]
fn deferred_round_histogram_slot_maps_and_clamps() {
    let cases = [(0, 1), (1, 2), (2, 3), (3, 3), (9, 3)];
    for (round, expected_slot) in cases {
        assert_eq!(deferred_round_histogram_slot(round, 4), expected_slot);
    }
}
2107
2108 fn fake_read_bodies(addrs: &[[u8; 32]]) -> Result<Vec<([u8; 32], Bytes)>> {
2112 Ok(addrs
2113 .iter()
2114 .map(|a| (*a, Bytes::from_static(b"deferred-body")))
2115 .collect())
2116 }
2117
2118 fn deferred_set(count: usize) -> Vec<([u8; 32], String)> {
2119 make_test_addresses(count)
2120 .into_iter()
2121 .map(|addr| (addr, "short of quorum".to_string()))
2122 .collect()
2123 }
2124
2125 #[tokio::test]
2129 async fn deferred_retry_succeeds_on_a_later_round() {
2130 let deferred = deferred_set(3);
2131 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2134 let attempts_for_closure = attempts.clone();
2135 let store_one = move |addr: [u8; 32], _content: Bytes| {
2136 let attempts = attempts_for_closure.clone();
2137 async move {
2138 let n = {
2139 let mut map = attempts.lock().unwrap();
2140 let e = map.entry(addr).or_insert(0);
2141 *e += 1;
2142 *e
2143 };
2144 if n < 2 {
2145 Err(Error::InsufficientPeers("still short".into()))
2146 } else {
2147 Ok(std::time::Instant::now())
2148 }
2149 }
2150 };
2151
2152 let outcome = merkle_deferred_retry(
2153 deferred,
2154 &[0, 0, 0],
2155 64,
2156 fake_read_bodies,
2157 |n: usize| n.max(1),
2158 None,
2159 0,
2160 3,
2161 store_one,
2162 )
2163 .await
2164 .expect("deferred retry must not abort on quorum shortfalls");
2165
2166 assert_eq!(outcome.stored, 3, "all three land by round 1");
2167 assert_eq!(outcome.stored_addresses.len(), 3);
2168 assert_eq!(outcome.failed, 0);
2169 assert!(outcome.failed_addresses.is_empty());
2170 assert!(outcome.fatal.is_none());
2171 assert_eq!(outcome.stats.retries_histogram[1], 0);
2173 assert_eq!(outcome.stats.retries_histogram[2], 3);
2174 assert_eq!(outcome.stats.chunk_attempts_total, 6);
2176 }
2177
2178 #[tokio::test]
2181 async fn deferred_retry_leftovers_become_failed() {
2182 let deferred = deferred_set(2);
2183 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
2184 Err::<std::time::Instant, _>(Error::InsufficientPeers("always short".into()))
2185 };
2186
2187 let outcome = merkle_deferred_retry(
2188 deferred,
2189 &[0, 0, 0],
2190 64,
2191 fake_read_bodies,
2192 |n: usize| n.max(1),
2193 None,
2194 0,
2195 2,
2196 store_one,
2197 )
2198 .await
2199 .expect("exhausted retries report failures, not an error");
2200
2201 assert_eq!(outcome.stored, 0);
2202 assert!(outcome.stored_addresses.is_empty());
2203 assert_eq!(outcome.failed, 2);
2204 assert_eq!(outcome.failed_addresses.len(), 2);
2205 assert!(outcome.fatal.is_none());
2206 assert_eq!(outcome.stats.chunk_attempts_total, 6);
2208 }
2209
2210 #[tokio::test]
2215 async fn deferred_retry_fatal_error_preserves_prior_progress() {
2216 let addrs = make_test_addresses(2);
2217 let good = addrs[0];
2218 let bad = addrs[1];
2219 let deferred = vec![(good, "short".to_string()), (bad, "short".to_string())];
2220
2221 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2224 let attempts_for_closure = attempts.clone();
2225 let store_one = move |addr: [u8; 32], _content: Bytes| {
2226 let attempts = attempts_for_closure.clone();
2227 async move {
2228 let n = {
2229 let mut map = attempts.lock().unwrap();
2230 let e = map.entry(addr).or_insert(0);
2231 *e += 1;
2232 *e
2233 };
2234 if addr == good {
2235 Ok(std::time::Instant::now())
2236 } else if n == 1 {
2237 Err(Error::InsufficientPeers("short".into()))
2238 } else {
2239 Err(Error::Payment("fatal on retry".into()))
2240 }
2241 }
2242 };
2243
2244 let outcome = merkle_deferred_retry(
2245 deferred,
2246 &[0, 0, 0],
2247 64,
2248 fake_read_bodies,
2249 |n: usize| n.max(1),
2250 None,
2251 0,
2252 2,
2253 store_one,
2254 )
2255 .await
2256 .expect("a fatal round error is reported via `fatal`, not as Err");
2257
2258 assert!(outcome.fatal.is_some(), "fatal error must be captured");
2259 assert_eq!(outcome.stored, 1, "round-0 success preserved");
2260 assert_eq!(outcome.stored_addresses, vec![good]);
2261 assert_eq!(outcome.failed, 1);
2262 assert_eq!(outcome.failed_addresses.len(), 1);
2263 assert_eq!(outcome.failed_addresses[0].0, bad);
2264 }
2265
2266 #[tokio::test]
2268 async fn deferred_retry_empty_set_is_a_noop() {
2269 let store_one = |_addr: [u8; 32], _content: Bytes| async move {
2270 Err::<std::time::Instant, _>(Error::InsufficientPeers("unused".into()))
2271 };
2272
2273 let outcome = merkle_deferred_retry(
2274 Vec::new(),
2275 &DEFERRED_ROUND_DELAYS_SECS,
2276 64,
2277 fake_read_bodies,
2278 |n: usize| n.max(1),
2279 None,
2280 7,
2281 7,
2282 store_one,
2283 )
2284 .await
2285 .expect("empty deferred set is a no-op");
2286
2287 assert_eq!(outcome.stored, 7, "stored_offset carried through unchanged");
2288 assert_eq!(outcome.failed, 0);
2289 assert!(outcome.stored_addresses.is_empty());
2290 assert!(outcome.failed_addresses.is_empty());
2291 assert!(outcome.fatal.is_none());
2292 }
2293
2294 #[tokio::test]
2299 async fn deferred_retry_reads_bodies_in_bounded_batches() {
2300 let deferred = deferred_set(10);
2301 let batch_size = 4;
2302 let max_batch = Arc::new(Mutex::new(0usize));
2304 let max_batch_for_closure = max_batch.clone();
2305 let read_bodies = move |addrs: &[[u8; 32]]| {
2306 let mut m = max_batch_for_closure.lock().unwrap();
2307 *m = (*m).max(addrs.len());
2308 Ok(addrs
2309 .iter()
2310 .map(|a| (*a, Bytes::from_static(b"body")))
2311 .collect())
2312 };
2313 let store_one =
2314 |_addr: [u8; 32], _content: Bytes| async move { Ok(std::time::Instant::now()) };
2315
2316 let outcome = merkle_deferred_retry(
2317 deferred,
2318 &[0, 0, 0],
2319 batch_size,
2320 read_bodies,
2321 |n: usize| n.max(1),
2322 None,
2323 0,
2324 10,
2325 store_one,
2326 )
2327 .await
2328 .expect("bounded-batch deferred retry stores everything");
2329
2330 assert_eq!(outcome.stored, 10);
2331 assert_eq!(outcome.stored_addresses.len(), 10);
2332 assert_eq!(outcome.failed, 0);
2333 assert!(
2334 *max_batch.lock().unwrap() <= batch_size,
2335 "read_bodies must never be handed more than batch_size addresses at once"
2336 );
2337 }
2338}