Skip to main content

ant_core/data/client/
merkle.rs

1//! Merkle batch payment support for the Autonomi client.
2//!
3//! When uploading batches of 64+ chunks, merkle payments reduce gas costs
4//! by paying for the entire batch in a single on-chain transaction instead
5//! of one transaction per chunk.
6
7use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13    Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14    MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20    MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Chunk count at which [`PaymentMode::Auto`] switches to merkle batch payment.
///
/// A batch with at least this many chunks is paid with one on-chain merkle
/// transaction; a smaller batch falls back to per-chunk payment.
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
34/// Payment mode for uploads.
35#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum PaymentMode {
38    /// Automatically choose: merkle for batches >= threshold, single otherwise.
39    #[default]
40    Auto,
41    /// Force merkle batch payment regardless of batch size (min 2 chunks).
42    Merkle,
43    /// Force single-node payment (one tx per chunk).
44    Single,
45}
46
47/// Result of a merkle batch payment.
48///
49/// Serializable so it can be persisted across runs for resume after a
50/// partial-upload failure. See `crate::data::client::cached_merkle`.
51#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
52pub struct MerkleBatchPaymentResult {
53    /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests).
54    pub proofs: HashMap<[u8; 32], Vec<u8>>,
55    /// Number of chunks in the batch.
56    pub chunk_count: usize,
57    /// Total storage cost in atto (token smallest unit).
58    pub storage_cost_atto: String,
59    /// Total gas cost in wei.
60    pub gas_cost_wei: u128,
61    /// Unix timestamp (seconds) used for the on-chain merkle payment.
62    /// Persisted so resume can check whether the on-chain payment has
63    /// aged out beyond the merkle expiration window and the cached
64    /// receipt must be discarded.
65    #[serde(default)]
66    pub merkle_payment_timestamp: u64,
67}
68
69/// Prepared merkle batch ready for external payment.
70///
71/// Contains everything needed to submit the on-chain merkle payment
72/// and then finalize proof generation without a wallet.
73pub struct PreparedMerkleBatch {
74    /// Merkle tree depth (needed for the on-chain call).
75    pub depth: u8,
76    /// Pool commitments for the on-chain call.
77    pub pool_commitments: Vec<PoolCommitment>,
78    /// Timestamp used for the merkle payment.
79    pub merkle_payment_timestamp: u64,
80    /// Internal: candidate pools (needed for proof generation after payment).
81    candidate_pools: Vec<MerklePaymentCandidatePool>,
82    /// Internal: the merkle tree (needed for proof generation).
83    tree: MerkleTree,
84    /// Internal: chunk addresses in order.
85    addresses: Vec<[u8; 32]>,
86}
87
/// Preflight outcome of a merkle upload.
///
/// Splits the chunks into those the network already holds and those that
/// still have to be paid for and stored.
#[derive(Clone, Debug, Default)]
pub(crate) struct MerkleUploadPlan {
    /// Addresses whose close group already confirms storage.
    pub already_stored: Vec<[u8; 32]>,
    /// Addresses that still need payment and storage.
    pub to_upload: Vec<[u8; 32]>,
    /// Combined size in bytes of the chunks listed in `to_upload`.
    to_upload_total_bytes: u64,
}
98
99impl MerkleUploadPlan {
100    /// Average byte size of chunks that still need upload.
101    #[must_use]
102    pub fn to_upload_avg_size(&self) -> u64 {
103        if self.to_upload.is_empty() {
104            return 0;
105        }
106
107        self.to_upload_total_bytes / self.to_upload.len() as u64
108    }
109}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("PreparedMerkleBatch")
114            .field("depth", &self.depth)
115            .field("pool_commitments", &self.pool_commitments.len())
116            .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117            .field("candidate_pools", &self.candidate_pools.len())
118            .field("addresses", &self.addresses.len())
119            .finish()
120    }
121}
122
123/// Select chunk contents that correspond to `addresses`, preserving address order.
124///
125/// Extra chunk contents are ignored; missing contents for any requested address
126/// are treated as corrupted upload state.
127pub(crate) fn chunk_contents_for_upload_addresses(
128    chunk_contents: Vec<Bytes>,
129    addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131    if addresses.is_empty() {
132        return Ok(Vec::new());
133    }
134
135    let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136    for address in addresses {
137        *needed_by_address.entry(*address).or_default() += 1;
138    }
139
140    let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141        HashMap::with_capacity(needed_by_address.len());
142    let mut remaining = addresses.len();
143    for chunk in chunk_contents {
144        let address = compute_address(&chunk);
145        if let Some(needed) = needed_by_address.get_mut(&address) {
146            if *needed > 0 {
147                chunks_by_address
148                    .entry(address)
149                    .or_default()
150                    .push_back(chunk);
151                *needed -= 1;
152                remaining -= 1;
153                if remaining == 0 {
154                    break;
155                }
156            }
157        }
158    }
159
160    for (address, needed) in &needed_by_address {
161        if *needed == 0 {
162            continue;
163        }
164
165        if chunks_by_address.contains_key(address) {
166            return Err(Error::InvalidData(format!(
167                "missing duplicate chunk content for merkle address {}",
168                hex::encode(address)
169            )));
170        }
171
172        return Err(Error::InvalidData(format!(
173            "missing chunk content for merkle address {}",
174            hex::encode(address)
175        )));
176    }
177
178    let mut selected = Vec::with_capacity(addresses.len());
179    for address in addresses {
180        let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181            Error::InvalidData(format!(
182                "missing chunk content for merkle address {}",
183                hex::encode(address)
184            ))
185        })?;
186        let chunk = chunks.pop_front().ok_or_else(|| {
187            Error::InvalidData(format!(
188                "missing duplicate chunk content for merkle address {}",
189                hex::encode(address)
190            ))
191        })?;
192        selected.push(chunk);
193    }
194
195    Ok(selected)
196}
197
198/// Determine whether to use merkle payments for a given batch size.
199/// Free function — no Client needed.
200#[must_use]
201pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
202    match mode {
203        PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
204        PaymentMode::Merkle => chunk_count >= 2,
205        PaymentMode::Single => false,
206    }
207}
208
209impl Client {
210    /// Determine whether to use merkle payments for a given batch size.
211    #[must_use]
212    pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
213        should_use_merkle(chunk_count, mode)
214    }
215
216    /// Pay for a batch of chunks using merkle batch payment.
217    ///
218    /// Builds a merkle tree, collects candidate pools, pays on-chain in one tx,
219    /// and returns per-chunk proofs. Splits into sub-batches if > `MAX_LEAVES`.
220    ///
221    /// This low-level helper assumes the caller has already selected the
222    /// addresses that need payment. User-facing upload paths first run the
223    /// merkle upload planner to skip chunks already stored on the network.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the batch is too small, candidate collection fails,
228    /// on-chain payment fails, or proof generation fails.
229    pub async fn pay_for_merkle_batch(
230        &self,
231        addresses: &[[u8; 32]],
232        data_type: u32,
233        data_size: u64,
234    ) -> Result<MerkleBatchPaymentResult> {
235        let chunk_count = addresses.len();
236        if chunk_count < 2 {
237            return Err(Error::Payment(
238                "Merkle batch payment requires at least 2 chunks".to_string(),
239            ));
240        }
241
242        if chunk_count > MAX_LEAVES {
243            return self
244                .pay_for_merkle_multi_batch(addresses, data_type, data_size)
245                .await;
246        }
247
248        self.pay_for_merkle_single_batch(addresses, data_type, data_size)
249            .await
250    }
251
252    /// Check which chunks in a merkle upload still need payment/storage.
253    ///
254    /// Uses the normal per-chunk quote path because it already has the
255    /// close-group majority rule for `AlreadyStored`. Non-stored chunks only
256    /// use the quote response as a probe; their actual payment still happens
257    /// through the merkle batch.
258    ///
259    /// `chunks` contains `(address, data_size)` pairs.
260    pub(crate) async fn plan_merkle_upload(
261        &self,
262        chunks: Vec<([u8; 32], u64)>,
263        data_type: u32,
264        progress: Option<&mpsc::Sender<UploadEvent>>,
265    ) -> Result<MerkleUploadPlan> {
266        let total_chunks = chunks.len();
267        if total_chunks == 0 {
268            return Ok(MerkleUploadPlan::default());
269        }
270
271        info!("Checking {total_chunks} merkle chunks for existing storage before payment");
272
273        let quote_limiter = self.controller().quote.clone();
274        let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
275        let mut check_stream = stream::iter(chunks.into_iter().enumerate())
276            .map(|(index, (address, data_size))| {
277                let limiter = quote_limiter.clone();
278                async move {
279                    let result = observe_op(
280                        &limiter,
281                        || async move {
282                            self.chunk_already_stored_for_merkle(&address, data_type, data_size)
283                                .await
284                        },
285                        classify_error,
286                    )
287                    .await;
288                    (index, address, data_size, result)
289                }
290            })
291            .buffer_unordered(quote_concurrency);
292
293        let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
294        let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
295        let mut checked = 0usize;
296
297        while let Some((index, address, data_size, result)) = check_stream.next().await {
298            let is_already_stored = result?;
299            checked += 1;
300
301            if let Some(tx) = progress {
302                let _ = tx.try_send(UploadEvent::ChunkQuoted {
303                    quoted: checked,
304                    total: total_chunks,
305                });
306            }
307
308            if is_already_stored {
309                debug!(
310                    "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
311                    hex::encode(address)
312                );
313                already_stored.push((index, address));
314                if let Some(tx) = progress {
315                    let _ = tx.try_send(UploadEvent::ChunkStored {
316                        stored: already_stored.len(),
317                        total: total_chunks,
318                    });
319                }
320            } else {
321                debug!(
322                    "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
323                    hex::encode(address)
324                );
325                to_upload.push((index, address, data_size));
326            }
327        }
328
329        already_stored.sort_by_key(|(index, _)| *index);
330        to_upload.sort_by_key(|(index, _, _)| *index);
331
332        let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
333            acc.saturating_add(*data_size)
334        });
335
336        let already_stored = already_stored
337            .into_iter()
338            .map(|(_, address)| address)
339            .collect::<Vec<_>>();
340        let to_upload = to_upload
341            .into_iter()
342            .map(|(_, address, _)| address)
343            .collect::<Vec<_>>();
344
345        info!(
346            "Merkle preflight complete: {} already stored, {} need upload",
347            already_stored.len(),
348            to_upload.len()
349        );
350
351        Ok(MerkleUploadPlan {
352            already_stored,
353            to_upload,
354            to_upload_total_bytes,
355        })
356    }
357
358    async fn chunk_already_stored_for_merkle(
359        &self,
360        address: &[u8; 32],
361        data_type: u32,
362        data_size: u64,
363    ) -> Result<bool> {
364        match self.get_store_quotes(address, data_size, data_type).await {
365            Ok(_) => Ok(false),
366            Err(Error::AlreadyStored) => Ok(true),
367            Err(e) => Err(e),
368        }
369    }
370
371    /// Phase 1 of external-signer merkle payment: prepare batch without paying.
372    ///
373    /// Builds the merkle tree, collects candidate pools from the network,
374    /// and returns the data needed for the on-chain payment call.
375    /// Requires `EvmNetwork` but NOT a wallet.
376    pub async fn prepare_merkle_batch_external(
377        &self,
378        addresses: &[[u8; 32]],
379        data_type: u32,
380        data_size: u64,
381    ) -> Result<PreparedMerkleBatch> {
382        let chunk_count = addresses.len();
383        let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
384
385        debug!("Building merkle tree for {chunk_count} chunks");
386
387        // 1. Build merkle tree
388        let tree = MerkleTree::from_xornames(xornames)
389            .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
390
391        let depth = tree.depth();
392        let merkle_payment_timestamp = std::time::SystemTime::now()
393            .duration_since(std::time::UNIX_EPOCH)
394            .map_err(|e| Error::Payment(format!("System time error: {e}")))?
395            .as_secs();
396
397        debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
398
399        // 2. Get reward candidates (midpoint proofs)
400        let midpoint_proofs = tree
401            .reward_candidates(merkle_payment_timestamp)
402            .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
403
404        debug!(
405            "Collecting candidate pools from {} midpoints (concurrent)",
406            midpoint_proofs.len()
407        );
408
409        // 3. Collect candidate pools from the network (all pools in parallel)
410        let candidate_pools = self
411            .build_candidate_pools(
412                &midpoint_proofs,
413                data_type,
414                data_size,
415                merkle_payment_timestamp,
416            )
417            .await?;
418
419        // 4. Build pool commitments for on-chain payment
420        let pool_commitments: Vec<PoolCommitment> = candidate_pools
421            .iter()
422            .map(MerklePaymentCandidatePool::to_commitment)
423            .collect();
424
425        Ok(PreparedMerkleBatch {
426            depth,
427            pool_commitments,
428            merkle_payment_timestamp,
429            candidate_pools,
430            tree,
431            addresses: addresses.to_vec(),
432        })
433    }
434
435    /// Pay for a single batch (up to `MAX_LEAVES` chunks).
436    async fn pay_for_merkle_single_batch(
437        &self,
438        addresses: &[[u8; 32]],
439        data_type: u32,
440        data_size: u64,
441    ) -> Result<MerkleBatchPaymentResult> {
442        let wallet = self.require_wallet()?;
443        let prepared = self
444            .prepare_merkle_batch_external(addresses, data_type, data_size)
445            .await?;
446
447        info!(
448            "Submitting merkle batch payment on-chain (depth={})",
449            prepared.depth
450        );
451        let (winner_pool_hash, amount, gas_info) = wallet
452            .pay_for_merkle_tree(
453                prepared.depth,
454                prepared.pool_commitments.clone(),
455                prepared.merkle_payment_timestamp,
456            )
457            .await
458            .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
459
460        info!(
461            "Merkle payment succeeded: winner pool {}",
462            hex::encode(winner_pool_hash)
463        );
464
465        let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
466        result.storage_cost_atto = amount.to_string();
467        result.gas_cost_wei = gas_info.gas_cost_wei;
468        Ok(result)
469    }
470
471    /// Handle batches larger than `MAX_LEAVES` by splitting into sub-batches.
472    async fn pay_for_merkle_multi_batch(
473        &self,
474        addresses: &[[u8; 32]],
475        data_type: u32,
476        data_size: u64,
477    ) -> Result<MerkleBatchPaymentResult> {
478        let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
479        let total_sub_batches = sub_batches.len();
480        let mut all_proofs = HashMap::with_capacity(addresses.len());
481        let mut total_storage = Amount::ZERO;
482        let mut total_gas: u128 = 0;
483        // Track the oldest sub-batch timestamp so the overall receipt
484        // expires when the *first* sub-batch's on-chain payment ages
485        // out (worst case for resume).
486        let mut oldest_ts: u64 = 0;
487
488        for (i, chunk) in sub_batches.into_iter().enumerate() {
489            match self
490                .pay_for_merkle_single_batch(chunk, data_type, data_size)
491                .await
492            {
493                Ok(sub_result) => {
494                    if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
495                        total_storage += cost;
496                    }
497                    total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
498                    if oldest_ts == 0
499                        || (sub_result.merkle_payment_timestamp > 0
500                            && sub_result.merkle_payment_timestamp < oldest_ts)
501                    {
502                        oldest_ts = sub_result.merkle_payment_timestamp;
503                    }
504                    all_proofs.extend(sub_result.proofs);
505                }
506                Err(e) => {
507                    if all_proofs.is_empty() {
508                        // First sub-batch failed, nothing paid yet -- propagate directly.
509                        return Err(e);
510                    }
511                    // Return partial result so caller can still store already-paid chunks.
512                    warn!(
513                        "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
514                         Returning {} proofs from prior sub-batches",
515                        i + 1,
516                        all_proofs.len()
517                    );
518                    return Ok(MerkleBatchPaymentResult {
519                        chunk_count: all_proofs.len(),
520                        proofs: all_proofs,
521                        storage_cost_atto: total_storage.to_string(),
522                        gas_cost_wei: total_gas,
523                        merkle_payment_timestamp: oldest_ts,
524                    });
525                }
526            }
527        }
528
529        Ok(MerkleBatchPaymentResult {
530            chunk_count: addresses.len(),
531            proofs: all_proofs,
532            storage_cost_atto: total_storage.to_string(),
533            gas_cost_wei: total_gas,
534            merkle_payment_timestamp: oldest_ts,
535        })
536    }
537
538    /// Build candidate pools by querying the network for each midpoint (concurrently).
539    async fn build_candidate_pools(
540        &self,
541        midpoint_proofs: &[MidpointProof],
542        data_type: u32,
543        data_size: u64,
544        merkle_payment_timestamp: u64,
545    ) -> Result<Vec<MerklePaymentCandidatePool>> {
546        let mut pool_futures = FuturesUnordered::new();
547
548        for midpoint_proof in midpoint_proofs {
549            let pool_address = midpoint_proof.address();
550            let mp = midpoint_proof.clone();
551            pool_futures.push(async move {
552                let candidate_nodes = self
553                    .get_merkle_candidate_pool(
554                        &pool_address.0,
555                        data_type,
556                        data_size,
557                        merkle_payment_timestamp,
558                    )
559                    .await?;
560                Ok::<_, Error>(MerklePaymentCandidatePool {
561                    midpoint_proof: mp,
562                    candidate_nodes,
563                })
564            });
565        }
566
567        let mut pools = Vec::with_capacity(midpoint_proofs.len());
568        while let Some(result) = pool_futures.next().await {
569            pools.push(result?);
570        }
571
572        Ok(pools)
573    }
574
575    /// Collect `CANDIDATES_PER_POOL` (16) merkle candidate quotes from the network.
576    #[allow(clippy::too_many_lines)]
577    async fn get_merkle_candidate_pool(
578        &self,
579        address: &[u8; 32],
580        data_type: u32,
581        data_size: u64,
582        merkle_payment_timestamp: u64,
583    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
584        let node = self.network().node();
585        let timeout = Duration::from_secs(self.config().quote_timeout_secs);
586
587        // Query extra peers to handle validation failures (bad sigs, wrong type, etc.)
588        let query_count = CANDIDATES_PER_POOL * 2;
589        let mut remote_peers = self
590            .network()
591            .find_closest_peers(address, query_count)
592            .await?;
593
594        // If DHT closest-nodes didn't return enough, supplement with connected peers.
595        // On small networks the DHT iterative lookup may not discover enough peers
596        // close to a random pool address, but we know more peers via direct connections.
597        if remote_peers.len() < CANDIDATES_PER_POOL {
598            let connected = self.network().connected_peers().await;
599            for peer in connected {
600                if !remote_peers.iter().any(|(id, _)| *id == peer) {
601                    remote_peers.push((peer, vec![]));
602                }
603            }
604        }
605
606        if remote_peers.len() < CANDIDATES_PER_POOL {
607            return Err(Error::InsufficientPeers(format!(
608                "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
609                 Use --no-merkle or a larger network.",
610                remote_peers.len()
611            )));
612        }
613
614        let mut candidate_futures = FuturesUnordered::new();
615
616        for (peer_id, peer_addrs) in &remote_peers {
617            let request_id = self.next_request_id();
618            let request = MerkleCandidateQuoteRequest {
619                address: *address,
620                data_type,
621                data_size,
622                merkle_payment_timestamp,
623            };
624            let message = ChunkMessage {
625                request_id,
626                body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
627            };
628
629            let message_bytes = match message.encode() {
630                Ok(bytes) => bytes,
631                Err(e) => {
632                    warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
633                    continue;
634                }
635            };
636
637            let peer_id_clone = *peer_id;
638            let addrs_clone = peer_addrs.clone();
639            let node_clone = node.clone();
640
641            let fut = async move {
642                let result = send_and_await_chunk_response(
643                    &node_clone,
644                    &peer_id_clone,
645                    message_bytes,
646                    request_id,
647                    timeout,
648                    &addrs_clone,
649                    |body| match body {
650                        ChunkMessageBody::MerkleCandidateQuoteResponse(
651                            MerkleCandidateQuoteResponse::Success { candidate_node },
652                        ) => {
653                            match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
654                                &candidate_node,
655                            ) {
656                                Ok(node) => Some(Ok(node)),
657                                Err(e) => Some(Err(Error::Serialization(format!(
658                                    "Failed to deserialize candidate node from {peer_id_clone}: {e}"
659                                )))),
660                            }
661                        }
662                        ChunkMessageBody::MerkleCandidateQuoteResponse(
663                            MerkleCandidateQuoteResponse::Error(e),
664                        ) => Some(Err(Error::Protocol(format!(
665                            "Merkle quote error from {peer_id_clone}: {e}"
666                        )))),
667                        _ => None,
668                    },
669                    |e| {
670                        Error::Network(format!(
671                            "Failed to send merkle candidate request to {peer_id_clone}: {e}"
672                        ))
673                    },
674                    || {
675                        Error::Timeout(format!(
676                            "Timeout waiting for merkle candidate from {peer_id_clone}"
677                        ))
678                    },
679                )
680                .await;
681
682                (peer_id_clone, result)
683            };
684
685            candidate_futures.push(fut);
686        }
687
688        self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
689            .await
690    }
691
692    /// Collect and validate merkle candidate responses, then return the
693    /// `CANDIDATES_PER_POOL` valid responders that are XOR-closest to the
694    /// pool midpoint.
695    ///
696    /// Why distance-sort instead of "first N to respond":
697    /// the storing-node verifier re-runs a network closest-peers lookup of
698    /// the pool midpoint and rejects the pool if fewer than 13 of the 16
699    /// candidate `pub_keys` appear in that authoritative close-set. Pools
700    /// built from the fastest-to-respond quoters fail this check whenever
701    /// truly-close peers are slower (NAT/relay paths) than farther peers.
702    async fn collect_validated_candidates(
703        &self,
704        futures: &mut FuturesUnordered<
705            impl std::future::Future<
706                Output = (
707                    PeerId,
708                    std::result::Result<MerklePaymentCandidateNode, Error>,
709                ),
710            >,
711        >,
712        target_address: &[u8; 32],
713        merkle_payment_timestamp: u64,
714    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
715        let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
716        let mut failures: Vec<String> = Vec::new();
717
718        while let Some((peer_id, result)) = futures.next().await {
719            match result {
720                Ok(candidate) => {
721                    if !verify_merkle_candidate_signature(&candidate) {
722                        warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
723                        failures.push(format!("{peer_id}: invalid signature"));
724                        continue;
725                    }
726                    if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
727                        warn!("Timestamp mismatch from merkle candidate {peer_id}");
728                        failures.push(format!("{peer_id}: timestamp mismatch"));
729                        continue;
730                    }
731                    valid.push((peer_id, candidate));
732                }
733                Err(e) => {
734                    debug!("Failed to get merkle candidate from {peer_id}: {e}");
735                    failures.push(format!("{peer_id}: {e}"));
736                }
737            }
738        }
739
740        if valid.len() < CANDIDATES_PER_POOL {
741            return Err(Error::InsufficientPeers(format!(
742                "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
743                valid.len(),
744                failures.join("; ")
745            )));
746        }
747
748        let target_peer = PeerId::from_bytes(*target_address);
749        valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
750
751        let candidates: Vec<MerklePaymentCandidateNode> = valid
752            .into_iter()
753            .take(CANDIDATES_PER_POOL)
754            .map(|(_, candidate)| candidate)
755            .collect();
756
757        candidates
758            .try_into()
759            .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
760    }
761
762    /// Upload chunks using pre-computed merkle proofs from a batch payment.
763    ///
764    /// Each chunk is matched to its proof from `batch_result.proofs`, then
765    /// stored to its close group concurrently. A per-chunk quorum shortfall
766    /// (`InsufficientPeers`) does **not** abort the file: such chunks are
767    /// collected and retried — with the same reusable proof and a freshly
768    /// re-collected close group — for up to [`MERKLE_STORE_MAX_ATTEMPTS`]
769    /// attempts. `stored_offset` carries chunks already confirmed by an earlier
770    /// preflight (used for progress numbering and the returned `stored` count),
771    /// and `total_chunks` is the whole-file total for progress events.
772    ///
773    /// Returns how many chunks are stored (including `stored_offset`), how many
774    /// remained short of quorum after all retries, and the aggregate store
775    /// stats.
776    ///
777    /// # Errors
778    ///
779    /// Returns an error only for non-quorum failures (e.g. a missing proof, or a
780    /// chunk-count/address mismatch); quorum shortfalls are reported via
781    /// [`MerkleStoreOutcome::failed`].
782    pub(crate) async fn merkle_upload_chunks(
783        &self,
784        chunk_contents: Vec<Bytes>,
785        addresses: Vec<[u8; 32]>,
786        batch_result: &MerkleBatchPaymentResult,
787        progress: Option<&mpsc::Sender<UploadEvent>>,
788        stored_offset: usize,
789        total_chunks: usize,
790    ) -> Result<MerkleStoreOutcome> {
791        let store_limiter = self.controller().store.clone();
792        // Clamp fan-out to batch size — partial batches should not
793        // pay for unused slots (see PERF-RESULTS.md).
794        let batch_size = chunk_contents.len();
795        if batch_size != addresses.len() {
796            return Err(Error::InvalidData(format!(
797                "merkle upload has {batch_size} chunk contents but {} addresses",
798                addresses.len()
799            )));
800        }
801        let store_concurrency = store_limiter.current().min(batch_size.max(1));
802
803        let chunks: Vec<([u8; 32], Bytes)> = addresses.into_iter().zip(chunk_contents).collect();
804
805        // Store one chunk to its (freshly re-collected) close group. Called
806        // once per chunk per attempt, so a retry round naturally lands on a
807        // converged routing table. Only `InsufficientPeers` is recoverable;
808        // a missing proof stays fatal.
809        let store_one = |addr: [u8; 32], content: Bytes| {
810            let limiter = store_limiter.clone();
811            let proof_bytes = batch_result.proofs.get(&addr).cloned();
812            async move {
813                let started = std::time::Instant::now();
814                let proof = proof_bytes.ok_or_else(|| {
815                    Error::Payment(format!(
816                        "Missing merkle proof for chunk {}",
817                        hex::encode(addr)
818                    ))
819                })?;
820                let peers = self.close_group_peers(&addr).await?;
821                observe_op(
822                    &limiter,
823                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
824                    classify_error,
825                )
826                .await
827                .map(|_| started)
828            }
829        };
830
831        merkle_store_with_retry(
832            chunks,
833            store_concurrency,
834            MERKLE_STORE_MAX_ATTEMPTS,
835            MERKLE_RETRY_BACKOFF,
836            progress,
837            stored_offset,
838            total_chunks,
839            store_one,
840        )
841        .await
842    }
843}
844
/// Number of times a merkle chunk store is attempted before the chunk is
/// reported as failed: one initial attempt followed by three retries.
///
/// This mirrors the wave upload path, which iterates `0..=MAX_RETRIES` with
/// `MAX_RETRIES = 3` in `batch.rs`. It also lines up with the four slots of
/// `WaveAggregateStats::retries_histogram`: a chunk that is stored on the last
/// retry is counted in `retries_histogram[3]`.
///
/// Why retry at all: while a few nodes' routing tables disagree about a
/// chunk's `winner_pool` midpoint, its close group can transiently reject the
/// store. Per-chunk proofs are reusable, so resending the same proof after a
/// backoff needs no re-payment and no new pool.
const MERKLE_STORE_MAX_ATTEMPTS: usize = 1 + 3;

