1use crate::data::client::adaptive::{observe_op, Outcome};
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13 Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14 MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19 compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20 MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Chunk count at which [`PaymentMode::Auto`] starts selecting merkle batch payments.
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
/// Strategy for choosing between merkle batch payments and the non-merkle path.
///
/// Serialized in `snake_case` (`auto`, `merkle`, `single`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PaymentMode {
    /// Use merkle payments once the chunk count reaches [`DEFAULT_MERKLE_THRESHOLD`].
    #[default]
    Auto,
    /// Use merkle payments whenever there are at least 2 chunks.
    Merkle,
    /// Never use merkle payments.
    Single,
}
46
/// Outcome of paying for a batch of chunks with a merkle payment.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MerkleBatchPaymentResult {
    /// Serialized merkle payment proof for each paid chunk, keyed by chunk address.
    pub proofs: HashMap<[u8; 32], Vec<u8>>,
    /// Number of chunks paid for in this batch.
    pub chunk_count: usize,
    /// Storage cost rendered as a string (atto units, going by the field name).
    pub storage_cost_atto: String,
    /// Gas cost of the payment (wei, going by the field name).
    pub gas_cost_wei: u128,
    /// Timestamp the payment was prepared with, in seconds since the Unix epoch.
    /// Falls back to 0 when the field is absent from deserialized data.
    #[serde(default)]
    pub merkle_payment_timestamp: u64,
}
68
/// A merkle batch that has been built and quoted but not yet paid for.
///
/// Produced by `Client::prepare_merkle_batch_external` and consumed by
/// `finalize_merkle_batch` once the winning pool hash is known.
pub struct PreparedMerkleBatch {
    /// Depth of the merkle tree built over the batch addresses.
    pub depth: u8,
    /// One commitment per candidate pool, derived from `candidate_pools`.
    pub pool_commitments: Vec<PoolCommitment>,
    /// Timestamp the batch was prepared with, in seconds since the Unix epoch.
    pub merkle_payment_timestamp: u64,
    /// Full candidate pools; the winning one is embedded in every chunk proof.
    candidate_pools: Vec<MerklePaymentCandidatePool>,
    /// Merkle tree over `addresses`, used to generate per-chunk address proofs.
    tree: MerkleTree,
    /// Chunk addresses in the order they were supplied as tree leaves.
    addresses: Vec<[u8; 32]>,
}
87
/// Result of the merkle preflight: which chunks are already on the network and
/// which still have to be paid for and uploaded.
#[derive(Debug, Clone, Default)]
pub(crate) struct MerkleUploadPlan {
    /// Addresses found to be stored already.
    pub already_stored: Vec<[u8; 32]>,
    /// Addresses that still need to be uploaded.
    pub to_upload: Vec<[u8; 32]>,
    /// Combined size in bytes of every chunk listed in `to_upload`.
    to_upload_total_bytes: u64,
}

impl MerkleUploadPlan {
    /// Mean size in bytes of the chunks awaiting upload, rounded down.
    ///
    /// Returns 0 when nothing needs to be uploaded.
    #[must_use]
    pub fn to_upload_avg_size(&self) -> u64 {
        let pending_chunks = self.to_upload.len() as u64;
        // `checked_div` yields `None` only for a zero divisor, i.e. an empty plan.
        self.to_upload_total_bytes
            .checked_div(pending_chunks)
            .unwrap_or(0)
    }
}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("PreparedMerkleBatch")
114 .field("depth", &self.depth)
115 .field("pool_commitments", &self.pool_commitments.len())
116 .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117 .field("candidate_pools", &self.candidate_pools.len())
118 .field("addresses", &self.addresses.len())
119 .finish()
120 }
121}
122
123pub(crate) fn chunk_contents_for_upload_addresses(
128 chunk_contents: Vec<Bytes>,
129 addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131 if addresses.is_empty() {
132 return Ok(Vec::new());
133 }
134
135 let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136 for address in addresses {
137 *needed_by_address.entry(*address).or_default() += 1;
138 }
139
140 let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141 HashMap::with_capacity(needed_by_address.len());
142 let mut remaining = addresses.len();
143 for chunk in chunk_contents {
144 let address = compute_address(&chunk);
145 if let Some(needed) = needed_by_address.get_mut(&address) {
146 if *needed > 0 {
147 chunks_by_address
148 .entry(address)
149 .or_default()
150 .push_back(chunk);
151 *needed -= 1;
152 remaining -= 1;
153 if remaining == 0 {
154 break;
155 }
156 }
157 }
158 }
159
160 for (address, needed) in &needed_by_address {
161 if *needed == 0 {
162 continue;
163 }
164
165 if chunks_by_address.contains_key(address) {
166 return Err(Error::InvalidData(format!(
167 "missing duplicate chunk content for merkle address {}",
168 hex::encode(address)
169 )));
170 }
171
172 return Err(Error::InvalidData(format!(
173 "missing chunk content for merkle address {}",
174 hex::encode(address)
175 )));
176 }
177
178 let mut selected = Vec::with_capacity(addresses.len());
179 for address in addresses {
180 let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181 Error::InvalidData(format!(
182 "missing chunk content for merkle address {}",
183 hex::encode(address)
184 ))
185 })?;
186 let chunk = chunks.pop_front().ok_or_else(|| {
187 Error::InvalidData(format!(
188 "missing duplicate chunk content for merkle address {}",
189 hex::encode(address)
190 ))
191 })?;
192 selected.push(chunk);
193 }
194
195 Ok(selected)
196}
197
198fn preflight_stored_status<T>(result: Result<T>) -> Result<bool> {
217 match result {
218 Ok(_) => Ok(false),
219 Err(Error::AlreadyStored) => Ok(true),
220 Err(e) if matches!(classify_error(&e), Outcome::Timeout | Outcome::NetworkError) => {
221 Ok(false)
222 }
223 Err(e) => Err(e),
224 }
225}
226
227#[must_use]
230pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
231 match mode {
232 PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
233 PaymentMode::Merkle => chunk_count >= 2,
234 PaymentMode::Single => false,
235 }
236}
237
238impl Client {
/// Method form of the module-level [`should_use_merkle`]; does not read any
/// client state.
#[must_use]
pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
    should_use_merkle(chunk_count, mode)
}
244
245 pub async fn pay_for_merkle_batch(
259 &self,
260 addresses: &[[u8; 32]],
261 data_type: u32,
262 data_size: u64,
263 ) -> Result<MerkleBatchPaymentResult> {
264 let chunk_count = addresses.len();
265 if chunk_count < 2 {
266 return Err(Error::Payment(
267 "Merkle batch payment requires at least 2 chunks".to_string(),
268 ));
269 }
270
271 if chunk_count > MAX_LEAVES {
272 return self
273 .pay_for_merkle_multi_batch(addresses, data_type, data_size)
274 .await;
275 }
276
277 self.pay_for_merkle_single_batch(addresses, data_type, data_size)
278 .await
279 }
280
281 pub(crate) async fn plan_merkle_upload(
290 &self,
291 chunks: Vec<([u8; 32], u64)>,
292 data_type: u32,
293 progress: Option<&mpsc::Sender<UploadEvent>>,
294 ) -> Result<MerkleUploadPlan> {
295 let total_chunks = chunks.len();
296 if total_chunks == 0 {
297 return Ok(MerkleUploadPlan::default());
298 }
299
300 info!("Checking {total_chunks} merkle chunks for existing storage before payment");
301
302 let quote_limiter = self.controller().quote.clone();
303 let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
304 let mut check_stream = stream::iter(chunks.into_iter().enumerate())
305 .map(|(index, (address, data_size))| {
306 let limiter = quote_limiter.clone();
307 async move {
308 let result = observe_op(
309 &limiter,
310 || async move {
311 self.chunk_already_stored_for_merkle(&address, data_type, data_size)
312 .await
313 },
314 classify_error,
315 )
316 .await;
317 (index, address, data_size, result)
318 }
319 })
320 .buffer_unordered(quote_concurrency);
321
322 let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
323 let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
324 let mut checked = 0usize;
325
326 while let Some((index, address, data_size, result)) = check_stream.next().await {
327 let is_already_stored = result?;
328 checked += 1;
329
330 if let Some(tx) = progress {
331 let _ = tx.try_send(UploadEvent::ChunkQuoted {
332 quoted: checked,
333 total: total_chunks,
334 });
335 }
336
337 if is_already_stored {
338 debug!(
339 "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
340 hex::encode(address)
341 );
342 already_stored.push((index, address));
343 if let Some(tx) = progress {
344 let _ = tx.try_send(UploadEvent::ChunkStored {
345 stored: already_stored.len(),
346 total: total_chunks,
347 });
348 }
349 } else {
350 debug!(
351 "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
352 hex::encode(address)
353 );
354 to_upload.push((index, address, data_size));
355 }
356 }
357
358 already_stored.sort_by_key(|(index, _)| *index);
359 to_upload.sort_by_key(|(index, _, _)| *index);
360
361 let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
362 acc.saturating_add(*data_size)
363 });
364
365 let already_stored = already_stored
366 .into_iter()
367 .map(|(_, address)| address)
368 .collect::<Vec<_>>();
369 let to_upload = to_upload
370 .into_iter()
371 .map(|(_, address, _)| address)
372 .collect::<Vec<_>>();
373
374 info!(
375 "Merkle preflight complete: {} already stored, {} need upload",
376 already_stored.len(),
377 to_upload.len()
378 );
379
380 Ok(MerkleUploadPlan {
381 already_stored,
382 to_upload,
383 to_upload_total_bytes,
384 })
385 }
386
387 async fn chunk_already_stored_for_merkle(
388 &self,
389 address: &[u8; 32],
390 data_type: u32,
391 data_size: u64,
392 ) -> Result<bool> {
393 let result = self
394 .get_store_quotes_with_fault_tolerance(address, data_size, data_type)
395 .await;
396 if let Err(e) = &result {
397 if matches!(classify_error(e), Outcome::Timeout | Outcome::NetworkError) {
398 debug!(
399 "Merkle preflight: could not determine stored status for {} ({e}); \
400 treating as not stored and queuing for upload",
401 hex::encode(address)
402 );
403 }
404 }
405 preflight_stored_status(result)
406 }
407
408 pub async fn prepare_merkle_batch_external(
414 &self,
415 addresses: &[[u8; 32]],
416 data_type: u32,
417 data_size: u64,
418 ) -> Result<PreparedMerkleBatch> {
419 let chunk_count = addresses.len();
420 let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
421
422 debug!("Building merkle tree for {chunk_count} chunks");
423
424 let tree = MerkleTree::from_xornames(xornames)
426 .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
427
428 let depth = tree.depth();
429 let merkle_payment_timestamp = std::time::SystemTime::now()
430 .duration_since(std::time::UNIX_EPOCH)
431 .map_err(|e| Error::Payment(format!("System time error: {e}")))?
432 .as_secs();
433
434 debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
435
436 let midpoint_proofs = tree
438 .reward_candidates(merkle_payment_timestamp)
439 .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
440
441 debug!(
442 "Collecting candidate pools from {} midpoints (concurrent)",
443 midpoint_proofs.len()
444 );
445
446 let candidate_pools = self
448 .build_candidate_pools(
449 &midpoint_proofs,
450 data_type,
451 data_size,
452 merkle_payment_timestamp,
453 )
454 .await?;
455
456 let pool_commitments: Vec<PoolCommitment> = candidate_pools
458 .iter()
459 .map(MerklePaymentCandidatePool::to_commitment)
460 .collect();
461
462 Ok(PreparedMerkleBatch {
463 depth,
464 pool_commitments,
465 merkle_payment_timestamp,
466 candidate_pools,
467 tree,
468 addresses: addresses.to_vec(),
469 })
470 }
471
472 async fn pay_for_merkle_single_batch(
474 &self,
475 addresses: &[[u8; 32]],
476 data_type: u32,
477 data_size: u64,
478 ) -> Result<MerkleBatchPaymentResult> {
479 let wallet = self.require_wallet()?;
480 let prepared = self
481 .prepare_merkle_batch_external(addresses, data_type, data_size)
482 .await?;
483
484 info!(
485 "Submitting merkle batch payment on-chain (depth={})",
486 prepared.depth
487 );
488 let (winner_pool_hash, amount, gas_info) = wallet
489 .pay_for_merkle_tree(
490 prepared.depth,
491 prepared.pool_commitments.clone(),
492 prepared.merkle_payment_timestamp,
493 )
494 .await
495 .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
496
497 info!(
498 "Merkle payment succeeded: winner pool {}",
499 hex::encode(winner_pool_hash)
500 );
501
502 let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
503 result.storage_cost_atto = amount.to_string();
504 result.gas_cost_wei = gas_info.gas_cost_wei;
505 Ok(result)
506 }
507
508 async fn pay_for_merkle_multi_batch(
510 &self,
511 addresses: &[[u8; 32]],
512 data_type: u32,
513 data_size: u64,
514 ) -> Result<MerkleBatchPaymentResult> {
515 let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
516 let total_sub_batches = sub_batches.len();
517 let mut all_proofs = HashMap::with_capacity(addresses.len());
518 let mut total_storage = Amount::ZERO;
519 let mut total_gas: u128 = 0;
520 let mut oldest_ts: u64 = 0;
524
525 for (i, chunk) in sub_batches.into_iter().enumerate() {
526 match self
527 .pay_for_merkle_single_batch(chunk, data_type, data_size)
528 .await
529 {
530 Ok(sub_result) => {
531 if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
532 total_storage += cost;
533 }
534 total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
535 if oldest_ts == 0
536 || (sub_result.merkle_payment_timestamp > 0
537 && sub_result.merkle_payment_timestamp < oldest_ts)
538 {
539 oldest_ts = sub_result.merkle_payment_timestamp;
540 }
541 all_proofs.extend(sub_result.proofs);
542 }
543 Err(e) => {
544 if all_proofs.is_empty() {
545 return Err(e);
547 }
548 warn!(
550 "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
551 Returning {} proofs from prior sub-batches",
552 i + 1,
553 all_proofs.len()
554 );
555 return Ok(MerkleBatchPaymentResult {
556 chunk_count: all_proofs.len(),
557 proofs: all_proofs,
558 storage_cost_atto: total_storage.to_string(),
559 gas_cost_wei: total_gas,
560 merkle_payment_timestamp: oldest_ts,
561 });
562 }
563 }
564 }
565
566 Ok(MerkleBatchPaymentResult {
567 chunk_count: addresses.len(),
568 proofs: all_proofs,
569 storage_cost_atto: total_storage.to_string(),
570 gas_cost_wei: total_gas,
571 merkle_payment_timestamp: oldest_ts,
572 })
573 }
574
575 async fn build_candidate_pools(
577 &self,
578 midpoint_proofs: &[MidpointProof],
579 data_type: u32,
580 data_size: u64,
581 merkle_payment_timestamp: u64,
582 ) -> Result<Vec<MerklePaymentCandidatePool>> {
583 let mut pool_futures = FuturesUnordered::new();
584
585 for midpoint_proof in midpoint_proofs {
586 let pool_address = midpoint_proof.address();
587 let mp = midpoint_proof.clone();
588 pool_futures.push(async move {
589 let candidate_nodes = self
590 .get_merkle_candidate_pool(
591 &pool_address.0,
592 data_type,
593 data_size,
594 merkle_payment_timestamp,
595 )
596 .await?;
597 Ok::<_, Error>(MerklePaymentCandidatePool {
598 midpoint_proof: mp,
599 candidate_nodes,
600 })
601 });
602 }
603
604 let mut pools = Vec::with_capacity(midpoint_proofs.len());
605 while let Some(result) = pool_futures.next().await {
606 pools.push(result?);
607 }
608
609 Ok(pools)
610 }
611
/// Requests merkle candidate quotes from peers near `address` and returns the
/// validated pool of `CANDIDATES_PER_POOL` candidates.
///
/// Every selected peer is sent a `MerkleCandidateQuoteRequest` concurrently;
/// validation and final selection happen in `collect_validated_candidates`.
///
/// # Errors
///
/// Returns [`Error::InsufficientPeers`] when fewer than `CANDIDATES_PER_POOL`
/// peers are known, and propagates errors from the peer lookup and from
/// candidate collection.
#[allow(clippy::too_many_lines)]
async fn get_merkle_candidate_pool(
    &self,
    address: &[u8; 32],
    data_type: u32,
    data_size: u64,
    merkle_payment_timestamp: u64,
) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
    let node = self.network().node();
    let timeout = Duration::from_secs(self.config().quote_timeout_secs);

    // Ask for twice the pool size — presumably headroom for peers that fail
    // to answer or fail validation.
    let query_count = CANDIDATES_PER_POOL * 2;
    let mut remote_peers = self
        .network()
        .find_closest_peers(address, query_count)
        .await?;

    // Too few close peers: top up with any connected peer not already listed.
    // These are added without known addresses.
    if remote_peers.len() < CANDIDATES_PER_POOL {
        let connected = self.network().connected_peers().await;
        for peer in connected {
            if !remote_peers.iter().any(|(id, _)| *id == peer) {
                remote_peers.push((peer, vec![]));
            }
        }
    }

    if remote_peers.len() < CANDIDATES_PER_POOL {
        return Err(Error::InsufficientPeers(format!(
            "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
             Use --no-merkle or a larger network.",
            remote_peers.len()
        )));
    }

    let mut candidate_futures = FuturesUnordered::new();

    for (peer_id, peer_addrs) in &remote_peers {
        let request_id = self.next_request_id();
        let request = MerkleCandidateQuoteRequest {
            address: *address,
            data_type,
            data_size,
            merkle_payment_timestamp,
        };
        let message = ChunkMessage {
            request_id,
            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
        };

        // A peer whose request cannot be encoded is skipped, not treated as fatal.
        let message_bytes = match message.encode() {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
                continue;
            }
        };

        // Owned copies so the future does not borrow from `remote_peers`.
        let peer_id_clone = *peer_id;
        let addrs_clone = peer_addrs.clone();
        let node_clone = node.clone();

        let fut = async move {
            let result = send_and_await_chunk_response(
                &node_clone,
                &peer_id_clone,
                message_bytes,
                request_id,
                timeout,
                &addrs_clone,
                // Response handler: `Some(..)` ends the wait with that outcome,
                // `None` ignores the message.
                |body| match body {
                    ChunkMessageBody::MerkleCandidateQuoteResponse(
                        MerkleCandidateQuoteResponse::Success { candidate_node },
                    ) => {
                        match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
                            &candidate_node,
                        ) {
                            Ok(node) => Some(Ok(node)),
                            Err(e) => Some(Err(Error::Serialization(format!(
                                "Failed to deserialize candidate node from {peer_id_clone}: {e}"
                            )))),
                        }
                    }
                    ChunkMessageBody::MerkleCandidateQuoteResponse(
                        MerkleCandidateQuoteResponse::Error(e),
                    ) => Some(Err(Error::Protocol(format!(
                        "Merkle quote error from {peer_id_clone}: {e}"
                    )))),
                    _ => None,
                },
                // Maps a send failure to an error.
                |e| {
                    Error::Network(format!(
                        "Failed to send merkle candidate request to {peer_id_clone}: {e}"
                    ))
                },
                // Builds the error used when `timeout` elapses.
                || {
                    Error::Timeout(format!(
                        "Timeout waiting for merkle candidate from {peer_id_clone}"
                    ))
                },
            )
            .await;

            (peer_id_clone, result)
        };

        candidate_futures.push(fut);
    }

    self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
        .await
}
728
729 async fn collect_validated_candidates(
740 &self,
741 futures: &mut FuturesUnordered<
742 impl std::future::Future<
743 Output = (
744 PeerId,
745 std::result::Result<MerklePaymentCandidateNode, Error>,
746 ),
747 >,
748 >,
749 target_address: &[u8; 32],
750 merkle_payment_timestamp: u64,
751 ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
752 let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
753 let mut failures: Vec<String> = Vec::new();
754
755 while let Some((peer_id, result)) = futures.next().await {
756 match result {
757 Ok(candidate) => {
758 if !verify_merkle_candidate_signature(&candidate) {
759 warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
760 failures.push(format!("{peer_id}: invalid signature"));
761 continue;
762 }
763 if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
764 warn!("Timestamp mismatch from merkle candidate {peer_id}");
765 failures.push(format!("{peer_id}: timestamp mismatch"));
766 continue;
767 }
768 valid.push((peer_id, candidate));
769 }
770 Err(e) => {
771 debug!("Failed to get merkle candidate from {peer_id}: {e}");
772 failures.push(format!("{peer_id}: {e}"));
773 }
774 }
775 }
776
777 if valid.len() < CANDIDATES_PER_POOL {
778 return Err(Error::InsufficientPeers(format!(
779 "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
780 valid.len(),
781 failures.join("; ")
782 )));
783 }
784
785 let target_peer = PeerId::from_bytes(*target_address);
786 valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
787
788 let candidates: Vec<MerklePaymentCandidateNode> = valid
789 .into_iter()
790 .take(CANDIDATES_PER_POOL)
791 .map(|(_, candidate)| candidate)
792 .collect();
793
794 candidates
795 .try_into()
796 .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
797 }
798
799 pub(crate) async fn merkle_upload_chunks(
820 &self,
821 chunk_contents: Vec<Bytes>,
822 addresses: Vec<[u8; 32]>,
823 batch_result: &MerkleBatchPaymentResult,
824 progress: Option<&mpsc::Sender<UploadEvent>>,
825 stored_offset: usize,
826 total_chunks: usize,
827 ) -> Result<MerkleStoreOutcome> {
828 let store_limiter = self.controller().store.clone();
829 let batch_size = chunk_contents.len();
832 if batch_size != addresses.len() {
833 return Err(Error::InvalidData(format!(
834 "merkle upload has {batch_size} chunk contents but {} addresses",
835 addresses.len()
836 )));
837 }
838 let cap = || store_limiter.current().min(batch_size.max(1));
842
843 let bodies: std::collections::HashMap<[u8; 32], Bytes> =
848 addresses.iter().copied().zip(chunk_contents).collect();
849 let addrs = addresses;
850
851 let store_one = |addr: [u8; 32]| {
856 let limiter = store_limiter.clone();
857 let content = bodies.get(&addr).cloned();
858 let proof_bytes = batch_result.proofs.get(&addr).cloned();
859 async move {
860 let started = std::time::Instant::now();
861 let content = content.ok_or_else(|| {
862 Error::InvalidData(format!("missing chunk body for {}", hex::encode(addr)))
863 })?;
864 let proof = proof_bytes.ok_or_else(|| {
865 Error::Payment(format!(
866 "Missing merkle proof for chunk {}",
867 hex::encode(addr)
868 ))
869 })?;
870 let peers = self.put_target_peers(&addr).await?;
871 observe_op(
872 &limiter,
873 || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
874 classify_error,
875 )
876 .await
877 .map(|_| started)
878 }
879 };
880
881 let outcome = merkle_store_with_retry(
882 addrs,
883 cap,
884 MERKLE_STORE_MAX_ATTEMPTS,
885 MERKLE_RETRY_BACKOFF,
886 progress,
887 stored_offset,
888 total_chunks,
889 store_one,
890 )
891 .await?;
892
893 if let Some(e) = outcome.fatal {
899 return Err(e);
900 }
901 Ok(outcome)
902 }
903}
904
/// Maximum number of store waves `merkle_upload_chunks` runs per batch.
pub(crate) const MERKLE_STORE_MAX_ATTEMPTS: usize = 4;