/// Nominal pause between merkle store rounds, before jitter is applied.
///
/// The routing-table divergence behind `InsufficientPeers` settles on the
/// order of minutes. With [`MERKLE_STORE_MAX_ATTEMPTS`] rounds there are three
/// pauses between them, roughly 90 s of waiting in total.
///
/// The real wait is scaled by [`MERKLE_RETRY_JITTER`], so a large failed set
/// does not re-probe the same divergent nodes in lockstep.
const MERKLE_RETRY_BACKOFF: Duration = Duration::from_millis(30_000);

/// Relative jitter (±10%) applied to [`MERKLE_RETRY_BACKOFF`]. It spreads the
/// retry wave so converging nodes are not all probed at the same instant.
const MERKLE_RETRY_JITTER: f64 = 0.10;
868
869/// Outcome of storing a merkle batch: how many chunks landed, how many
870/// remained short of quorum after all retries, and the aggregate store stats.
871#[derive(Debug, Default)]
872pub(crate) struct MerkleStoreOutcome {
873    /// Chunks that reached quorum, including any `stored_offset` carried in
874    /// from a preflight (counted once, even if they needed retries).
875    pub stored: usize,
876    /// Chunks still short of quorum after [`MERKLE_STORE_MAX_ATTEMPTS`].
877    pub failed: usize,
878    /// Aggregate store stats (durations, attempts, per-round retry histogram).
879    pub stats: crate::data::client::batch::WaveAggregateStats,
880}
881
882/// Drive a set of merkle chunk stores with bounded retry of quorum shortfalls.
883///
884/// Runs `store_one` over all `chunks` concurrently (up to `store_concurrency`),
885/// collecting any `InsufficientPeers` failures rather than aborting. Failed
886/// chunks are retried — `store_one` re-collects their close group on each call,
887/// so a converged routing table can yield a fresh group — for up to
888/// `max_attempts` rounds, sleeping a jittered `backoff` between rounds. A
889/// chunk's success is counted once and recorded in the retry round it landed on
890/// (`retries_histogram[round]`). `stored_offset` seeds the returned `stored`
891/// count and the progress numbering; `total` is the whole-file total reported
892/// in progress events. Non-quorum errors abort immediately.
893#[allow(clippy::too_many_arguments)]
894async fn merkle_store_with_retry<F, Fut>(
895    chunks: Vec<([u8; 32], Bytes)>,
896    store_concurrency: usize,
897    max_attempts: usize,
898    backoff: Duration,
899    progress: Option<&mpsc::Sender<UploadEvent>>,
900    stored_offset: usize,
901    total: usize,
902    store_one: F,
903) -> Result<MerkleStoreOutcome>
904where
905    F: Fn([u8; 32], Bytes) -> Fut,
906    Fut: std::future::Future<Output = Result<std::time::Instant>>,
907{
908    let attempts = max_attempts.max(1);
909    let mut outcome = MerkleStoreOutcome {
910        stored: stored_offset,
911        ..MerkleStoreOutcome::default()
912    };
913    let mut pending = chunks;
914
915    for attempt in 0..attempts {
916        let concurrency = store_concurrency.min(pending.len().max(1)).max(1);
917        let mut next_failed: Vec<([u8; 32], Bytes)> = Vec::new();
918
919        let mut upload_stream = stream::iter(pending.into_iter().map(|(addr, content)| {
920            let fut = store_one(addr, content.clone());
921            async move { (addr, content, fut.await) }
922        }))
923        .buffer_unordered(concurrency);
924
925        while let Some((addr, content, result)) = upload_stream.next().await {
926            outcome.stats.chunk_attempts_total =
927                outcome.stats.chunk_attempts_total.saturating_add(1);
928            match result {
929                Ok(started) => {
930                    let duration_ms =
931                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
932                    outcome.stats.store_durations_ms.push(duration_ms);
933                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
934                    outcome.stats.retries_histogram[idx] =
935                        outcome.stats.retries_histogram[idx].saturating_add(1);
936                    outcome.stored += 1;
937                    if let Some(tx) = progress {
938                        let _ = tx.try_send(UploadEvent::ChunkStored {
939                            stored: outcome.stored,
940                            total,
941                        });
942                    }
943                }
944                Err(Error::InsufficientPeers(_)) => next_failed.push((addr, content)),
945                Err(e) => return Err(e),
946            }
947        }
948
949        if next_failed.is_empty() {
950            break;
951        }
952
953        if attempt + 1 < attempts {
954            warn!(
955                failed = next_failed.len(),
956                attempt = attempt + 1,
957                "merkle chunks short of quorum, retrying after backoff"
958            );
959            pending = next_failed;
960            if backoff > Duration::ZERO {
961                // Jitter the wait (±MERKLE_RETRY_JITTER) so a large failed set
962                // does not re-probe the same divergent nodes in lockstep.
963                // `thread_rng` is !Send, so the value is computed and the rng
964                // dropped before the await to keep this future Send.
965                let wait = {
966                    let mut rng = rand::thread_rng();
967                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
968                    backoff.mul_f64(factor)
969                };
970                tokio::time::sleep(wait).await;
971            }
972        } else {
973            outcome.failed = next_failed.len();
974            break;
975        }
976    }
977
978    Ok(outcome)
979}
980
981/// Phase 2 of external-signer merkle payment: generate proofs from winner.
982///
983/// Takes the prepared batch and the winner pool hash returned by the
984/// on-chain payment transaction. Generates per-chunk merkle proofs.
985pub fn finalize_merkle_batch(
986    prepared: PreparedMerkleBatch,
987    winner_pool_hash: [u8; 32],
988) -> Result<MerkleBatchPaymentResult> {
989    let chunk_count = prepared.addresses.len();
990    let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
991
992    // Find the winner pool
993    let winner_pool = prepared
994        .candidate_pools
995        .iter()
996        .find(|pool| pool.hash() == winner_pool_hash)
997        .ok_or_else(|| {
998            Error::Payment(format!(
999                "Winner pool {} not found in candidate pools",
1000                hex::encode(winner_pool_hash)
1001            ))
1002        })?;
1003
1004    // Generate proofs for each chunk
1005    info!("Generating merkle proofs for {chunk_count} chunks");
1006    let mut proofs = HashMap::with_capacity(chunk_count);
1007
1008    for (i, xorname) in xornames.iter().enumerate() {
1009        let address_proof = prepared
1010            .tree
1011            .generate_address_proof(i, *xorname)
1012            .map_err(|e| {
1013                Error::Payment(format!(
1014                    "Failed to generate address proof for chunk {i}: {e}"
1015                ))
1016            })?;
1017
1018        let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1019
1020        let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1021            .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1022
1023        proofs.insert(prepared.addresses[i], tagged_bytes);
1024    }
1025
1026    info!("Merkle batch payment complete: {chunk_count} proofs generated");
1027
1028    Ok(MerkleBatchPaymentResult {
1029        proofs,
1030        chunk_count,
1031        storage_cost_atto: "0".to_string(),
1032        gas_cost_wei: 0,
1033        merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1034    })
1035}
1036
/// Compile-time checks that the futures returned by merkle client methods
/// implement `Send`.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    /// Only type-checks when `T: Send`; calling it is the assertion itself.
    fn _require_send<T: Send>(_value: &T) {}

    // Not meant to be called. `todo!()` only supplies a value of the right
    // type so the future can be constructed and type-checked.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        let batch_result: MerkleBatchPaymentResult = todo!();
        let upload_future =
            client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _require_send(&upload_future);
    }
}
1057
1058#[cfg(test)]
1059#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1060mod tests {
1061    use super::*;
1062    use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1063
1064    // =========================================================================
1065    // should_use_merkle (free function, no Client needed)
1066    // =========================================================================
1067
1068    #[test]
1069    fn test_auto_below_threshold() {
1070        assert!(!should_use_merkle(1, PaymentMode::Auto));
1071        assert!(!should_use_merkle(10, PaymentMode::Auto));
1072        assert!(!should_use_merkle(63, PaymentMode::Auto));
1073    }
1074
1075    #[test]
1076    fn test_auto_at_and_above_threshold() {
1077        assert!(should_use_merkle(64, PaymentMode::Auto));
1078        assert!(should_use_merkle(65, PaymentMode::Auto));
1079        assert!(should_use_merkle(1000, PaymentMode::Auto));
1080    }
1081
1082    #[test]
1083    fn test_merkle_mode_forces_at_2() {
1084        assert!(!should_use_merkle(1, PaymentMode::Merkle));
1085        assert!(should_use_merkle(2, PaymentMode::Merkle));
1086        assert!(should_use_merkle(3, PaymentMode::Merkle));
1087    }
1088
1089    #[test]
1090    fn test_single_mode_always_false() {
1091        assert!(!should_use_merkle(0, PaymentMode::Single));
1092        assert!(!should_use_merkle(64, PaymentMode::Single));
1093        assert!(!should_use_merkle(1000, PaymentMode::Single));
1094    }
1095
1096    #[test]
1097    fn test_default_mode_is_auto() {
1098        assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1099    }
1100
1101    #[test]
1102    fn test_threshold_value() {
1103        assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1104    }
1105
1106    #[test]
1107    fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1108        let first = Bytes::from_static(b"first");
1109        let second = Bytes::from_static(b"second");
1110        let first_addr = compute_address(&first);
1111        let second_addr = compute_address(&second);
1112
1113        let selected = chunk_contents_for_upload_addresses(
1114            vec![first.clone(), second.clone()],
1115            &[second_addr, first_addr],
1116        )
1117        .unwrap();
1118
1119        assert_eq!(selected, vec![second, first]);
1120    }
1121
1122    #[test]
1123    fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1124        let repeated = Bytes::from_static(b"same-content");
1125        let other = Bytes::from_static(b"other-content");
1126        let repeated_addr = compute_address(&repeated);
1127
1128        let selected = chunk_contents_for_upload_addresses(
1129            vec![repeated.clone(), other, repeated.clone()],
1130            &[repeated_addr, repeated_addr],
1131        )
1132        .unwrap();
1133
1134        assert_eq!(selected, vec![repeated.clone(), repeated]);
1135    }
1136
1137    #[test]
1138    fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1139        let requested = Bytes::from_static(b"requested-content");
1140        let unrequested = Bytes::from_static(b"unrequested-content");
1141        let requested_addr = compute_address(&requested);
1142
1143        let selected = chunk_contents_for_upload_addresses(
1144            vec![
1145                unrequested.clone(),
1146                requested.clone(),
1147                unrequested.clone(),
1148                unrequested,
1149            ],
1150            &[requested_addr],
1151        )
1152        .unwrap();
1153
1154        assert_eq!(selected, vec![requested]);
1155    }
1156
1157    #[test]
1158    fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1159        let present = Bytes::from_static(b"present-content");
1160        let missing = Bytes::from_static(b"missing-content");
1161        let missing_addr = compute_address(&missing);
1162
1163        let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1164
1165        assert!(matches!(result, Err(Error::InvalidData(_))));
1166    }
1167
1168    // =========================================================================
1169    // MerkleTree construction and proof generation (pure, no network)
1170    // =========================================================================
1171
1172    fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1173        (0..count)
1174            .map(|i| {
1175                let xn = XorName::from_content(&i.to_le_bytes());
1176                xn.0
1177            })
1178            .collect()
1179    }
1180
1181    #[test]
1182    fn test_tree_depth_for_known_sizes() {
1183        let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1184        for (count, expected_depth) in cases {
1185            let addrs = make_test_addresses(count);
1186            let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1187            let tree = MerkleTree::from_xornames(xornames).unwrap();
1188            assert_eq!(
1189                tree.depth(),
1190                expected_depth,
1191                "depth mismatch for {count} leaves"
1192            );
1193        }
1194    }
1195
1196    #[test]
1197    fn test_proof_generation_and_verification_for_all_leaves() {
1198        let addrs = make_test_addresses(16);
1199        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1200        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1201
1202        for (i, xn) in xornames.iter().enumerate() {
1203            let proof = tree.generate_address_proof(i, *xn).unwrap();
1204            assert!(proof.verify(), "proof for leaf {i} should verify");
1205            assert_eq!(proof.depth(), tree.depth() as usize);
1206        }
1207    }
1208
1209    #[test]
1210    fn test_proof_fails_for_wrong_address() {
1211        let addrs = make_test_addresses(8);
1212        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1213        let tree = MerkleTree::from_xornames(xornames).unwrap();
1214
1215        let wrong = XorName::from_content(b"wrong");
1216        let proof = tree.generate_address_proof(0, wrong).unwrap();
1217        assert!(!proof.verify(), "proof with wrong address should fail");
1218    }
1219
1220    #[test]
1221    fn test_tree_too_few_leaves() {
1222        let xornames = vec![XorName::from_content(b"only_one")];
1223        let result = MerkleTree::from_xornames(xornames);
1224        assert!(result.is_err());
1225    }
1226
1227    #[test]
1228    fn test_tree_at_max_leaves() {
1229        let addrs = make_test_addresses(MAX_LEAVES);
1230        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1231        let tree = MerkleTree::from_xornames(xornames).unwrap();
1232        assert_eq!(tree.leaf_count(), MAX_LEAVES);
1233    }
1234
1235    // =========================================================================
1236    // Proof serialization round-trip
1237    // =========================================================================
1238
1239    #[test]
1240    fn test_merkle_proof_serialize_deserialize_roundtrip() {
1241        use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
1242        use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};
1243
1244        let addrs = make_test_addresses(4);
1245        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1246        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1247
1248        let timestamp = std::time::SystemTime::now()
1249            .duration_since(std::time::UNIX_EPOCH)
1250            .unwrap()
1251            .as_secs();
1252
1253        let candidates = tree.reward_candidates(timestamp).unwrap();
1254        let midpoint = candidates.first().unwrap().clone();
1255
1256        // Build candidate nodes (with dummy signatures — not ML-DSA, just for serialization test)
1257        #[allow(clippy::cast_possible_truncation)]
1258        let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
1259            std::array::from_fn(|i| MerklePaymentCandidateNode {
1260                pub_key: vec![i as u8; 32],
1261                price: Amount::from(1024u64),
1262                reward_address: RewardsAddress::new([i as u8; 20]),
1263                merkle_payment_timestamp: timestamp,
1264                signature: vec![i as u8; 64],
1265            });
1266
1267        let pool = MerklePaymentCandidatePool {
1268            midpoint_proof: midpoint,
1269            candidate_nodes,
1270        };
1271
1272        let address_proof = tree.generate_address_proof(0, xornames[0]).unwrap();
1273        let merkle_proof = MerklePaymentProof::new(xornames[0], address_proof, pool);
1274
1275        let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
1276        assert_eq!(
1277            tagged.first().copied(),
1278            Some(0x02),
1279            "tag should be PROOF_TAG_MERKLE"
1280        );
1281
1282        let deserialized = deserialize_merkle_proof(&tagged).unwrap();
1283        assert_eq!(deserialized.address, merkle_proof.address);
1284        assert_eq!(
1285            deserialized.winner_pool.candidate_nodes.len(),
1286            CANDIDATES_PER_POOL
1287        );
1288    }
1289
1290    // =========================================================================
1291    // Candidate validation logic
1292    // =========================================================================
1293
1294    #[test]
1295    fn test_candidate_wrong_timestamp_rejected() {
1296        // Simulates what collect_validated_candidates checks
1297        let candidate = MerklePaymentCandidateNode {
1298            pub_key: vec![0u8; 32],
1299            price: ant_protocol::evm::Amount::ZERO,
1300            reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
1301            merkle_payment_timestamp: 1000,
1302            signature: vec![0u8; 64],
1303        };
1304
1305        // Timestamp check: 1000 != 2000
1306        assert_ne!(candidate.merkle_payment_timestamp, 2000);
1307    }
1308
1309    // =========================================================================
1310    // finalize_merkle_batch (external signer)
1311    // =========================================================================
1312
1313    fn make_dummy_candidate_nodes(
1314        timestamp: u64,
1315    ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1316        std::array::from_fn(|i| MerklePaymentCandidateNode {
1317            pub_key: vec![i as u8; 32],
1318            price: Amount::from(1024u64),
1319            reward_address: RewardsAddress::new([i as u8; 20]),
1320            merkle_payment_timestamp: timestamp,
1321            signature: vec![i as u8; 64],
1322        })
1323    }
1324
1325    fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1326        let addrs = make_test_addresses(count);
1327        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1328        let tree = MerkleTree::from_xornames(xornames).unwrap();
1329
1330        let timestamp = std::time::SystemTime::now()
1331            .duration_since(std::time::UNIX_EPOCH)
1332            .unwrap()
1333            .as_secs();
1334
1335        let midpoints = tree.reward_candidates(timestamp).unwrap();
1336
1337        let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1338            .into_iter()
1339            .map(|mp| MerklePaymentCandidatePool {
1340                midpoint_proof: mp,
1341                candidate_nodes: make_dummy_candidate_nodes(timestamp),
1342            })
1343            .collect();
1344
1345        let pool_commitments = candidate_pools
1346            .iter()
1347            .map(MerklePaymentCandidatePool::to_commitment)
1348            .collect();
1349
1350        PreparedMerkleBatch {
1351            depth: tree.depth(),
1352            pool_commitments,
1353            merkle_payment_timestamp: timestamp,
1354            candidate_pools,
1355            tree,
1356            addresses: addrs,
1357        }
1358    }
1359
1360    #[test]
1361    fn test_finalize_merkle_batch_with_valid_winner() {
1362        let prepared = make_prepared_merkle_batch(4);
1363        let winner_hash = prepared.candidate_pools[0].hash();
1364
1365        let result = finalize_merkle_batch(prepared, winner_hash);
1366        assert!(
1367            result.is_ok(),
1368            "should succeed with valid winner: {result:?}"
1369        );
1370
1371        let batch = result.unwrap();
1372        assert_eq!(batch.chunk_count, 4);
1373        assert_eq!(batch.proofs.len(), 4);
1374
1375        // Every proof should be non-empty
1376        for proof_bytes in batch.proofs.values() {
1377            assert!(!proof_bytes.is_empty());
1378        }
1379    }
1380
1381    #[test]
1382    fn test_finalize_merkle_batch_with_invalid_winner() {
1383        let prepared = make_prepared_merkle_batch(4);
1384        let bad_hash = [0xFF; 32];
1385
1386        let result = finalize_merkle_batch(prepared, bad_hash);
1387        assert!(result.is_err());
1388        let err = result.unwrap_err().to_string();
1389        assert!(err.contains("not found in candidate pools"), "got: {err}");
1390    }
1391
1392    #[test]
1393    fn test_finalize_merkle_batch_proofs_are_deserializable() {
1394        use ant_protocol::payment::deserialize_merkle_proof;
1395
1396        let prepared = make_prepared_merkle_batch(8);
1397        let winner_hash = prepared.candidate_pools[0].hash();
1398
1399        let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();
1400
1401        for (addr, proof_bytes) in &batch.proofs {
1402            let proof = deserialize_merkle_proof(proof_bytes);
1403            assert!(
1404                proof.is_ok(),
1405                "proof for {} should deserialize: {:?}",
1406                hex::encode(addr),
1407                proof.err()
1408            );
1409        }
1410    }
1411
1412    // =========================================================================
1413    // Batch splitting edge cases
1414    // =========================================================================
1415
1416    #[test]
1417    fn test_batch_split_calculation() {
1418        // MAX_LEAVES chunks should fit in 1 batch
1419        let addrs = make_test_addresses(MAX_LEAVES);
1420        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 1);
1421
1422        // MAX_LEAVES + 1 should split into 2
1423        let addrs = make_test_addresses(MAX_LEAVES + 1);
1424        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 2);
1425
1426        // 3 * MAX_LEAVES should split into 3
1427        let addrs = make_test_addresses(3 * MAX_LEAVES);
1428        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 3);
1429    }
1430
1431    // =========================================================================
1432    // merkle_store_with_retry: collect-not-abort + bounded retry (C2.1 / C2.2)
1433    // =========================================================================
1434
1435    use std::sync::{Arc, Mutex};
1436
1437    /// Build `count` (addr, content) pairs for the retry helper.
1438    fn make_chunks(count: usize) -> Vec<([u8; 32], Bytes)> {
1439        make_test_addresses(count)
1440            .into_iter()
1441            .map(|addr| (addr, Bytes::from_static(b"chunk")))
1442            .collect()
1443    }
1444
1445    /// C2.1: a per-chunk `InsufficientPeers` is collected, not propagated —
1446    /// the whole batch must NOT abort. With a single attempt, the failing
1447    /// subset is reported via `failed` and the rest are `stored`.
1448    #[tokio::test]
1449    async fn store_with_retry_collects_failures_instead_of_aborting() {
1450        let chunks = make_chunks(6);
1451        let failing: std::collections::HashSet<[u8; 32]> =
1452            chunks.iter().take(2).map(|(a, _)| *a).collect();
1453        let failing_for_closure = failing.clone();
1454
1455        let store_one = move |addr: [u8; 32], _content: Bytes| {
1456            let fail = failing_for_closure.contains(&addr);
1457            async move {
1458                if fail {
1459                    Err(Error::InsufficientPeers("test shortfall".into()))
1460                } else {
1461                    Ok(std::time::Instant::now())
1462                }
1463            }
1464        };
1465
1466        let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1467            .await
1468            .expect("quorum shortfalls must not abort the batch");
1469
1470        assert_eq!(outcome.stored, 4);
1471        assert_eq!(outcome.failed, 2);
1472        // Single attempt → all successes recorded in round 0.
1473        assert_eq!(outcome.stats.retries_histogram[0], 4);
1474        assert_eq!(outcome.stats.chunk_attempts_total, 6);
1475    }
1476
1477    /// A non-quorum error (e.g. a missing proof) stays fatal and aborts.
1478    #[tokio::test]
1479    async fn store_with_retry_propagates_non_quorum_errors() {
1480        let chunks = make_chunks(3);
1481        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1482            Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1483        };
1484
1485        let result =
1486            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, 3, store_one).await;
1487        assert!(matches!(result, Err(Error::Payment(_))));
1488    }
1489
1490    /// C2.2: only the chunks that failed the previous round are retried.
1491    #[tokio::test]
1492    async fn store_with_retry_retries_only_the_failed_set() {
1493        let chunks = make_chunks(5);
1494        let total = chunks.len();
1495        let failing: std::collections::HashSet<[u8; 32]> =
1496            chunks.iter().take(2).map(|(a, _)| *a).collect();
1497        let failing_for_closure = failing.clone();
1498
1499        // Record every (addr) the store op was invoked with, in call order.
1500        let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
1501        let calls_for_closure = calls.clone();
1502
1503        let store_one = move |addr: [u8; 32], _content: Bytes| {
1504            let calls = calls_for_closure.clone();
1505            // Fails the first round only; succeeds thereafter.
1506            let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
1507            let fail = failing_for_closure.contains(&addr) && already_seen == 0;
1508            calls.lock().unwrap().push(addr);
1509            async move {
1510                if fail {
1511                    Err(Error::InsufficientPeers("round-1 shortfall".into()))
1512                } else {
1513                    Ok(std::time::Instant::now())
1514                }
1515            }
1516        };
1517
1518        let outcome =
1519            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1520                .await
1521                .expect("should converge after retry");
1522
1523        assert_eq!(outcome.stored, total);
1524        assert_eq!(outcome.failed, 0);
1525
1526        // Round 1 drains fully before round 2 starts, so the call log is
1527        // segmented: first `total` calls = round 1 (all chunks), the rest =
1528        // the retry round, which must contain ONLY the failing set.
1529        let calls = calls.lock().unwrap();
1530        assert_eq!(calls.len(), total + failing.len());
1531        let round_two: std::collections::HashSet<[u8; 32]> =
1532            calls[total..].iter().copied().collect();
1533        assert_eq!(round_two, failing);
1534    }
1535
1536    /// C2.2: a chunk that fails attempt 1 and succeeds attempt 2 is counted
1537    /// once as stored and recorded as one retry in `retries_histogram[1]`.
1538    #[tokio::test]
1539    async fn store_with_retry_counts_retry_success_once_in_histogram() {
1540        let chunks = make_chunks(4);
1541        let total = chunks.len();
1542        let flaky_addr = chunks[0].0;
1543
1544        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
1545        let attempts_for_closure = attempts.clone();
1546
1547        let store_one = move |addr: [u8; 32], _content: Bytes| {
1548            let attempts = attempts_for_closure.clone();
1549            let n = {
1550                let mut m = attempts.lock().unwrap();
1551                let entry = m.entry(addr).or_insert(0);
1552                *entry += 1;
1553                *entry
1554            };
1555            let fail = addr == flaky_addr && n == 1;
1556            async move {
1557                if fail {
1558                    Err(Error::InsufficientPeers("transient".into()))
1559                } else {
1560                    Ok(std::time::Instant::now())
1561                }
1562            }
1563        };
1564
1565        let outcome =
1566            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1567                .await
1568                .expect("flaky chunk should recover on retry");
1569
1570        assert_eq!(outcome.stored, total);
1571        assert_eq!(outcome.failed, 0);
1572        // 3 chunks landed on the first attempt, 1 on the first retry.
1573        assert_eq!(outcome.stats.retries_histogram[0], total - 1);
1574        assert_eq!(outcome.stats.retries_histogram[1], 1);
1575        // One extra store attempt for the flaky chunk.
1576        assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
1577    }
1578
1579    /// C2.2: when every chunk stays short of quorum through the whole attempt
1580    /// budget, the helper still returns `Ok` (collect-not-abort) with the full
1581    /// batch reported as `failed`, having tried each chunk exactly
1582    /// `MERKLE_STORE_MAX_ATTEMPTS` times.
1583    #[tokio::test]
1584    async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
1585        let chunks = make_chunks(3);
1586        let total = chunks.len();
1587
1588        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1589            Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
1590        };
1591
1592        let outcome = merkle_store_with_retry(
1593            chunks,
1594            8,
1595            MERKLE_STORE_MAX_ATTEMPTS,
1596            Duration::ZERO,
1597            None,
1598            0,
1599            total,
1600            store_one,
1601        )
1602        .await
1603        .expect("an exhausted retry budget is reported, not propagated as Err");
1604
1605        assert_eq!(outcome.stored, 0);
1606        assert_eq!(outcome.failed, total);
1607        // Every chunk was attempted once per round across the full budget.
1608        assert_eq!(
1609            outcome.stats.chunk_attempts_total,
1610            total * MERKLE_STORE_MAX_ATTEMPTS
1611        );
1612        // No successes, so the histogram stays empty.
1613        assert_eq!(outcome.stats.retries_histogram, [0; 4]);
1614    }
1615}