/// Base wait between store waves in `merkle_upload_chunks`, before jitter.
pub(crate) const MERKLE_RETRY_BACKOFF: Duration = Duration::from_secs(30);

/// Fraction by which the retry backoff is randomly lengthened or shortened
/// (0.1 means a factor between 0.9 and 1.1).
const MERKLE_RETRY_JITTER: f64 = 0.1;
928
/// Summary of a `merkle_store_with_retry` run.
#[derive(Debug, Default)]
pub(crate) struct MerkleStoreOutcome {
    /// Running stored count: starts at the caller's `stored_offset` and grows by
    /// one per chunk stored during the run.
    pub stored: usize,
    /// Addresses stored during this run, in completion order.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Number of entries in `failed_addresses`.
    pub failed: usize,
    /// Addresses that were not stored, each with the text of its last error.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// The non-retryable error that stopped the run early, if any.
    pub fatal: Option<Error>,
    /// Attempt counts, store durations and retry histogram for the run.
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
960
/// Stores every address in `addrs` via `store_one`, in waves, retrying addresses
/// that failed with a retryable error.
///
/// * `cap` — current concurrency limit; re-read whenever the in-flight set is
///   refilled and floored at 1.
/// * `max_attempts` — number of waves to run at most (floored at 1).
/// * `backoff` — base wait between waves, randomised by `MERKLE_RETRY_JITTER`;
///   `Duration::ZERO` disables the wait.
/// * `progress` — optional sink for `ChunkStored` events; events are dropped if
///   the channel is full or closed.
/// * `stored_offset` / `total` — starting value and denominator for the stored
///   counter reported in events and in the outcome.
/// * `store_one` — stores one chunk and returns the instant its attempt started.
///
/// `InsufficientPeers`, `CloseGroupShortfall` and `RemotePut` errors are
/// retryable. Any other error is fatal: it is recorded in `outcome.fatal`, the
/// current wave is abandoned and the outcome is returned immediately.
///
/// The function itself always returns `Ok`; failures are reported through the
/// outcome's `failed_addresses` and `fatal` fields.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn merkle_store_with_retry<F, Fut, C>(
    addrs: Vec<[u8; 32]>,
    cap: C,
    max_attempts: usize,
    backoff: Duration,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    stored_offset: usize,
    total: usize,
    store_one: F,
) -> Result<MerkleStoreOutcome>
where
    F: Fn([u8; 32]) -> Fut,
    Fut: std::future::Future<Output = Result<std::time::Instant>>,
    C: Fn() -> usize,
{
    let attempts = max_attempts.max(1);
    let mut outcome = MerkleStoreOutcome {
        stored: stored_offset,
        ..MerkleStoreOutcome::default()
    };
    let mut pending = addrs;

    for attempt in 0..attempts {
        // Addresses that failed in this wave, with their error text.
        let mut next_failed: Vec<([u8; 32], String)> = Vec::new();

        let mut pending_iter = pending.into_iter();
        let mut in_flight = FuturesUnordered::new();
        loop {
            // Top the in-flight set up to the current limit before waiting.
            let slots = cap().max(1);
            while in_flight.len() < slots {
                match pending_iter.next() {
                    Some(addr) => {
                        let fut = store_one(addr);
                        in_flight.push(async move { (addr, fut.await) });
                    }
                    None => break,
                }
            }
            // `None` means nothing is in flight and nothing is left to start.
            let Some((addr, result)) = in_flight.next().await else {
                break;
            };
            outcome.stats.chunk_attempts_total =
                outcome.stats.chunk_attempts_total.saturating_add(1);
            match result {
                Ok(started) => {
                    let duration_ms =
                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
                    outcome.stats.store_durations_ms.push(duration_ms);
                    // Bucket by wave number, clamped to the last histogram slot.
                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
                    outcome.stats.retries_histogram[idx] =
                        outcome.stats.retries_histogram[idx].saturating_add(1);
                    outcome.stored += 1;
                    outcome.stored_addresses.push(addr);
                    if let Some(tx) = progress {
                        let _ = tx.try_send(UploadEvent::ChunkStored {
                            stored: outcome.stored,
                            total,
                        });
                    }
                }
                // Retryable: queue the address for the next wave.
                Err(
                    e @ (Error::InsufficientPeers(_)
                    | Error::CloseGroupShortfall(_)
                    | Error::RemotePut { .. }),
                ) => {
                    next_failed.push((addr, e.to_string()));
                }
                // Fatal: record it and stop the wave. Addresses still in flight
                // or not yet started are not added to the failed list.
                Err(e) => {
                    next_failed.push((addr, e.to_string()));
                    outcome.fatal = Some(e);
                    break;
                }
            }
        }

        if outcome.fatal.is_some() {
            outcome.failed = next_failed.len();
            outcome.failed_addresses = next_failed;
            return Ok(outcome);
        }

        if next_failed.is_empty() {
            break;
        }

        if attempt + 1 < attempts {
            warn!(
                failed = next_failed.len(),
                attempt = attempt + 1,
                "merkle chunks short of quorum, retrying after backoff"
            );
            pending = next_failed.into_iter().map(|(addr, _msg)| addr).collect();
            if backoff > Duration::ZERO {
                // Scope the RNG so it is dropped before the `.await` below.
                let wait = {
                    let mut rng = rand::thread_rng();
                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
                    backoff.mul_f64(factor)
                };
                tokio::time::sleep(wait).await;
            }
        } else {
            // Out of attempts: whatever failed in the last wave is final.
            outcome.failed = next_failed.len();
            outcome.failed_addresses = next_failed;
            break;
        }
    }

    Ok(outcome)
}
1122
/// Per-round delays in seconds for deferred merkle retries — presumably the
/// `round_delays_secs` argument of `merkle_deferred_retry`; confirm at the call site.
pub(crate) const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
1132
/// Maps a deferred retry round (0-based) to its retry-histogram slot.
///
/// Round `r` maps to slot `r + 1`, clamped to the last valid index of a
/// histogram with `hist_len` slots. Returns 0 when `hist_len` is 0 or 1.
pub(crate) fn deferred_round_histogram_slot(round: usize, hist_len: usize) -> usize {
    // Saturating so an extreme `round` clamps instead of overflowing.
    round.saturating_add(1).min(hist_len.saturating_sub(1))
}
1142
/// Summary of a `merkle_deferred_retry` run.
#[derive(Debug, Default)]
pub(crate) struct DeferredRetryOutcome {
    /// Running stored count: starts at the caller's `stored_offset` and includes
    /// every chunk stored across all rounds.
    pub stored: usize,
    /// Addresses stored during the deferred rounds.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Number of entries in `failed_addresses`.
    pub failed: usize,
    /// Addresses still not stored when the run ended, with their last error text.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Text of the non-retryable error that ended the run early, if any.
    pub fatal: Option<String>,
    /// Attempt counts, store durations and retry histogram accumulated over all rounds.
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
1167
1168#[allow(clippy::too_many_arguments)]
1186pub(crate) async fn merkle_deferred_retry<CF, SF, Fut>(
1187 deferred: Vec<([u8; 32], String)>,
1188 round_delays_secs: &[u64],
1189 concurrency_for: CF,
1190 progress: Option<&mpsc::Sender<UploadEvent>>,
1191 stored_offset: usize,
1192 total: usize,
1193 store_one: SF,
1194) -> Result<DeferredRetryOutcome>
1195where
1196 CF: Fn(usize) -> usize,
1197 SF: Fn([u8; 32]) -> Fut,
1198 Fut: std::future::Future<Output = Result<std::time::Instant>>,
1199{
1200 let mut outcome = DeferredRetryOutcome {
1201 stored: stored_offset,
1202 ..DeferredRetryOutcome::default()
1203 };
1204 let mut remaining = deferred;
1205 let rounds = round_delays_secs.len();
1206
1207 for (round, &delay_secs) in round_delays_secs.iter().enumerate() {
1208 if remaining.is_empty() {
1209 break;
1210 }
1211 if delay_secs > 0 {
1212 tokio::time::sleep(Duration::from_secs(delay_secs)).await;
1213 }
1214 info!(
1215 "Deferred merkle retry round {}/{}: {} chunk(s) short of quorum",
1216 round + 1,
1217 rounds,
1218 remaining.len(),
1219 );
1220
1221 let slot = deferred_round_histogram_slot(round, outcome.stats.retries_histogram.len());
1225 let round_addrs: Vec<[u8; 32]> = std::mem::take(&mut remaining)
1226 .into_iter()
1227 .map(|(addr, _msg)| addr)
1228 .collect();
1229 let round_len = round_addrs.len();
1230 let cap = || concurrency_for(round_len);
1233
1234 let round_outcome = merkle_store_with_retry(
1235 round_addrs,
1236 cap,
1237 1,
1238 Duration::ZERO,
1239 progress,
1240 outcome.stored,
1241 total,
1242 &store_one,
1243 )
1244 .await?;
1245
1246 outcome.stored = round_outcome.stored;
1247 outcome
1248 .stored_addresses
1249 .extend(round_outcome.stored_addresses);
1250
1251 outcome.stats.chunk_attempts_total = outcome
1253 .stats
1254 .chunk_attempts_total
1255 .saturating_add(round_outcome.stats.chunk_attempts_total);
1256 outcome
1257 .stats
1258 .store_durations_ms
1259 .extend(round_outcome.stats.store_durations_ms);
1260 let landed: usize = round_outcome.stats.retries_histogram.iter().sum();
1261 outcome.stats.retries_histogram[slot] =
1262 outcome.stats.retries_histogram[slot].saturating_add(landed);
1263
1264 if let Some(fatal) = round_outcome.fatal {
1265 outcome.fatal = Some(fatal.to_string());
1270 outcome.failed = round_outcome.failed_addresses.len();
1271 outcome.failed_addresses = round_outcome.failed_addresses;
1272 return Ok(outcome);
1273 }
1274
1275 remaining = round_outcome.failed_addresses;
1277 }
1278
1279 outcome.failed = remaining.len();
1280 outcome.failed_addresses = remaining;
1281 Ok(outcome)
1282}
1283
1284pub fn finalize_merkle_batch(
1289 prepared: PreparedMerkleBatch,
1290 winner_pool_hash: [u8; 32],
1291) -> Result<MerkleBatchPaymentResult> {
1292 let chunk_count = prepared.addresses.len();
1293 let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
1294
1295 let winner_pool = prepared
1297 .candidate_pools
1298 .iter()
1299 .find(|pool| pool.hash() == winner_pool_hash)
1300 .ok_or_else(|| {
1301 Error::Payment(format!(
1302 "Winner pool {} not found in candidate pools",
1303 hex::encode(winner_pool_hash)
1304 ))
1305 })?;
1306
1307 info!("Generating merkle proofs for {chunk_count} chunks");
1309 let mut proofs = HashMap::with_capacity(chunk_count);
1310
1311 for (i, xorname) in xornames.iter().enumerate() {
1312 let address_proof = prepared
1313 .tree
1314 .generate_address_proof(i, *xorname)
1315 .map_err(|e| {
1316 Error::Payment(format!(
1317 "Failed to generate address proof for chunk {i}: {e}"
1318 ))
1319 })?;
1320
1321 let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1322
1323 let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1324 .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1325
1326 proofs.insert(prepared.addresses[i], tagged_bytes);
1327 }
1328
1329 info!("Merkle batch payment complete: {chunk_count} proofs generated");
1330
1331 Ok(MerkleBatchPaymentResult {
1332 proofs,
1333 chunk_count,
1334 storage_cost_atto: "0".to_string(),
1335 gas_cost_wei: 0,
1336 merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1337 })
1338}
1339
/// Compile-time check that the future returned by `Client::merkle_upload_chunks`
/// is `Send`. Nothing in this module is meant to be executed.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    /// Accepts only references to `Send` values; any other argument is a
    /// compile error.
    fn _assert_send<T: Send>(_: &T) {}

    // Exists only to be type-checked. `todo!()` diverges, so the lines after it
    // are unreachable, hence the lint allowances.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        let batch_result: MerkleBatchPaymentResult = todo!();
        let fut = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _assert_send(&fut);
    }
}
1360
1361#[cfg(test)]
1362#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1363mod tests {
1364 use super::*;
1365 use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1366
1367 #[test]
1372 fn test_auto_below_threshold() {
1373 assert!(!should_use_merkle(1, PaymentMode::Auto));
1374 assert!(!should_use_merkle(10, PaymentMode::Auto));
1375 assert!(!should_use_merkle(63, PaymentMode::Auto));
1376 }
1377
1378 #[test]
1379 fn test_auto_at_and_above_threshold() {
1380 assert!(should_use_merkle(64, PaymentMode::Auto));
1381 assert!(should_use_merkle(65, PaymentMode::Auto));
1382 assert!(should_use_merkle(1000, PaymentMode::Auto));
1383 }
1384
1385 #[test]
1386 fn test_merkle_mode_forces_at_2() {
1387 assert!(!should_use_merkle(1, PaymentMode::Merkle));
1388 assert!(should_use_merkle(2, PaymentMode::Merkle));
1389 assert!(should_use_merkle(3, PaymentMode::Merkle));
1390 }
1391
1392 #[test]
1393 fn test_single_mode_always_false() {
1394 assert!(!should_use_merkle(0, PaymentMode::Single));
1395 assert!(!should_use_merkle(64, PaymentMode::Single));
1396 assert!(!should_use_merkle(1000, PaymentMode::Single));
1397 }
1398
1399 #[test]
1400 fn test_default_mode_is_auto() {
1401 assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1402 }
1403
1404 #[test]
1405 fn test_threshold_value() {
1406 assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1407 }
1408
1409 #[test]
1414 fn test_preflight_quotes_gathered_means_not_stored() {
1415 assert!(matches!(preflight_stored_status(Ok(())), Ok(false)));
1416 }
1417
1418 #[test]
1419 fn test_preflight_already_stored_is_stored() {
1420 let r: Result<()> = Err(Error::AlreadyStored);
1421 assert!(matches!(preflight_stored_status(r), Ok(true)));
1422 }
1423
1424 #[test]
1428 fn test_preflight_transient_quote_failure_does_not_abort() {
1429 let insufficient: Result<()> =
1431 Err(Error::InsufficientPeers("Got 5 quotes, need 7".to_string()));
1432 assert!(
1433 matches!(preflight_stored_status(insufficient), Ok(false)),
1434 "insufficient-peers during preflight must degrade to not-stored, not error"
1435 );
1436
1437 let timeout: Result<()> = Err(Error::Timeout("Timeout waiting for quote".to_string()));
1438 assert!(matches!(preflight_stored_status(timeout), Ok(false)));
1439
1440 let network: Result<()> = Err(Error::Network("connection reset".to_string()));
1441 assert!(matches!(preflight_stored_status(network), Ok(false)));
1442 }
1443
1444 #[test]
1447 fn test_preflight_application_error_propagates() {
1448 let payment: Result<()> = Err(Error::Payment("bad payment".to_string()));
1449 assert!(matches!(
1450 preflight_stored_status(payment),
1451 Err(Error::Payment(_))
1452 ));
1453 }
1454
1455 #[test]
1456 fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1457 let first = Bytes::from_static(b"first");
1458 let second = Bytes::from_static(b"second");
1459 let first_addr = compute_address(&first);
1460 let second_addr = compute_address(&second);
1461
1462 let selected = chunk_contents_for_upload_addresses(
1463 vec![first.clone(), second.clone()],
1464 &[second_addr, first_addr],
1465 )
1466 .unwrap();
1467
1468 assert_eq!(selected, vec![second, first]);
1469 }
1470
1471 #[test]
1472 fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1473 let repeated = Bytes::from_static(b"same-content");
1474 let other = Bytes::from_static(b"other-content");
1475 let repeated_addr = compute_address(&repeated);
1476
1477 let selected = chunk_contents_for_upload_addresses(
1478 vec![repeated.clone(), other, repeated.clone()],
1479 &[repeated_addr, repeated_addr],
1480 )
1481 .unwrap();
1482
1483 assert_eq!(selected, vec![repeated.clone(), repeated]);
1484 }
1485
1486 #[test]
1487 fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1488 let requested = Bytes::from_static(b"requested-content");
1489 let unrequested = Bytes::from_static(b"unrequested-content");
1490 let requested_addr = compute_address(&requested);
1491
1492 let selected = chunk_contents_for_upload_addresses(
1493 vec![
1494 unrequested.clone(),
1495 requested.clone(),
1496 unrequested.clone(),
1497 unrequested,
1498 ],
1499 &[requested_addr],
1500 )
1501 .unwrap();
1502
1503 assert_eq!(selected, vec![requested]);
1504 }
1505
1506 #[test]
1507 fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1508 let present = Bytes::from_static(b"present-content");
1509 let missing = Bytes::from_static(b"missing-content");
1510 let missing_addr = compute_address(&missing);
1511
1512 let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1513
1514 assert!(matches!(result, Err(Error::InvalidData(_))));
1515 }
1516
1517 fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1522 (0..count)
1523 .map(|i| {
1524 let xn = XorName::from_content(&i.to_le_bytes());
1525 xn.0
1526 })
1527 .collect()
1528 }
1529
1530 #[test]
1531 fn test_tree_depth_for_known_sizes() {
1532 let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1533 for (count, expected_depth) in cases {
1534 let addrs = make_test_addresses(count);
1535 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1536 let tree = MerkleTree::from_xornames(xornames).unwrap();
1537 assert_eq!(
1538 tree.depth(),
1539 expected_depth,
1540 "depth mismatch for {count} leaves"
1541 );
1542 }
1543 }
1544
1545 #[test]
1546 fn test_proof_generation_and_verification_for_all_leaves() {
1547 let addrs = make_test_addresses(16);
1548 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1549 let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1550
1551 for (i, xn) in xornames.iter().enumerate() {
1552 let proof = tree.generate_address_proof(i, *xn).unwrap();
1553 assert!(proof.verify(), "proof for leaf {i} should verify");
1554 assert_eq!(proof.depth(), tree.depth() as usize);
1555 }
1556 }
1557
1558 #[test]
1559 fn test_proof_fails_for_wrong_address() {
1560 let addrs = make_test_addresses(8);
1561 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1562 let tree = MerkleTree::from_xornames(xornames).unwrap();
1563
1564 let wrong = XorName::from_content(b"wrong");
1565 let proof = tree.generate_address_proof(0, wrong).unwrap();
1566 assert!(!proof.verify(), "proof with wrong address should fail");
1567 }
1568
1569 #[test]
1570 fn test_tree_too_few_leaves() {
1571 let xornames = vec![XorName::from_content(b"only_one")];
1572 let result = MerkleTree::from_xornames(xornames);
1573 assert!(result.is_err());
1574 }
1575
1576 #[test]
1577 fn test_tree_at_max_leaves() {
1578 let addrs = make_test_addresses(MAX_LEAVES);
1579 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1580 let tree = MerkleTree::from_xornames(xornames).unwrap();
1581 assert_eq!(tree.leaf_count(), MAX_LEAVES);
1582 }
1583
1584 #[test]
1589 fn test_merkle_proof_serialize_deserialize_roundtrip() {
1590 use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
1591 use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};
1592
1593 let addrs = make_test_addresses(4);
1594 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1595 let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1596
1597 let timestamp = std::time::SystemTime::now()
1598 .duration_since(std::time::UNIX_EPOCH)
1599 .unwrap()
1600 .as_secs();
1601
1602 let candidates = tree.reward_candidates(timestamp).unwrap();
1603 let midpoint = candidates.first().unwrap().clone();
1604
1605 #[allow(clippy::cast_possible_truncation)]
1607 let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
1608 std::array::from_fn(|i| MerklePaymentCandidateNode {
1609 pub_key: vec![i as u8; 32],
1610 price: Amount::from(1024u64),
1611 reward_address: RewardsAddress::new([i as u8; 20]),
1612 merkle_payment_timestamp: timestamp,
1613 signature: vec![i as u8; 64],
1614 });
1615
1616 let pool = MerklePaymentCandidatePool {
1617 midpoint_proof: midpoint,
1618 candidate_nodes,
1619 };
1620
1621 let address_proof = tree.generate_address_proof(0, xornames[0]).unwrap();
1622 let merkle_proof = MerklePaymentProof::new(xornames[0], address_proof, pool);
1623
1624 let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
1625 assert_eq!(
1626 tagged.first().copied(),
1627 Some(0x02),
1628 "tag should be PROOF_TAG_MERKLE"
1629 );
1630
1631 let deserialized = deserialize_merkle_proof(&tagged).unwrap();
1632 assert_eq!(deserialized.address, merkle_proof.address);
1633 assert_eq!(
1634 deserialized.winner_pool.candidate_nodes.len(),
1635 CANDIDATES_PER_POOL
1636 );
1637 }
1638
1639 #[test]
1644 fn test_candidate_wrong_timestamp_rejected() {
1645 let candidate = MerklePaymentCandidateNode {
1647 pub_key: vec![0u8; 32],
1648 price: ant_protocol::evm::Amount::ZERO,
1649 reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
1650 merkle_payment_timestamp: 1000,
1651 signature: vec![0u8; 64],
1652 };
1653
1654 assert_ne!(candidate.merkle_payment_timestamp, 2000);
1656 }
1657
1658 fn make_dummy_candidate_nodes(
1663 timestamp: u64,
1664 ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1665 std::array::from_fn(|i| MerklePaymentCandidateNode {
1666 pub_key: vec![i as u8; 32],
1667 price: Amount::from(1024u64),
1668 reward_address: RewardsAddress::new([i as u8; 20]),
1669 merkle_payment_timestamp: timestamp,
1670 signature: vec![i as u8; 64],
1671 })
1672 }
1673
1674 fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1675 let addrs = make_test_addresses(count);
1676 let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1677 let tree = MerkleTree::from_xornames(xornames).unwrap();
1678
1679 let timestamp = std::time::SystemTime::now()
1680 .duration_since(std::time::UNIX_EPOCH)
1681 .unwrap()
1682 .as_secs();
1683
1684 let midpoints = tree.reward_candidates(timestamp).unwrap();
1685
1686 let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1687 .into_iter()
1688 .map(|mp| MerklePaymentCandidatePool {
1689 midpoint_proof: mp,
1690 candidate_nodes: make_dummy_candidate_nodes(timestamp),
1691 })
1692 .collect();
1693
1694 let pool_commitments = candidate_pools
1695 .iter()
1696 .map(MerklePaymentCandidatePool::to_commitment)
1697 .collect();
1698
1699 PreparedMerkleBatch {
1700 depth: tree.depth(),
1701 pool_commitments,
1702 merkle_payment_timestamp: timestamp,
1703 candidate_pools,
1704 tree,
1705 addresses: addrs,
1706 }
1707 }
1708
1709 #[test]
1710 fn test_finalize_merkle_batch_with_valid_winner() {
1711 let prepared = make_prepared_merkle_batch(4);
1712 let winner_hash = prepared.candidate_pools[0].hash();
1713
1714 let result = finalize_merkle_batch(prepared, winner_hash);
1715 assert!(
1716 result.is_ok(),
1717 "should succeed with valid winner: {result:?}"
1718 );
1719
1720 let batch = result.unwrap();
1721 assert_eq!(batch.chunk_count, 4);
1722 assert_eq!(batch.proofs.len(), 4);
1723
1724 for proof_bytes in batch.proofs.values() {
1726 assert!(!proof_bytes.is_empty());
1727 }
1728 }
1729
1730 #[test]
1731 fn test_finalize_merkle_batch_with_invalid_winner() {
1732 let prepared = make_prepared_merkle_batch(4);
1733 let bad_hash = [0xFF; 32];
1734
1735 let result = finalize_merkle_batch(prepared, bad_hash);
1736 assert!(result.is_err());
1737 let err = result.unwrap_err().to_string();
1738 assert!(err.contains("not found in candidate pools"), "got: {err}");
1739 }
1740
1741 #[test]
1742 fn test_finalize_merkle_batch_proofs_are_deserializable() {
1743 use ant_protocol::payment::deserialize_merkle_proof;
1744
1745 let prepared = make_prepared_merkle_batch(8);
1746 let winner_hash = prepared.candidate_pools[0].hash();
1747
1748 let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();
1749
1750 for (addr, proof_bytes) in &batch.proofs {
1751 let proof = deserialize_merkle_proof(proof_bytes);
1752 assert!(
1753 proof.is_ok(),
1754 "proof for {} should deserialize: {:?}",
1755 hex::encode(addr),
1756 proof.err()
1757 );
1758 }
1759 }
1760
1761 #[test]
1766 fn test_batch_split_calculation() {
1767 let addrs = make_test_addresses(MAX_LEAVES);
1769 assert_eq!(addrs.chunks(MAX_LEAVES).count(), 1);
1770
1771 let addrs = make_test_addresses(MAX_LEAVES + 1);
1773 assert_eq!(addrs.chunks(MAX_LEAVES).count(), 2);
1774
1775 let addrs = make_test_addresses(3 * MAX_LEAVES);
1777 assert_eq!(addrs.chunks(MAX_LEAVES).count(), 3);
1778 }
1779
1780 use std::sync::{Arc, Mutex};
1785
    /// Shorthand used by the store/retry tests; forwards to `make_test_addresses`
    /// and returns `count` generated 32-byte addresses.
    fn make_addrs(count: usize) -> Vec<[u8; 32]> {
        make_test_addresses(count)
    }
1791
1792 #[tokio::test]
1796 async fn store_with_retry_collects_failures_instead_of_aborting() {
1797 let chunks = make_addrs(6);
1798 let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
1799 let failing_for_closure = failing.clone();
1800
1801 let store_one = move |addr: [u8; 32]| {
1802 let fail = failing_for_closure.contains(&addr);
1803 async move {
1804 if fail {
1805 Err(Error::InsufficientPeers("test shortfall".into()))
1806 } else {
1807 Ok(std::time::Instant::now())
1808 }
1809 }
1810 };
1811
1812 let outcome =
1813 merkle_store_with_retry(chunks, || 8, 1, Duration::ZERO, None, 0, 6, store_one)
1814 .await
1815 .expect("quorum shortfalls must not abort the batch");
1816
1817 assert_eq!(outcome.stored, 4);
1818 assert_eq!(outcome.failed, 2);
1819 assert_eq!(outcome.stats.retries_histogram[0], 4);
1821 assert_eq!(outcome.stats.chunk_attempts_total, 6);
1822 }
1823
1824 #[tokio::test]
1830 async fn store_with_retry_rereads_cap_per_slot() {
1831 let count = 6;
1832 let chunks = make_addrs(count);
1833 let cap_calls = Arc::new(Mutex::new(0usize));
1834 let cap_calls_for_closure = cap_calls.clone();
1835 let cap = move || {
1836 *cap_calls_for_closure.lock().expect("cap counter poisoned") += 1;
1837 2
1838 };
1839 let store_one = move |_addr: [u8; 32]| async move { Ok(std::time::Instant::now()) };
1840
1841 let outcome =
1842 merkle_store_with_retry(chunks, cap, 1, Duration::ZERO, None, 0, count, store_one)
1843 .await
1844 .expect("all stores succeed");
1845
1846 assert_eq!(outcome.stored, count);
1847 let calls = *cap_calls.lock().expect("cap counter poisoned");
1848 assert!(
1849 calls >= count,
1850 "cap must be re-read per drained slot (rolling), not snapshotted once — \
1851 expected >= {count} invocations, got {calls}",
1852 );
1853 }
1854
    /// Checks that one slow store does not hold up the other stores in a pass.
    ///
    /// The store for `addrs[0]` waits on a `Notify` that is signalled only once the
    /// other `count - 1` stores have each incremented `fast_completed`. The call is
    /// wrapped in a 5-second timeout so that a pass which stops making progress
    /// while the slow store is pending fails the test instead of hanging it.
    #[tokio::test]
    async fn store_pass_has_no_barrier() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        let count = 8;
        let addrs = make_addrs(count);
        let slow = addrs[0];
        let fast_completed = Arc::new(AtomicUsize::new(0));
        let release_slow = Arc::new(tokio::sync::Notify::new());

        let store_one = move |addr: [u8; 32]| {
            // Each future gets its own handles to the shared counter and notifier.
            let fast_completed = fast_completed.clone();
            let release_slow = release_slow.clone();
            async move {
                if addr == slow {
                    // The slow chunk parks here until the last fast chunk releases it.
                    release_slow.notified().await;
                } else if fast_completed.fetch_add(1, Ordering::SeqCst) + 1 == count - 1 {
                    // This is the last of the fast chunks: wake the slow one.
                    // NOTE(review): relies on `notify_one` keeping a permit if the
                    // slow future is not waiting yet — confirm against tokio docs.
                    release_slow.notify_one();
                }
                Ok(std::time::Instant::now())
            }
        };

        let outcome = tokio::time::timeout(
            Duration::from_secs(5),
            merkle_store_with_retry(addrs, || 8, 1, Duration::ZERO, None, 0, count, store_one),
        )
        .await
        .expect("store pass must not deadlock — a slow chunk must not block the others")
        .expect("all stores succeed");

        assert_eq!(outcome.stored, count);
    }
1896
    /// Checks the concurrency bound of a store pass: with 40 chunks and a cap of 4,
    /// the number of store futures observed running at once must never exceed the
    /// cap, and must exceed 1 (the pass is not serialized).
    #[tokio::test]
    async fn store_pass_keeps_at_most_cap_in_flight() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        let count = 40;
        let cap = 4;
        let addrs = make_addrs(count);
        // `in_flight` tracks stores currently between entry and exit;
        // `max_in_flight` records the highest value `in_flight` ever reached.
        let in_flight = Arc::new(AtomicUsize::new(0));
        let max_in_flight = Arc::new(AtomicUsize::new(0));
        let max_in_flight_for_closure = max_in_flight.clone();

        let store_one = move |_addr: [u8; 32]| {
            let in_flight = in_flight.clone();
            let max_in_flight = max_in_flight_for_closure.clone();
            async move {
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                max_in_flight.fetch_max(now, Ordering::SeqCst);
                // Yield once so other store futures get a chance to start while
                // this one is still counted as in flight.
                tokio::task::yield_now().await;
                in_flight.fetch_sub(1, Ordering::SeqCst);
                Ok(std::time::Instant::now())
            }
        };

        let outcome = merkle_store_with_retry(
            addrs,
            move || cap,
            1,
            Duration::ZERO,
            None,
            0,
            count,
            store_one,
        )
        .await
        .expect("all stores succeed");

        assert_eq!(outcome.stored, count);
        let peak = max_in_flight.load(Ordering::SeqCst);
        assert!(
            peak <= cap,
            "at most `cap` bodies may be in flight (memory bound), got peak {peak} > cap {cap}",
        );
        assert!(
            peak > 1,
            "the pass must actually run concurrently, not serialize (peak {peak})",
        );
    }
1949
1950 #[tokio::test]
1955 async fn store_with_retry_treats_remote_put_as_recoverable() {
1956 let chunks = make_addrs(6);
1957 let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
1958 let failing_for_closure = failing.clone();
1959
1960 let store_one = move |addr: [u8; 32]| {
1961 let fail = failing_for_closure.contains(&addr);
1962 async move {
1963 if fail {
1964 Err(Error::RemotePut {
1965 address: hex::encode(addr),
1966 source: ant_protocol::ProtocolError::StorageFailed(
1967 "insufficient disk space".into(),
1968 ),
1969 })
1970 } else {
1971 Ok(std::time::Instant::now())
1972 }
1973 }
1974 };
1975
1976 let outcome =
1977 merkle_store_with_retry(chunks, || 8, 1, Duration::ZERO, None, 0, 6, store_one)
1978 .await
1979 .expect("remote app-rejections must not abort the batch");
1980
1981 assert_eq!(outcome.stored, 4);
1982 assert_eq!(outcome.failed, 2);
1983 }
1984
1985 #[tokio::test]
1989 async fn store_with_retry_reports_non_quorum_errors_as_fatal() {
1990 let chunks = make_addrs(3);
1991 let store_one = |_addr: [u8; 32]| async move {
1992 Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1993 };
1994
1995 let outcome =
1996 merkle_store_with_retry(chunks, || 8, 3, Duration::ZERO, None, 0, 3, store_one)
1997 .await
1998 .expect("fatal is carried in the outcome, not returned as Err");
1999 assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
2000 }
2001
2002 #[tokio::test]
2007 async fn store_with_retry_fatal_preserves_same_pass_successes() {
2008 let chunks = make_addrs(6);
2009 let bad = chunks[5];
2010 let store_one = move |addr: [u8; 32]| async move {
2011 if addr == bad {
2012 Err(Error::Payment("fatal".into()))
2013 } else {
2014 Ok(std::time::Instant::now())
2015 }
2016 };
2017
2018 let outcome =
2019 merkle_store_with_retry(chunks, || 1, 1, Duration::ZERO, None, 0, 6, store_one)
2020 .await
2021 .expect("fatal carried in outcome, not returned as Err");
2022 assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
2023 assert_eq!(outcome.stored, 5);
2025 assert_eq!(outcome.stored_addresses.len(), 5);
2026 assert!(!outcome.stored_addresses.contains(&bad));
2027 assert!(outcome.failed_addresses.iter().any(|(a, _)| *a == bad));
2029 }
2030
2031 #[tokio::test]
2033 async fn store_with_retry_retries_only_the_failed_set() {
2034 let chunks = make_addrs(5);
2035 let total = chunks.len();
2036 let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
2037 let failing_for_closure = failing.clone();
2038
2039 let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
2041 let calls_for_closure = calls.clone();
2042
2043 let store_one = move |addr: [u8; 32]| {
2044 let calls = calls_for_closure.clone();
2045 let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
2047 let fail = failing_for_closure.contains(&addr) && already_seen == 0;
2048 calls.lock().unwrap().push(addr);
2049 async move {
2050 if fail {
2051 Err(Error::InsufficientPeers("round-1 shortfall".into()))
2052 } else {
2053 Ok(std::time::Instant::now())
2054 }
2055 }
2056 };
2057
2058 let outcome =
2059 merkle_store_with_retry(chunks, || 8, 3, Duration::ZERO, None, 0, total, store_one)
2060 .await
2061 .expect("should converge after retry");
2062
2063 assert_eq!(outcome.stored, total);
2064 assert_eq!(outcome.failed, 0);
2065
2066 let calls = calls.lock().unwrap();
2070 assert_eq!(calls.len(), total + failing.len());
2071 let round_two: std::collections::HashSet<[u8; 32]> =
2072 calls[total..].iter().copied().collect();
2073 assert_eq!(round_two, failing);
2074 }
2075
2076 #[tokio::test]
2079 async fn store_with_retry_counts_retry_success_once_in_histogram() {
2080 let chunks = make_addrs(4);
2081 let total = chunks.len();
2082 let flaky_addr = chunks[0];
2083
2084 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2085 let attempts_for_closure = attempts.clone();
2086
2087 let store_one = move |addr: [u8; 32]| {
2088 let attempts = attempts_for_closure.clone();
2089 let n = {
2090 let mut m = attempts.lock().unwrap();
2091 let entry = m.entry(addr).or_insert(0);
2092 *entry += 1;
2093 *entry
2094 };
2095 let fail = addr == flaky_addr && n == 1;
2096 async move {
2097 if fail {
2098 Err(Error::InsufficientPeers("transient".into()))
2099 } else {
2100 Ok(std::time::Instant::now())
2101 }
2102 }
2103 };
2104
2105 let outcome =
2106 merkle_store_with_retry(chunks, || 8, 3, Duration::ZERO, None, 0, total, store_one)
2107 .await
2108 .expect("flaky chunk should recover on retry");
2109
2110 assert_eq!(outcome.stored, total);
2111 assert_eq!(outcome.failed, 0);
2112 assert_eq!(outcome.stats.retries_histogram[0], total - 1);
2114 assert_eq!(outcome.stats.retries_histogram[1], 1);
2115 assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
2117 }
2118
2119 #[tokio::test]
2124 async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
2125 let chunks = make_addrs(3);
2126 let total = chunks.len();
2127
2128 let store_one = |_addr: [u8; 32]| async move {
2129 Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
2130 };
2131
2132 let outcome = merkle_store_with_retry(
2133 chunks,
2134 || 8,
2135 MERKLE_STORE_MAX_ATTEMPTS,
2136 Duration::ZERO,
2137 None,
2138 0,
2139 total,
2140 store_one,
2141 )
2142 .await
2143 .expect("an exhausted retry budget is reported, not propagated as Err");
2144
2145 assert_eq!(outcome.stored, 0);
2146 assert_eq!(outcome.failed, total);
2147 assert_eq!(
2149 outcome.stats.chunk_attempts_total,
2150 total * MERKLE_STORE_MAX_ATTEMPTS
2151 );
2152 assert_eq!(outcome.stats.retries_histogram, [0; 4]);
2154 }
2155
2156 #[tokio::test]
2161 async fn store_with_retry_records_failed_addresses_when_exhausted() {
2162 let chunks = make_addrs(6);
2163 let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
2164 let failing_for_closure = failing.clone();
2165
2166 let store_one = move |addr: [u8; 32]| {
2167 let fail = failing_for_closure.contains(&addr);
2168 async move {
2169 if fail {
2170 Err(Error::InsufficientPeers("permanent shortfall".into()))
2171 } else {
2172 Ok(std::time::Instant::now())
2173 }
2174 }
2175 };
2176
2177 let outcome = merkle_store_with_retry(
2178 chunks,
2179 || 8,
2180 MERKLE_STORE_MAX_ATTEMPTS,
2181 Duration::ZERO,
2182 None,
2183 0,
2184 6,
2185 store_one,
2186 )
2187 .await
2188 .expect("quorum shortfalls must not abort the batch");
2189
2190 assert_eq!(outcome.stored, 4);
2191 assert_eq!(outcome.failed, 2);
2192 assert_eq!(outcome.failed_addresses.len(), 2);
2194 let reported: std::collections::HashSet<[u8; 32]> =
2195 outcome.failed_addresses.iter().map(|(a, _)| *a).collect();
2196 assert_eq!(reported, failing);
2197 for (_, msg) in &outcome.failed_addresses {
2199 assert!(msg.contains("permanent shortfall"));
2200 }
2201 }
2202
2203 #[tokio::test]
2206 async fn store_with_retry_failed_addresses_empty_on_full_success() {
2207 let chunks = make_addrs(4);
2208 let total = chunks.len();
2209 let store_one = |_addr: [u8; 32]| async move { Ok(std::time::Instant::now()) };
2210
2211 let outcome = merkle_store_with_retry(
2212 chunks,
2213 || 8,
2214 MERKLE_STORE_MAX_ATTEMPTS,
2215 Duration::ZERO,
2216 None,
2217 0,
2218 total,
2219 store_one,
2220 )
2221 .await
2222 .expect("all chunks store");
2223
2224 assert_eq!(outcome.stored, total);
2225 assert_eq!(outcome.failed, 0);
2226 assert!(outcome.failed_addresses.is_empty());
2227 }
2228
2229 #[test]
2236 fn deferred_round_histogram_slot_maps_and_clamps() {
2237 assert_eq!(deferred_round_histogram_slot(0, 4), 1);
2238 assert_eq!(deferred_round_histogram_slot(1, 4), 2);
2239 assert_eq!(deferred_round_histogram_slot(2, 4), 3);
2240 assert_eq!(deferred_round_histogram_slot(3, 4), 3);
2242 assert_eq!(deferred_round_histogram_slot(9, 4), 3);
2243 }
2244
2245 fn deferred_set(count: usize) -> Vec<([u8; 32], String)> {
2246 make_test_addresses(count)
2247 .into_iter()
2248 .map(|addr| (addr, "short of quorum".to_string()))
2249 .collect()
2250 }
2251
2252 #[tokio::test]
2256 async fn deferred_retry_succeeds_on_a_later_round() {
2257 let deferred = deferred_set(3);
2258 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2261 let attempts_for_closure = attempts.clone();
2262 let store_one = move |addr: [u8; 32]| {
2263 let attempts = attempts_for_closure.clone();
2264 async move {
2265 let n = {
2266 let mut map = attempts.lock().unwrap();
2267 let e = map.entry(addr).or_insert(0);
2268 *e += 1;
2269 *e
2270 };
2271 if n < 2 {
2272 Err(Error::InsufficientPeers("still short".into()))
2273 } else {
2274 Ok(std::time::Instant::now())
2275 }
2276 }
2277 };
2278
2279 let outcome = merkle_deferred_retry(
2280 deferred,
2281 &[0, 0, 0],
2282 |n: usize| n.max(1),
2283 None,
2284 0,
2285 3,
2286 store_one,
2287 )
2288 .await
2289 .expect("deferred retry must not abort on quorum shortfalls");
2290
2291 assert_eq!(outcome.stored, 3, "all three land by round 1");
2292 assert_eq!(outcome.stored_addresses.len(), 3);
2293 assert_eq!(outcome.failed, 0);
2294 assert!(outcome.failed_addresses.is_empty());
2295 assert!(outcome.fatal.is_none());
2296 assert_eq!(outcome.stats.retries_histogram[1], 0);
2298 assert_eq!(outcome.stats.retries_histogram[2], 3);
2299 assert_eq!(outcome.stats.chunk_attempts_total, 6);
2301 }
2302
2303 #[tokio::test]
2306 async fn deferred_retry_leftovers_become_failed() {
2307 let deferred = deferred_set(2);
2308 let store_one = |_addr: [u8; 32]| async move {
2309 Err::<std::time::Instant, _>(Error::InsufficientPeers("always short".into()))
2310 };
2311
2312 let outcome = merkle_deferred_retry(
2313 deferred,
2314 &[0, 0, 0],
2315 |n: usize| n.max(1),
2316 None,
2317 0,
2318 2,
2319 store_one,
2320 )
2321 .await
2322 .expect("exhausted retries report failures, not an error");
2323
2324 assert_eq!(outcome.stored, 0);
2325 assert!(outcome.stored_addresses.is_empty());
2326 assert_eq!(outcome.failed, 2);
2327 assert_eq!(outcome.failed_addresses.len(), 2);
2328 assert!(outcome.fatal.is_none());
2329 assert_eq!(outcome.stats.chunk_attempts_total, 6);
2331 }
2332
2333 #[tokio::test]
2338 async fn deferred_retry_fatal_error_preserves_prior_progress() {
2339 let addrs = make_test_addresses(2);
2340 let good = addrs[0];
2341 let bad = addrs[1];
2342 let deferred = vec![(good, "short".to_string()), (bad, "short".to_string())];
2343
2344 let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2347 let attempts_for_closure = attempts.clone();
2348 let store_one = move |addr: [u8; 32]| {
2349 let attempts = attempts_for_closure.clone();
2350 async move {
2351 let n = {
2352 let mut map = attempts.lock().unwrap();
2353 let e = map.entry(addr).or_insert(0);
2354 *e += 1;
2355 *e
2356 };
2357 if addr == good {
2358 Ok(std::time::Instant::now())
2359 } else if n == 1 {
2360 Err(Error::InsufficientPeers("short".into()))
2361 } else {
2362 Err(Error::Payment("fatal on retry".into()))
2363 }
2364 }
2365 };
2366
2367 let outcome = merkle_deferred_retry(
2368 deferred,
2369 &[0, 0, 0],
2370 |n: usize| n.max(1),
2371 None,
2372 0,
2373 2,
2374 store_one,
2375 )
2376 .await
2377 .expect("a fatal round error is reported via `fatal`, not as Err");
2378
2379 assert!(outcome.fatal.is_some(), "fatal error must be captured");
2380 assert_eq!(outcome.stored, 1, "round-0 success preserved");
2381 assert_eq!(outcome.stored_addresses, vec![good]);
2382 assert_eq!(outcome.failed, 1);
2383 assert_eq!(outcome.failed_addresses.len(), 1);
2384 assert_eq!(outcome.failed_addresses[0].0, bad);
2385 }
2386
2387 #[tokio::test]
2389 async fn deferred_retry_empty_set_is_a_noop() {
2390 let store_one = |_addr: [u8; 32]| async move {
2391 Err::<std::time::Instant, _>(Error::InsufficientPeers("unused".into()))
2392 };
2393
2394 let outcome = merkle_deferred_retry(
2395 Vec::new(),
2396 &DEFERRED_ROUND_DELAYS_SECS,
2397 |n: usize| n.max(1),
2398 None,
2399 7,
2400 7,
2401 store_one,
2402 )
2403 .await
2404 .expect("empty deferred set is a no-op");
2405
2406 assert_eq!(outcome.stored, 7, "stored_offset carried through unchanged");
2407 assert_eq!(outcome.failed, 0);
2408 assert!(outcome.stored_addresses.is_empty());
2409 assert!(outcome.failed_addresses.is_empty());
2410 assert!(outcome.fatal.is_none());
2411 }
2412}