Skip to main content

ant_core/data/client/
merkle.rs

1//! Merkle batch payment support for the Autonomi client.
2//!
3//! When uploading batches of 64+ chunks, merkle payments reduce gas costs
4//! by paying for the entire batch in a single on-chain transaction instead
5//! of one transaction per chunk.
6
7use crate::data::client::adaptive::{observe_op, Outcome};
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13    Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14    MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20    MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Batch size (in chunks) from which automatic payment-mode selection
/// switches to merkle payments; smaller batches pay per chunk.
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
34/// Payment mode for uploads.
35#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum PaymentMode {
38    /// Automatically choose: merkle for batches >= threshold, single otherwise.
39    #[default]
40    Auto,
41    /// Force merkle batch payment regardless of batch size (min 2 chunks).
42    Merkle,
43    /// Force single-node payment (one tx per chunk).
44    Single,
45}
46
47/// Result of a merkle batch payment.
48///
49/// Serializable so it can be persisted across runs for resume after a
50/// partial-upload failure. See `crate::data::client::cached_merkle`.
51#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
52pub struct MerkleBatchPaymentResult {
53    /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests).
54    pub proofs: HashMap<[u8; 32], Vec<u8>>,
55    /// Number of chunks in the batch.
56    pub chunk_count: usize,
57    /// Total storage cost in atto (token smallest unit).
58    pub storage_cost_atto: String,
59    /// Total gas cost in wei.
60    pub gas_cost_wei: u128,
61    /// Unix timestamp (seconds) used for the on-chain merkle payment.
62    /// Persisted so resume can check whether the on-chain payment has
63    /// aged out beyond the merkle expiration window and the cached
64    /// receipt must be discarded.
65    #[serde(default)]
66    pub merkle_payment_timestamp: u64,
67}
68
69/// Prepared merkle batch ready for external payment.
70///
71/// Contains everything needed to submit the on-chain merkle payment
72/// and then finalize proof generation without a wallet.
73pub struct PreparedMerkleBatch {
74    /// Merkle tree depth (needed for the on-chain call).
75    pub depth: u8,
76    /// Pool commitments for the on-chain call.
77    pub pool_commitments: Vec<PoolCommitment>,
78    /// Timestamp used for the merkle payment.
79    pub merkle_payment_timestamp: u64,
80    /// Internal: candidate pools (needed for proof generation after payment).
81    candidate_pools: Vec<MerklePaymentCandidatePool>,
82    /// Internal: the merkle tree (needed for proof generation).
83    tree: MerkleTree,
84    /// Internal: chunk addresses in order.
85    addresses: Vec<[u8; 32]>,
86}
87
/// Outcome of the pre-payment check of a merkle upload batch.
#[derive(Debug, Clone, Default)]
pub(crate) struct MerkleUploadPlan {
    /// Addresses of chunks their close group already confirmed as stored.
    pub already_stored: Vec<[u8; 32]>,
    /// Addresses of chunks that still have to be paid for and stored.
    pub to_upload: Vec<[u8; 32]>,
    /// Sum of the byte sizes of the chunks listed in `to_upload`.
    to_upload_total_bytes: u64,
}

impl MerkleUploadPlan {
    /// Mean byte size (integer division) of the chunks still to upload.
    ///
    /// Returns `0` when there is nothing left to upload.
    #[must_use]
    pub fn to_upload_avg_size(&self) -> u64 {
        let pending = self.to_upload.len() as u64;
        // `checked_div` yields `None` only for a zero divisor, i.e. an empty plan.
        self.to_upload_total_bytes
            .checked_div(pending)
            .unwrap_or(0)
    }
}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("PreparedMerkleBatch")
114            .field("depth", &self.depth)
115            .field("pool_commitments", &self.pool_commitments.len())
116            .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117            .field("candidate_pools", &self.candidate_pools.len())
118            .field("addresses", &self.addresses.len())
119            .finish()
120    }
121}
122
123/// Select chunk contents that correspond to `addresses`, preserving address order.
124///
125/// Extra chunk contents are ignored; missing contents for any requested address
126/// are treated as corrupted upload state.
127pub(crate) fn chunk_contents_for_upload_addresses(
128    chunk_contents: Vec<Bytes>,
129    addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131    if addresses.is_empty() {
132        return Ok(Vec::new());
133    }
134
135    let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136    for address in addresses {
137        *needed_by_address.entry(*address).or_default() += 1;
138    }
139
140    let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141        HashMap::with_capacity(needed_by_address.len());
142    let mut remaining = addresses.len();
143    for chunk in chunk_contents {
144        let address = compute_address(&chunk);
145        if let Some(needed) = needed_by_address.get_mut(&address) {
146            if *needed > 0 {
147                chunks_by_address
148                    .entry(address)
149                    .or_default()
150                    .push_back(chunk);
151                *needed -= 1;
152                remaining -= 1;
153                if remaining == 0 {
154                    break;
155                }
156            }
157        }
158    }
159
160    for (address, needed) in &needed_by_address {
161        if *needed == 0 {
162            continue;
163        }
164
165        if chunks_by_address.contains_key(address) {
166            return Err(Error::InvalidData(format!(
167                "missing duplicate chunk content for merkle address {}",
168                hex::encode(address)
169            )));
170        }
171
172        return Err(Error::InvalidData(format!(
173            "missing chunk content for merkle address {}",
174            hex::encode(address)
175        )));
176    }
177
178    let mut selected = Vec::with_capacity(addresses.len());
179    for address in addresses {
180        let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181            Error::InvalidData(format!(
182                "missing chunk content for merkle address {}",
183                hex::encode(address)
184            ))
185        })?;
186        let chunk = chunks.pop_front().ok_or_else(|| {
187            Error::InvalidData(format!(
188                "missing duplicate chunk content for merkle address {}",
189                hex::encode(address)
190            ))
191        })?;
192        selected.push(chunk);
193    }
194
195    Ok(selected)
196}
197
198/// Map a per-chunk quote-collection result to its merkle-preflight stored
199/// status, degrading gracefully on transient failures.
200///
201/// The preflight only needs to answer "is this chunk already on the network?"
202/// so it can be skipped before payment — it is an optimisation, never a gate.
203/// Therefore:
204/// - `Ok(_)` (quotes gathered) → `Ok(false)`: chunk is not stored, upload it.
205/// - `Err(AlreadyStored)` → `Ok(true)`: skip it.
206/// - a transient failure (timeout / insufficient peers / transport error) →
207///   `Ok(false)`: we could not confirm, so queue it for upload rather than
208///   aborting the whole batch. Re-storing an existing chunk is idempotent.
209/// - any other (application) error → propagate; it would recur on a healthy
210///   link and is not something the preflight should paper over.
211///
212/// Without this, a single chunk's transient quote timeout failed the entire
213/// upload — fatal for forced `--merkle` uploads, which (unlike `Auto`) have no
214/// wave-batch fallback, and catastrophic for large files via the multiplicative
215/// per-chunk effect.
216fn preflight_stored_status<T>(result: Result<T>) -> Result<bool> {
217    match result {
218        Ok(_) => Ok(false),
219        Err(Error::AlreadyStored) => Ok(true),
220        Err(e) if matches!(classify_error(&e), Outcome::Timeout | Outcome::NetworkError) => {
221            Ok(false)
222        }
223        Err(e) => Err(e),
224    }
225}
226
227/// Determine whether to use merkle payments for a given batch size.
228/// Free function — no Client needed.
229#[must_use]
230pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
231    match mode {
232        PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
233        PaymentMode::Merkle => chunk_count >= 2,
234        PaymentMode::Single => false,
235    }
236}
237
238impl Client {
239    /// Determine whether to use merkle payments for a given batch size.
240    #[must_use]
241    pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
242        should_use_merkle(chunk_count, mode)
243    }
244
245    /// Pay for a batch of chunks using merkle batch payment.
246    ///
247    /// Builds a merkle tree, collects candidate pools, pays on-chain in one tx,
248    /// and returns per-chunk proofs. Splits into sub-batches if > `MAX_LEAVES`.
249    ///
250    /// This low-level helper assumes the caller has already selected the
251    /// addresses that need payment. User-facing upload paths first run the
252    /// merkle upload planner to skip chunks already stored on the network.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the batch is too small, candidate collection fails,
257    /// on-chain payment fails, or proof generation fails.
258    pub async fn pay_for_merkle_batch(
259        &self,
260        addresses: &[[u8; 32]],
261        data_type: u32,
262        data_size: u64,
263    ) -> Result<MerkleBatchPaymentResult> {
264        let chunk_count = addresses.len();
265        if chunk_count < 2 {
266            return Err(Error::Payment(
267                "Merkle batch payment requires at least 2 chunks".to_string(),
268            ));
269        }
270
271        if chunk_count > MAX_LEAVES {
272            return self
273                .pay_for_merkle_multi_batch(addresses, data_type, data_size)
274                .await;
275        }
276
277        self.pay_for_merkle_single_batch(addresses, data_type, data_size)
278            .await
279    }
280
281    /// Check which chunks in a merkle upload still need payment/storage.
282    ///
283    /// Uses the normal per-chunk quote path because it already has the
284    /// close-group majority rule for `AlreadyStored`. Non-stored chunks only
285    /// use the quote response as a probe; their actual payment still happens
286    /// through the merkle batch.
287    ///
288    /// `chunks` contains `(address, data_size)` pairs.
289    pub(crate) async fn plan_merkle_upload(
290        &self,
291        chunks: Vec<([u8; 32], u64)>,
292        data_type: u32,
293        progress: Option<&mpsc::Sender<UploadEvent>>,
294    ) -> Result<MerkleUploadPlan> {
295        let total_chunks = chunks.len();
296        if total_chunks == 0 {
297            return Ok(MerkleUploadPlan::default());
298        }
299
300        info!("Checking {total_chunks} merkle chunks for existing storage before payment");
301
302        let quote_limiter = self.controller().quote.clone();
303        let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
304        let mut check_stream = stream::iter(chunks.into_iter().enumerate())
305            .map(|(index, (address, data_size))| {
306                let limiter = quote_limiter.clone();
307                async move {
308                    let result = observe_op(
309                        &limiter,
310                        || async move {
311                            self.chunk_already_stored_for_merkle(&address, data_type, data_size)
312                                .await
313                        },
314                        classify_error,
315                    )
316                    .await;
317                    (index, address, data_size, result)
318                }
319            })
320            .buffer_unordered(quote_concurrency);
321
322        let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
323        let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
324        let mut checked = 0usize;
325
326        while let Some((index, address, data_size, result)) = check_stream.next().await {
327            let is_already_stored = result?;
328            checked += 1;
329
330            if let Some(tx) = progress {
331                let _ = tx.try_send(UploadEvent::ChunkQuoted {
332                    quoted: checked,
333                    total: total_chunks,
334                });
335            }
336
337            if is_already_stored {
338                debug!(
339                    "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
340                    hex::encode(address)
341                );
342                already_stored.push((index, address));
343                if let Some(tx) = progress {
344                    let _ = tx.try_send(UploadEvent::ChunkStored {
345                        stored: already_stored.len(),
346                        total: total_chunks,
347                    });
348                }
349            } else {
350                debug!(
351                    "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
352                    hex::encode(address)
353                );
354                to_upload.push((index, address, data_size));
355            }
356        }
357
358        already_stored.sort_by_key(|(index, _)| *index);
359        to_upload.sort_by_key(|(index, _, _)| *index);
360
361        let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
362            acc.saturating_add(*data_size)
363        });
364
365        let already_stored = already_stored
366            .into_iter()
367            .map(|(_, address)| address)
368            .collect::<Vec<_>>();
369        let to_upload = to_upload
370            .into_iter()
371            .map(|(_, address, _)| address)
372            .collect::<Vec<_>>();
373
374        info!(
375            "Merkle preflight complete: {} already stored, {} need upload",
376            already_stored.len(),
377            to_upload.len()
378        );
379
380        Ok(MerkleUploadPlan {
381            already_stored,
382            to_upload,
383            to_upload_total_bytes,
384        })
385    }
386
387    async fn chunk_already_stored_for_merkle(
388        &self,
389        address: &[u8; 32],
390        data_type: u32,
391        data_size: u64,
392    ) -> Result<bool> {
393        let result = self.get_store_quotes(address, data_size, data_type).await;
394        if let Err(e) = &result {
395            if matches!(classify_error(e), Outcome::Timeout | Outcome::NetworkError) {
396                debug!(
397                    "Merkle preflight: could not determine stored status for {} ({e}); \
398                     treating as not stored and queuing for upload",
399                    hex::encode(address)
400                );
401            }
402        }
403        preflight_stored_status(result)
404    }
405
406    /// Phase 1 of external-signer merkle payment: prepare batch without paying.
407    ///
408    /// Builds the merkle tree, collects candidate pools from the network,
409    /// and returns the data needed for the on-chain payment call.
410    /// Requires `EvmNetwork` but NOT a wallet.
411    pub async fn prepare_merkle_batch_external(
412        &self,
413        addresses: &[[u8; 32]],
414        data_type: u32,
415        data_size: u64,
416    ) -> Result<PreparedMerkleBatch> {
417        let chunk_count = addresses.len();
418        let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
419
420        debug!("Building merkle tree for {chunk_count} chunks");
421
422        // 1. Build merkle tree
423        let tree = MerkleTree::from_xornames(xornames)
424            .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
425
426        let depth = tree.depth();
427        let merkle_payment_timestamp = std::time::SystemTime::now()
428            .duration_since(std::time::UNIX_EPOCH)
429            .map_err(|e| Error::Payment(format!("System time error: {e}")))?
430            .as_secs();
431
432        debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
433
434        // 2. Get reward candidates (midpoint proofs)
435        let midpoint_proofs = tree
436            .reward_candidates(merkle_payment_timestamp)
437            .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
438
439        debug!(
440            "Collecting candidate pools from {} midpoints (concurrent)",
441            midpoint_proofs.len()
442        );
443
444        // 3. Collect candidate pools from the network (all pools in parallel)
445        let candidate_pools = self
446            .build_candidate_pools(
447                &midpoint_proofs,
448                data_type,
449                data_size,
450                merkle_payment_timestamp,
451            )
452            .await?;
453
454        // 4. Build pool commitments for on-chain payment
455        let pool_commitments: Vec<PoolCommitment> = candidate_pools
456            .iter()
457            .map(MerklePaymentCandidatePool::to_commitment)
458            .collect();
459
460        Ok(PreparedMerkleBatch {
461            depth,
462            pool_commitments,
463            merkle_payment_timestamp,
464            candidate_pools,
465            tree,
466            addresses: addresses.to_vec(),
467        })
468    }
469
470    /// Pay for a single batch (up to `MAX_LEAVES` chunks).
471    async fn pay_for_merkle_single_batch(
472        &self,
473        addresses: &[[u8; 32]],
474        data_type: u32,
475        data_size: u64,
476    ) -> Result<MerkleBatchPaymentResult> {
477        let wallet = self.require_wallet()?;
478        let prepared = self
479            .prepare_merkle_batch_external(addresses, data_type, data_size)
480            .await?;
481
482        info!(
483            "Submitting merkle batch payment on-chain (depth={})",
484            prepared.depth
485        );
486        let (winner_pool_hash, amount, gas_info) = wallet
487            .pay_for_merkle_tree(
488                prepared.depth,
489                prepared.pool_commitments.clone(),
490                prepared.merkle_payment_timestamp,
491            )
492            .await
493            .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
494
495        info!(
496            "Merkle payment succeeded: winner pool {}",
497            hex::encode(winner_pool_hash)
498        );
499
500        let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
501        result.storage_cost_atto = amount.to_string();
502        result.gas_cost_wei = gas_info.gas_cost_wei;
503        Ok(result)
504    }
505
506    /// Handle batches larger than `MAX_LEAVES` by splitting into sub-batches.
507    async fn pay_for_merkle_multi_batch(
508        &self,
509        addresses: &[[u8; 32]],
510        data_type: u32,
511        data_size: u64,
512    ) -> Result<MerkleBatchPaymentResult> {
513        let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
514        let total_sub_batches = sub_batches.len();
515        let mut all_proofs = HashMap::with_capacity(addresses.len());
516        let mut total_storage = Amount::ZERO;
517        let mut total_gas: u128 = 0;
518        // Track the oldest sub-batch timestamp so the overall receipt
519        // expires when the *first* sub-batch's on-chain payment ages
520        // out (worst case for resume).
521        let mut oldest_ts: u64 = 0;
522
523        for (i, chunk) in sub_batches.into_iter().enumerate() {
524            match self
525                .pay_for_merkle_single_batch(chunk, data_type, data_size)
526                .await
527            {
528                Ok(sub_result) => {
529                    if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
530                        total_storage += cost;
531                    }
532                    total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
533                    if oldest_ts == 0
534                        || (sub_result.merkle_payment_timestamp > 0
535                            && sub_result.merkle_payment_timestamp < oldest_ts)
536                    {
537                        oldest_ts = sub_result.merkle_payment_timestamp;
538                    }
539                    all_proofs.extend(sub_result.proofs);
540                }
541                Err(e) => {
542                    if all_proofs.is_empty() {
543                        // First sub-batch failed, nothing paid yet -- propagate directly.
544                        return Err(e);
545                    }
546                    // Return partial result so caller can still store already-paid chunks.
547                    warn!(
548                        "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
549                         Returning {} proofs from prior sub-batches",
550                        i + 1,
551                        all_proofs.len()
552                    );
553                    return Ok(MerkleBatchPaymentResult {
554                        chunk_count: all_proofs.len(),
555                        proofs: all_proofs,
556                        storage_cost_atto: total_storage.to_string(),
557                        gas_cost_wei: total_gas,
558                        merkle_payment_timestamp: oldest_ts,
559                    });
560                }
561            }
562        }
563
564        Ok(MerkleBatchPaymentResult {
565            chunk_count: addresses.len(),
566            proofs: all_proofs,
567            storage_cost_atto: total_storage.to_string(),
568            gas_cost_wei: total_gas,
569            merkle_payment_timestamp: oldest_ts,
570        })
571    }
572
573    /// Build candidate pools by querying the network for each midpoint (concurrently).
574    async fn build_candidate_pools(
575        &self,
576        midpoint_proofs: &[MidpointProof],
577        data_type: u32,
578        data_size: u64,
579        merkle_payment_timestamp: u64,
580    ) -> Result<Vec<MerklePaymentCandidatePool>> {
581        let mut pool_futures = FuturesUnordered::new();
582
583        for midpoint_proof in midpoint_proofs {
584            let pool_address = midpoint_proof.address();
585            let mp = midpoint_proof.clone();
586            pool_futures.push(async move {
587                let candidate_nodes = self
588                    .get_merkle_candidate_pool(
589                        &pool_address.0,
590                        data_type,
591                        data_size,
592                        merkle_payment_timestamp,
593                    )
594                    .await?;
595                Ok::<_, Error>(MerklePaymentCandidatePool {
596                    midpoint_proof: mp,
597                    candidate_nodes,
598                })
599            });
600        }
601
602        let mut pools = Vec::with_capacity(midpoint_proofs.len());
603        while let Some(result) = pool_futures.next().await {
604            pools.push(result?);
605        }
606
607        Ok(pools)
608    }
609
610    /// Collect `CANDIDATES_PER_POOL` (16) merkle candidate quotes from the network.
611    #[allow(clippy::too_many_lines)]
612    async fn get_merkle_candidate_pool(
613        &self,
614        address: &[u8; 32],
615        data_type: u32,
616        data_size: u64,
617        merkle_payment_timestamp: u64,
618    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
619        let node = self.network().node();
620        let timeout = Duration::from_secs(self.config().quote_timeout_secs);
621
622        // Query extra peers to handle validation failures (bad sigs, wrong type, etc.)
623        let query_count = CANDIDATES_PER_POOL * 2;
624        let mut remote_peers = self
625            .network()
626            .find_closest_peers(address, query_count)
627            .await?;
628
629        // If DHT closest-nodes didn't return enough, supplement with connected peers.
630        // On small networks the DHT iterative lookup may not discover enough peers
631        // close to a random pool address, but we know more peers via direct connections.
632        if remote_peers.len() < CANDIDATES_PER_POOL {
633            let connected = self.network().connected_peers().await;
634            for peer in connected {
635                if !remote_peers.iter().any(|(id, _)| *id == peer) {
636                    remote_peers.push((peer, vec![]));
637                }
638            }
639        }
640
641        if remote_peers.len() < CANDIDATES_PER_POOL {
642            return Err(Error::InsufficientPeers(format!(
643                "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
644                 Use --no-merkle or a larger network.",
645                remote_peers.len()
646            )));
647        }
648
649        let mut candidate_futures = FuturesUnordered::new();
650
651        for (peer_id, peer_addrs) in &remote_peers {
652            let request_id = self.next_request_id();
653            let request = MerkleCandidateQuoteRequest {
654                address: *address,
655                data_type,
656                data_size,
657                merkle_payment_timestamp,
658            };
659            let message = ChunkMessage {
660                request_id,
661                body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
662            };
663
664            let message_bytes = match message.encode() {
665                Ok(bytes) => bytes,
666                Err(e) => {
667                    warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
668                    continue;
669                }
670            };
671
672            let peer_id_clone = *peer_id;
673            let addrs_clone = peer_addrs.clone();
674            let node_clone = node.clone();
675
676            let fut = async move {
677                let result = send_and_await_chunk_response(
678                    &node_clone,
679                    &peer_id_clone,
680                    message_bytes,
681                    request_id,
682                    timeout,
683                    &addrs_clone,
684                    |body| match body {
685                        ChunkMessageBody::MerkleCandidateQuoteResponse(
686                            MerkleCandidateQuoteResponse::Success { candidate_node },
687                        ) => {
688                            match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
689                                &candidate_node,
690                            ) {
691                                Ok(node) => Some(Ok(node)),
692                                Err(e) => Some(Err(Error::Serialization(format!(
693                                    "Failed to deserialize candidate node from {peer_id_clone}: {e}"
694                                )))),
695                            }
696                        }
697                        ChunkMessageBody::MerkleCandidateQuoteResponse(
698                            MerkleCandidateQuoteResponse::Error(e),
699                        ) => Some(Err(Error::Protocol(format!(
700                            "Merkle quote error from {peer_id_clone}: {e}"
701                        )))),
702                        _ => None,
703                    },
704                    |e| {
705                        Error::Network(format!(
706                            "Failed to send merkle candidate request to {peer_id_clone}: {e}"
707                        ))
708                    },
709                    || {
710                        Error::Timeout(format!(
711                            "Timeout waiting for merkle candidate from {peer_id_clone}"
712                        ))
713                    },
714                )
715                .await;
716
717                (peer_id_clone, result)
718            };
719
720            candidate_futures.push(fut);
721        }
722
723        self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
724            .await
725    }
726
727    /// Collect and validate merkle candidate responses, then return the
728    /// `CANDIDATES_PER_POOL` valid responders that are XOR-closest to the
729    /// pool midpoint.
730    ///
731    /// Why distance-sort instead of "first N to respond":
732    /// the storing-node verifier re-runs a network closest-peers lookup of
733    /// the pool midpoint and rejects the pool if fewer than 13 of the 16
734    /// candidate `pub_keys` appear in that authoritative close-set. Pools
735    /// built from the fastest-to-respond quoters fail this check whenever
736    /// truly-close peers are slower (NAT/relay paths) than farther peers.
737    async fn collect_validated_candidates(
738        &self,
739        futures: &mut FuturesUnordered<
740            impl std::future::Future<
741                Output = (
742                    PeerId,
743                    std::result::Result<MerklePaymentCandidateNode, Error>,
744                ),
745            >,
746        >,
747        target_address: &[u8; 32],
748        merkle_payment_timestamp: u64,
749    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
750        let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
751        let mut failures: Vec<String> = Vec::new();
752
753        while let Some((peer_id, result)) = futures.next().await {
754            match result {
755                Ok(candidate) => {
756                    if !verify_merkle_candidate_signature(&candidate) {
757                        warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
758                        failures.push(format!("{peer_id}: invalid signature"));
759                        continue;
760                    }
761                    if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
762                        warn!("Timestamp mismatch from merkle candidate {peer_id}");
763                        failures.push(format!("{peer_id}: timestamp mismatch"));
764                        continue;
765                    }
766                    valid.push((peer_id, candidate));
767                }
768                Err(e) => {
769                    debug!("Failed to get merkle candidate from {peer_id}: {e}");
770                    failures.push(format!("{peer_id}: {e}"));
771                }
772            }
773        }
774
775        if valid.len() < CANDIDATES_PER_POOL {
776            return Err(Error::InsufficientPeers(format!(
777                "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
778                valid.len(),
779                failures.join("; ")
780            )));
781        }
782
783        let target_peer = PeerId::from_bytes(*target_address);
784        valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
785
786        let candidates: Vec<MerklePaymentCandidateNode> = valid
787            .into_iter()
788            .take(CANDIDATES_PER_POOL)
789            .map(|(_, candidate)| candidate)
790            .collect();
791
792        candidates
793            .try_into()
794            .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
795    }
796
797    /// Upload chunks using pre-computed merkle proofs from a batch payment.
798    ///
799    /// Each chunk is matched to its proof from `batch_result.proofs`, then
800    /// stored to its close group concurrently. A per-chunk quorum shortfall
801    /// (`InsufficientPeers`) does **not** abort the file: such chunks are
802    /// collected and retried — with the same reusable proof and a freshly
803    /// re-collected close group — for up to [`MERKLE_STORE_MAX_ATTEMPTS`]
804    /// attempts. `stored_offset` carries chunks already confirmed by an earlier
805    /// preflight (used for progress numbering and the returned `stored` count),
806    /// and `total_chunks` is the whole-file total for progress events.
807    ///
808    /// Returns how many chunks are stored (including `stored_offset`), how many
809    /// remained short of quorum after all retries, and the aggregate store
810    /// stats.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error only for non-quorum failures (e.g. a missing proof, or a
815    /// chunk-count/address mismatch); quorum shortfalls are reported via
816    /// [`MerkleStoreOutcome::failed`].
817    pub(crate) async fn merkle_upload_chunks(
818        &self,
819        chunk_contents: Vec<Bytes>,
820        addresses: Vec<[u8; 32]>,
821        batch_result: &MerkleBatchPaymentResult,
822        progress: Option<&mpsc::Sender<UploadEvent>>,
823        stored_offset: usize,
824        total_chunks: usize,
825    ) -> Result<MerkleStoreOutcome> {
826        let store_limiter = self.controller().store.clone();
827        // Clamp fan-out to batch size — partial batches should not
828        // pay for unused slots (see PERF-RESULTS.md).
829        let batch_size = chunk_contents.len();
830        if batch_size != addresses.len() {
831            return Err(Error::InvalidData(format!(
832                "merkle upload has {batch_size} chunk contents but {} addresses",
833                addresses.len()
834            )));
835        }
836        let store_concurrency = store_limiter.current().min(batch_size.max(1));
837
838        let chunks: Vec<([u8; 32], Bytes)> = addresses.into_iter().zip(chunk_contents).collect();
839
840        // Store one chunk to its (freshly re-collected) close group. Called
841        // once per chunk per attempt, so a retry round naturally lands on a
842        // converged routing table. Only `InsufficientPeers` is recoverable;
843        // a missing proof stays fatal.
844        let store_one = |addr: [u8; 32], content: Bytes| {
845            let limiter = store_limiter.clone();
846            let proof_bytes = batch_result.proofs.get(&addr).cloned();
847            async move {
848                let started = std::time::Instant::now();
849                let proof = proof_bytes.ok_or_else(|| {
850                    Error::Payment(format!(
851                        "Missing merkle proof for chunk {}",
852                        hex::encode(addr)
853                    ))
854                })?;
855                let peers = self.close_group_peers(&addr).await?;
856                observe_op(
857                    &limiter,
858                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
859                    classify_error,
860                )
861                .await
862                .map(|_| started)
863            }
864        };
865
866        merkle_store_with_retry(
867            chunks,
868            store_concurrency,
869            MERKLE_STORE_MAX_ATTEMPTS,
870            MERKLE_RETRY_BACKOFF,
871            progress,
872            stored_offset,
873            total_chunks,
874            store_one,
875        )
876        .await
877    }
878}
879
/// How many store rounds a merkle batch gets in total: one initial attempt
/// followed by at most three retries.
///
/// The value mirrors the wave path (`batch.rs` iterates `0..=MAX_RETRIES`
/// with `MAX_RETRIES = 3`) and the four slots of
/// [`WaveAggregateStats::retries_histogram`], so a chunk that only lands on
/// the last retry is counted in `retries_histogram[3]`.
///
/// Retrying is worthwhile because a close group may temporarily reject a
/// chunk's `winner_pool` midpoint while some nodes' routing tables disagree
/// about it, and the network converges within minutes. Per-chunk proofs are
/// reusable, so a later round with the same proof recovers such shortfalls
/// without paying again and without a new pool.
pub(crate) const MERKLE_STORE_MAX_ATTEMPTS: usize = 4;
892
/// Nominal pause between two merkle store rounds.
///
/// The routing-table divergence behind `InsufficientPeers` clears on the
/// order of minutes, so a short sleep between rounds is enough to reach a
/// converged close group. The real wait is this value jittered by
/// [`MERKLE_RETRY_JITTER`], which keeps a large failed set from hitting the
/// same divergent nodes in lockstep.
pub(crate) const MERKLE_RETRY_BACKOFF: Duration = Duration::from_secs(30);
899
/// Jitter fraction for [`MERKLE_RETRY_BACKOFF`]: each wait is scaled by a
/// random factor within ±10% so the retry wave does not probe converging
/// nodes at the same instant.
const MERKLE_RETRY_JITTER: f64 = 0.1;
903
904/// Outcome of storing a merkle batch: how many chunks landed, how many
905/// remained short of quorum after all retries, and the aggregate store stats.
906#[derive(Debug, Default)]
907pub(crate) struct MerkleStoreOutcome {
908    /// Chunks that reached quorum, including any `stored_offset` carried in
909    /// from a preflight (counted once, even if they needed retries).
910    pub stored: usize,
911    /// Chunks still short of quorum after [`MERKLE_STORE_MAX_ATTEMPTS`].
912    pub failed: usize,
913    /// Addresses (and the last error message) of chunks still short of quorum
914    /// after all retries. Empty when `failed == 0`. Used by the CLI path to
915    /// build [`crate::data::Error::PartialUpload`]; the external-signer path
916    /// only reads the counts.
917    pub failed_addresses: Vec<([u8; 32], String)>,
918    /// Aggregate store stats (durations, attempts, per-round retry histogram).
919    pub stats: crate::data::client::batch::WaveAggregateStats,
920}
921
922/// Drive a set of merkle chunk stores with bounded retry of quorum shortfalls.
923///
924/// Runs `store_one` over all `chunks` concurrently (up to `store_concurrency`),
925/// collecting any `InsufficientPeers` failures rather than aborting. Failed
926/// chunks are retried — `store_one` re-collects their close group on each call,
927/// so a converged routing table can yield a fresh group — for up to
928/// `max_attempts` rounds, sleeping a jittered `backoff` between rounds. A
929/// chunk's success is counted once and recorded in the retry round it landed on
930/// (`retries_histogram[round]`). `stored_offset` seeds the returned `stored`
931/// count and the progress numbering; `total` is the whole-file total reported
932/// in progress events. Non-quorum errors abort immediately.
933#[allow(clippy::too_many_arguments)]
934pub(crate) async fn merkle_store_with_retry<F, Fut>(
935    chunks: Vec<([u8; 32], Bytes)>,
936    store_concurrency: usize,
937    max_attempts: usize,
938    backoff: Duration,
939    progress: Option<&mpsc::Sender<UploadEvent>>,
940    stored_offset: usize,
941    total: usize,
942    store_one: F,
943) -> Result<MerkleStoreOutcome>
944where
945    F: Fn([u8; 32], Bytes) -> Fut,
946    Fut: std::future::Future<Output = Result<std::time::Instant>>,
947{
948    let attempts = max_attempts.max(1);
949    let mut outcome = MerkleStoreOutcome {
950        stored: stored_offset,
951        ..MerkleStoreOutcome::default()
952    };
953    let mut pending = chunks;
954
955    for attempt in 0..attempts {
956        let concurrency = store_concurrency.min(pending.len().max(1)).max(1);
957        // Carries the chunk body forward for the next round plus the last
958        // quorum-shortfall message, so an exhausted set can report per-chunk
959        // errors via `failed_addresses`.
960        let mut next_failed: Vec<([u8; 32], Bytes, String)> = Vec::new();
961
962        let mut upload_stream = stream::iter(pending.into_iter().map(|(addr, content)| {
963            let fut = store_one(addr, content.clone());
964            async move { (addr, content, fut.await) }
965        }))
966        .buffer_unordered(concurrency);
967
968        while let Some((addr, content, result)) = upload_stream.next().await {
969            outcome.stats.chunk_attempts_total =
970                outcome.stats.chunk_attempts_total.saturating_add(1);
971            match result {
972                Ok(started) => {
973                    let duration_ms =
974                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
975                    outcome.stats.store_durations_ms.push(duration_ms);
976                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
977                    outcome.stats.retries_histogram[idx] =
978                        outcome.stats.retries_histogram[idx].saturating_add(1);
979                    outcome.stored += 1;
980                    if let Some(tx) = progress {
981                        let _ = tx.try_send(UploadEvent::ChunkStored {
982                            stored: outcome.stored,
983                            total,
984                        });
985                    }
986                }
987                Err(e @ Error::InsufficientPeers(_)) => {
988                    next_failed.push((addr, content, e.to_string()));
989                }
990                Err(e) => return Err(e),
991            }
992        }
993
994        if next_failed.is_empty() {
995            break;
996        }
997
998        if attempt + 1 < attempts {
999            warn!(
1000                failed = next_failed.len(),
1001                attempt = attempt + 1,
1002                "merkle chunks short of quorum, retrying after backoff"
1003            );
1004            pending = next_failed
1005                .into_iter()
1006                .map(|(addr, content, _msg)| (addr, content))
1007                .collect();
1008            if backoff > Duration::ZERO {
1009                // Jitter the wait (±MERKLE_RETRY_JITTER) so a large failed set
1010                // does not re-probe the same divergent nodes in lockstep.
1011                // `thread_rng` is !Send, so the value is computed and the rng
1012                // dropped before the await to keep this future Send.
1013                let wait = {
1014                    let mut rng = rand::thread_rng();
1015                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
1016                    backoff.mul_f64(factor)
1017                };
1018                tokio::time::sleep(wait).await;
1019            }
1020        } else {
1021            outcome.failed = next_failed.len();
1022            outcome.failed_addresses = next_failed
1023                .into_iter()
1024                .map(|(addr, _content, msg)| (addr, msg))
1025                .collect();
1026            break;
1027        }
1028    }
1029
1030    Ok(outcome)
1031}
1032
1033/// Phase 2 of external-signer merkle payment: generate proofs from winner.
1034///
1035/// Takes the prepared batch and the winner pool hash returned by the
1036/// on-chain payment transaction. Generates per-chunk merkle proofs.
1037pub fn finalize_merkle_batch(
1038    prepared: PreparedMerkleBatch,
1039    winner_pool_hash: [u8; 32],
1040) -> Result<MerkleBatchPaymentResult> {
1041    let chunk_count = prepared.addresses.len();
1042    let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
1043
1044    // Find the winner pool
1045    let winner_pool = prepared
1046        .candidate_pools
1047        .iter()
1048        .find(|pool| pool.hash() == winner_pool_hash)
1049        .ok_or_else(|| {
1050            Error::Payment(format!(
1051                "Winner pool {} not found in candidate pools",
1052                hex::encode(winner_pool_hash)
1053            ))
1054        })?;
1055
1056    // Generate proofs for each chunk
1057    info!("Generating merkle proofs for {chunk_count} chunks");
1058    let mut proofs = HashMap::with_capacity(chunk_count);
1059
1060    for (i, xorname) in xornames.iter().enumerate() {
1061        let address_proof = prepared
1062            .tree
1063            .generate_address_proof(i, *xorname)
1064            .map_err(|e| {
1065                Error::Payment(format!(
1066                    "Failed to generate address proof for chunk {i}: {e}"
1067                ))
1068            })?;
1069
1070        let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1071
1072        let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1073            .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1074
1075        proofs.insert(prepared.addresses[i], tagged_bytes);
1076    }
1077
1078    info!("Merkle batch payment complete: {chunk_count} proofs generated");
1079
1080    Ok(MerkleBatchPaymentResult {
1081        proofs,
1082        chunk_count,
1083        storage_cost_atto: "0".to_string(),
1084        gas_cost_wei: 0,
1085        merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1086    })
1087}
1088
/// Compile-time checks that the futures of merkle methods are `Send`.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    /// Accepts only `Send` values, so a call compiles only for a `Send` argument.
    fn _assert_send(_: &impl Send) {}

    // Never run: `todo!()` only supplies a value of the right type so the
    // future can be built and type-checked. The allows silence the lints the
    // diverging placeholder triggers.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        let batch_result: MerkleBatchPaymentResult = todo!();
        let upload = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _assert_send(&upload);
    }
}
1109
1110#[cfg(test)]
1111#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1112mod tests {
1113    use super::*;
1114    use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1115
1116    // =========================================================================
1117    // should_use_merkle (free function, no Client needed)
1118    // =========================================================================
1119
1120    #[test]
1121    fn test_auto_below_threshold() {
1122        assert!(!should_use_merkle(1, PaymentMode::Auto));
1123        assert!(!should_use_merkle(10, PaymentMode::Auto));
1124        assert!(!should_use_merkle(63, PaymentMode::Auto));
1125    }
1126
1127    #[test]
1128    fn test_auto_at_and_above_threshold() {
1129        assert!(should_use_merkle(64, PaymentMode::Auto));
1130        assert!(should_use_merkle(65, PaymentMode::Auto));
1131        assert!(should_use_merkle(1000, PaymentMode::Auto));
1132    }
1133
1134    #[test]
1135    fn test_merkle_mode_forces_at_2() {
1136        assert!(!should_use_merkle(1, PaymentMode::Merkle));
1137        assert!(should_use_merkle(2, PaymentMode::Merkle));
1138        assert!(should_use_merkle(3, PaymentMode::Merkle));
1139    }
1140
1141    #[test]
1142    fn test_single_mode_always_false() {
1143        assert!(!should_use_merkle(0, PaymentMode::Single));
1144        assert!(!should_use_merkle(64, PaymentMode::Single));
1145        assert!(!should_use_merkle(1000, PaymentMode::Single));
1146    }
1147
1148    #[test]
1149    fn test_default_mode_is_auto() {
1150        assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1151    }
1152
1153    #[test]
1154    fn test_threshold_value() {
1155        assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1156    }
1157
1158    // =========================================================================
1159    // preflight_stored_status — must degrade gracefully, never abort the batch
1160    // =========================================================================
1161
1162    #[test]
1163    fn test_preflight_quotes_gathered_means_not_stored() {
1164        assert!(matches!(preflight_stored_status(Ok(())), Ok(false)));
1165    }
1166
1167    #[test]
1168    fn test_preflight_already_stored_is_stored() {
1169        let r: Result<()> = Err(Error::AlreadyStored);
1170        assert!(matches!(preflight_stored_status(r), Ok(true)));
1171    }
1172
1173    /// The regression: a transient quote-quorum failure during the preflight
1174    /// must NOT propagate (which would abort the whole forced-merkle upload).
1175    /// It is treated as "not known to be stored" → queue the chunk for upload.
1176    #[test]
1177    fn test_preflight_transient_quote_failure_does_not_abort() {
1178        // The exact error STG-01 hit: couldn't gather a 7-quote quorum.
1179        let insufficient: Result<()> =
1180            Err(Error::InsufficientPeers("Got 5 quotes, need 7".to_string()));
1181        assert!(
1182            matches!(preflight_stored_status(insufficient), Ok(false)),
1183            "insufficient-peers during preflight must degrade to not-stored, not error"
1184        );
1185
1186        let timeout: Result<()> = Err(Error::Timeout("Timeout waiting for quote".to_string()));
1187        assert!(matches!(preflight_stored_status(timeout), Ok(false)));
1188
1189        let network: Result<()> = Err(Error::Network("connection reset".to_string()));
1190        assert!(matches!(preflight_stored_status(network), Ok(false)));
1191    }
1192
1193    /// Genuine application errors still propagate — the preflight should not
1194    /// silently swallow problems that would recur on a healthy link.
1195    #[test]
1196    fn test_preflight_application_error_propagates() {
1197        let payment: Result<()> = Err(Error::Payment("bad payment".to_string()));
1198        assert!(matches!(
1199            preflight_stored_status(payment),
1200            Err(Error::Payment(_))
1201        ));
1202    }
1203
1204    #[test]
1205    fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1206        let first = Bytes::from_static(b"first");
1207        let second = Bytes::from_static(b"second");
1208        let first_addr = compute_address(&first);
1209        let second_addr = compute_address(&second);
1210
1211        let selected = chunk_contents_for_upload_addresses(
1212            vec![first.clone(), second.clone()],
1213            &[second_addr, first_addr],
1214        )
1215        .unwrap();
1216
1217        assert_eq!(selected, vec![second, first]);
1218    }
1219
1220    #[test]
1221    fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1222        let repeated = Bytes::from_static(b"same-content");
1223        let other = Bytes::from_static(b"other-content");
1224        let repeated_addr = compute_address(&repeated);
1225
1226        let selected = chunk_contents_for_upload_addresses(
1227            vec![repeated.clone(), other, repeated.clone()],
1228            &[repeated_addr, repeated_addr],
1229        )
1230        .unwrap();
1231
1232        assert_eq!(selected, vec![repeated.clone(), repeated]);
1233    }
1234
1235    #[test]
1236    fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1237        let requested = Bytes::from_static(b"requested-content");
1238        let unrequested = Bytes::from_static(b"unrequested-content");
1239        let requested_addr = compute_address(&requested);
1240
1241        let selected = chunk_contents_for_upload_addresses(
1242            vec![
1243                unrequested.clone(),
1244                requested.clone(),
1245                unrequested.clone(),
1246                unrequested,
1247            ],
1248            &[requested_addr],
1249        )
1250        .unwrap();
1251
1252        assert_eq!(selected, vec![requested]);
1253    }
1254
1255    #[test]
1256    fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1257        let present = Bytes::from_static(b"present-content");
1258        let missing = Bytes::from_static(b"missing-content");
1259        let missing_addr = compute_address(&missing);
1260
1261        let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1262
1263        assert!(matches!(result, Err(Error::InvalidData(_))));
1264    }
1265
1266    // =========================================================================
1267    // MerkleTree construction and proof generation (pure, no network)
1268    // =========================================================================
1269
1270    fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1271        (0..count)
1272            .map(|i| {
1273                let xn = XorName::from_content(&i.to_le_bytes());
1274                xn.0
1275            })
1276            .collect()
1277    }
1278
1279    #[test]
1280    fn test_tree_depth_for_known_sizes() {
1281        let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1282        for (count, expected_depth) in cases {
1283            let addrs = make_test_addresses(count);
1284            let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1285            let tree = MerkleTree::from_xornames(xornames).unwrap();
1286            assert_eq!(
1287                tree.depth(),
1288                expected_depth,
1289                "depth mismatch for {count} leaves"
1290            );
1291        }
1292    }
1293
1294    #[test]
1295    fn test_proof_generation_and_verification_for_all_leaves() {
1296        let addrs = make_test_addresses(16);
1297        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1298        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1299
1300        for (i, xn) in xornames.iter().enumerate() {
1301            let proof = tree.generate_address_proof(i, *xn).unwrap();
1302            assert!(proof.verify(), "proof for leaf {i} should verify");
1303            assert_eq!(proof.depth(), tree.depth() as usize);
1304        }
1305    }
1306
1307    #[test]
1308    fn test_proof_fails_for_wrong_address() {
1309        let addrs = make_test_addresses(8);
1310        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1311        let tree = MerkleTree::from_xornames(xornames).unwrap();
1312
1313        let wrong = XorName::from_content(b"wrong");
1314        let proof = tree.generate_address_proof(0, wrong).unwrap();
1315        assert!(!proof.verify(), "proof with wrong address should fail");
1316    }
1317
1318    #[test]
1319    fn test_tree_too_few_leaves() {
1320        let xornames = vec![XorName::from_content(b"only_one")];
1321        let result = MerkleTree::from_xornames(xornames);
1322        assert!(result.is_err());
1323    }
1324
1325    #[test]
1326    fn test_tree_at_max_leaves() {
1327        let addrs = make_test_addresses(MAX_LEAVES);
1328        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1329        let tree = MerkleTree::from_xornames(xornames).unwrap();
1330        assert_eq!(tree.leaf_count(), MAX_LEAVES);
1331    }
1332
1333    // =========================================================================
1334    // Proof serialization round-trip
1335    // =========================================================================
1336
1337    #[test]
1338    fn test_merkle_proof_serialize_deserialize_roundtrip() {
1339        use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
1340        use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};
1341
1342        let addrs = make_test_addresses(4);
1343        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1344        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1345
1346        let timestamp = std::time::SystemTime::now()
1347            .duration_since(std::time::UNIX_EPOCH)
1348            .unwrap()
1349            .as_secs();
1350
1351        let candidates = tree.reward_candidates(timestamp).unwrap();
1352        let midpoint = candidates.first().unwrap().clone();
1353
1354        // Build candidate nodes (with dummy signatures — not ML-DSA, just for serialization test)
1355        #[allow(clippy::cast_possible_truncation)]
1356        let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
1357            std::array::from_fn(|i| MerklePaymentCandidateNode {
1358                pub_key: vec![i as u8; 32],
1359                price: Amount::from(1024u64),
1360                reward_address: RewardsAddress::new([i as u8; 20]),
1361                merkle_payment_timestamp: timestamp,
1362                signature: vec![i as u8; 64],
1363            });
1364
1365        let pool = MerklePaymentCandidatePool {
1366            midpoint_proof: midpoint,
1367            candidate_nodes,
1368        };
1369
1370        let address_proof = tree.generate_address_proof(0, xornames[0]).unwrap();
1371        let merkle_proof = MerklePaymentProof::new(xornames[0], address_proof, pool);
1372
1373        let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
1374        assert_eq!(
1375            tagged.first().copied(),
1376            Some(0x02),
1377            "tag should be PROOF_TAG_MERKLE"
1378        );
1379
1380        let deserialized = deserialize_merkle_proof(&tagged).unwrap();
1381        assert_eq!(deserialized.address, merkle_proof.address);
1382        assert_eq!(
1383            deserialized.winner_pool.candidate_nodes.len(),
1384            CANDIDATES_PER_POOL
1385        );
1386    }
1387
1388    // =========================================================================
1389    // Candidate validation logic
1390    // =========================================================================
1391
1392    #[test]
1393    fn test_candidate_wrong_timestamp_rejected() {
1394        // Simulates what collect_validated_candidates checks
1395        let candidate = MerklePaymentCandidateNode {
1396            pub_key: vec![0u8; 32],
1397            price: ant_protocol::evm::Amount::ZERO,
1398            reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
1399            merkle_payment_timestamp: 1000,
1400            signature: vec![0u8; 64],
1401        };
1402
1403        // Timestamp check: 1000 != 2000
1404        assert_ne!(candidate.merkle_payment_timestamp, 2000);
1405    }
1406
1407    // =========================================================================
1408    // finalize_merkle_batch (external signer)
1409    // =========================================================================
1410
1411    fn make_dummy_candidate_nodes(
1412        timestamp: u64,
1413    ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1414        std::array::from_fn(|i| MerklePaymentCandidateNode {
1415            pub_key: vec![i as u8; 32],
1416            price: Amount::from(1024u64),
1417            reward_address: RewardsAddress::new([i as u8; 20]),
1418            merkle_payment_timestamp: timestamp,
1419            signature: vec![i as u8; 64],
1420        })
1421    }
1422
1423    fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1424        let addrs = make_test_addresses(count);
1425        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1426        let tree = MerkleTree::from_xornames(xornames).unwrap();
1427
1428        let timestamp = std::time::SystemTime::now()
1429            .duration_since(std::time::UNIX_EPOCH)
1430            .unwrap()
1431            .as_secs();
1432
1433        let midpoints = tree.reward_candidates(timestamp).unwrap();
1434
1435        let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1436            .into_iter()
1437            .map(|mp| MerklePaymentCandidatePool {
1438                midpoint_proof: mp,
1439                candidate_nodes: make_dummy_candidate_nodes(timestamp),
1440            })
1441            .collect();
1442
1443        let pool_commitments = candidate_pools
1444            .iter()
1445            .map(MerklePaymentCandidatePool::to_commitment)
1446            .collect();
1447
1448        PreparedMerkleBatch {
1449            depth: tree.depth(),
1450            pool_commitments,
1451            merkle_payment_timestamp: timestamp,
1452            candidate_pools,
1453            tree,
1454            addresses: addrs,
1455        }
1456    }
1457
1458    #[test]
1459    fn test_finalize_merkle_batch_with_valid_winner() {
1460        let prepared = make_prepared_merkle_batch(4);
1461        let winner_hash = prepared.candidate_pools[0].hash();
1462
1463        let result = finalize_merkle_batch(prepared, winner_hash);
1464        assert!(
1465            result.is_ok(),
1466            "should succeed with valid winner: {result:?}"
1467        );
1468
1469        let batch = result.unwrap();
1470        assert_eq!(batch.chunk_count, 4);
1471        assert_eq!(batch.proofs.len(), 4);
1472
1473        // Every proof should be non-empty
1474        for proof_bytes in batch.proofs.values() {
1475            assert!(!proof_bytes.is_empty());
1476        }
1477    }
1478
1479    #[test]
1480    fn test_finalize_merkle_batch_with_invalid_winner() {
1481        let prepared = make_prepared_merkle_batch(4);
1482        let bad_hash = [0xFF; 32];
1483
1484        let result = finalize_merkle_batch(prepared, bad_hash);
1485        assert!(result.is_err());
1486        let err = result.unwrap_err().to_string();
1487        assert!(err.contains("not found in candidate pools"), "got: {err}");
1488    }
1489
1490    #[test]
1491    fn test_finalize_merkle_batch_proofs_are_deserializable() {
1492        use ant_protocol::payment::deserialize_merkle_proof;
1493
1494        let prepared = make_prepared_merkle_batch(8);
1495        let winner_hash = prepared.candidate_pools[0].hash();
1496
1497        let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();
1498
1499        for (addr, proof_bytes) in &batch.proofs {
1500            let proof = deserialize_merkle_proof(proof_bytes);
1501            assert!(
1502                proof.is_ok(),
1503                "proof for {} should deserialize: {:?}",
1504                hex::encode(addr),
1505                proof.err()
1506            );
1507        }
1508    }
1509
1510    // =========================================================================
1511    // Batch splitting edge cases
1512    // =========================================================================
1513
1514    #[test]
1515    fn test_batch_split_calculation() {
1516        // MAX_LEAVES chunks should fit in 1 batch
1517        let addrs = make_test_addresses(MAX_LEAVES);
1518        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 1);
1519
1520        // MAX_LEAVES + 1 should split into 2
1521        let addrs = make_test_addresses(MAX_LEAVES + 1);
1522        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 2);
1523
1524        // 3 * MAX_LEAVES should split into 3
1525        let addrs = make_test_addresses(3 * MAX_LEAVES);
1526        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 3);
1527    }
1528
1529    // =========================================================================
1530    // merkle_store_with_retry: collect-not-abort + bounded retry (C2.1 / C2.2)
1531    // =========================================================================
1532
1533    use std::sync::{Arc, Mutex};
1534
1535    /// Build `count` (addr, content) pairs for the retry helper.
1536    fn make_chunks(count: usize) -> Vec<([u8; 32], Bytes)> {
1537        make_test_addresses(count)
1538            .into_iter()
1539            .map(|addr| (addr, Bytes::from_static(b"chunk")))
1540            .collect()
1541    }
1542
1543    /// C2.1: a per-chunk `InsufficientPeers` is collected, not propagated —
1544    /// the whole batch must NOT abort. With a single attempt, the failing
1545    /// subset is reported via `failed` and the rest are `stored`.
1546    #[tokio::test]
1547    async fn store_with_retry_collects_failures_instead_of_aborting() {
1548        let chunks = make_chunks(6);
1549        let failing: std::collections::HashSet<[u8; 32]> =
1550            chunks.iter().take(2).map(|(a, _)| *a).collect();
1551        let failing_for_closure = failing.clone();
1552
1553        let store_one = move |addr: [u8; 32], _content: Bytes| {
1554            let fail = failing_for_closure.contains(&addr);
1555            async move {
1556                if fail {
1557                    Err(Error::InsufficientPeers("test shortfall".into()))
1558                } else {
1559                    Ok(std::time::Instant::now())
1560                }
1561            }
1562        };
1563
1564        let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1565            .await
1566            .expect("quorum shortfalls must not abort the batch");
1567
1568        assert_eq!(outcome.stored, 4);
1569        assert_eq!(outcome.failed, 2);
1570        // Single attempt → all successes recorded in round 0.
1571        assert_eq!(outcome.stats.retries_histogram[0], 4);
1572        assert_eq!(outcome.stats.chunk_attempts_total, 6);
1573    }
1574
1575    /// A non-quorum error (e.g. a missing proof) stays fatal and aborts.
1576    #[tokio::test]
1577    async fn store_with_retry_propagates_non_quorum_errors() {
1578        let chunks = make_chunks(3);
1579        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1580            Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1581        };
1582
1583        let result =
1584            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, 3, store_one).await;
1585        assert!(matches!(result, Err(Error::Payment(_))));
1586    }
1587
1588    /// C2.2: only the chunks that failed the previous round are retried.
1589    #[tokio::test]
1590    async fn store_with_retry_retries_only_the_failed_set() {
1591        let chunks = make_chunks(5);
1592        let total = chunks.len();
1593        let failing: std::collections::HashSet<[u8; 32]> =
1594            chunks.iter().take(2).map(|(a, _)| *a).collect();
1595        let failing_for_closure = failing.clone();
1596
1597        // Record every (addr) the store op was invoked with, in call order.
1598        let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
1599        let calls_for_closure = calls.clone();
1600
1601        let store_one = move |addr: [u8; 32], _content: Bytes| {
1602            let calls = calls_for_closure.clone();
1603            // Fails the first round only; succeeds thereafter.
1604            let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
1605            let fail = failing_for_closure.contains(&addr) && already_seen == 0;
1606            calls.lock().unwrap().push(addr);
1607            async move {
1608                if fail {
1609                    Err(Error::InsufficientPeers("round-1 shortfall".into()))
1610                } else {
1611                    Ok(std::time::Instant::now())
1612                }
1613            }
1614        };
1615
1616        let outcome =
1617            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1618                .await
1619                .expect("should converge after retry");
1620
1621        assert_eq!(outcome.stored, total);
1622        assert_eq!(outcome.failed, 0);
1623
1624        // Round 1 drains fully before round 2 starts, so the call log is
1625        // segmented: first `total` calls = round 1 (all chunks), the rest =
1626        // the retry round, which must contain ONLY the failing set.
1627        let calls = calls.lock().unwrap();
1628        assert_eq!(calls.len(), total + failing.len());
1629        let round_two: std::collections::HashSet<[u8; 32]> =
1630            calls[total..].iter().copied().collect();
1631        assert_eq!(round_two, failing);
1632    }
1633
1634    /// C2.2: a chunk that fails attempt 1 and succeeds attempt 2 is counted
1635    /// once as stored and recorded as one retry in `retries_histogram[1]`.
1636    #[tokio::test]
1637    async fn store_with_retry_counts_retry_success_once_in_histogram() {
1638        let chunks = make_chunks(4);
1639        let total = chunks.len();
1640        let flaky_addr = chunks[0].0;
1641
1642        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
1643        let attempts_for_closure = attempts.clone();
1644
1645        let store_one = move |addr: [u8; 32], _content: Bytes| {
1646            let attempts = attempts_for_closure.clone();
1647            let n = {
1648                let mut m = attempts.lock().unwrap();
1649                let entry = m.entry(addr).or_insert(0);
1650                *entry += 1;
1651                *entry
1652            };
1653            let fail = addr == flaky_addr && n == 1;
1654            async move {
1655                if fail {
1656                    Err(Error::InsufficientPeers("transient".into()))
1657                } else {
1658                    Ok(std::time::Instant::now())
1659                }
1660            }
1661        };
1662
1663        let outcome =
1664            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1665                .await
1666                .expect("flaky chunk should recover on retry");
1667
1668        assert_eq!(outcome.stored, total);
1669        assert_eq!(outcome.failed, 0);
1670        // 3 chunks landed on the first attempt, 1 on the first retry.
1671        assert_eq!(outcome.stats.retries_histogram[0], total - 1);
1672        assert_eq!(outcome.stats.retries_histogram[1], 1);
1673        // One extra store attempt for the flaky chunk.
1674        assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
1675    }
1676
1677    /// C2.2: when every chunk stays short of quorum through the whole attempt
1678    /// budget, the helper still returns `Ok` (collect-not-abort) with the full
1679    /// batch reported as `failed`, having tried each chunk exactly
1680    /// `MERKLE_STORE_MAX_ATTEMPTS` times.
1681    #[tokio::test]
1682    async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
1683        let chunks = make_chunks(3);
1684        let total = chunks.len();
1685
1686        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1687            Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
1688        };
1689
1690        let outcome = merkle_store_with_retry(
1691            chunks,
1692            8,
1693            MERKLE_STORE_MAX_ATTEMPTS,
1694            Duration::ZERO,
1695            None,
1696            0,
1697            total,
1698            store_one,
1699        )
1700        .await
1701        .expect("an exhausted retry budget is reported, not propagated as Err");
1702
1703        assert_eq!(outcome.stored, 0);
1704        assert_eq!(outcome.failed, total);
1705        // Every chunk was attempted once per round across the full budget.
1706        assert_eq!(
1707            outcome.stats.chunk_attempts_total,
1708            total * MERKLE_STORE_MAX_ATTEMPTS
1709        );
1710        // No successes, so the histogram stays empty.
1711        assert_eq!(outcome.stats.retries_histogram, [0; 4]);
1712    }
1713
1714    /// D (CLI path): when retries are exhausted, `failed_addresses` names
1715    /// exactly the still-short-of-quorum chunks (with their last error message)
1716    /// and excludes the ones that stored. This is what `upload_waves_merkle`
1717    /// uses to build `PartialUpload`.
1718    #[tokio::test]
1719    async fn store_with_retry_records_failed_addresses_when_exhausted() {
1720        let chunks = make_chunks(6);
1721        let failing: std::collections::HashSet<[u8; 32]> =
1722            chunks.iter().take(2).map(|(a, _)| *a).collect();
1723        let failing_for_closure = failing.clone();
1724
1725        let store_one = move |addr: [u8; 32], _content: Bytes| {
1726            let fail = failing_for_closure.contains(&addr);
1727            async move {
1728                if fail {
1729                    Err(Error::InsufficientPeers("permanent shortfall".into()))
1730                } else {
1731                    Ok(std::time::Instant::now())
1732                }
1733            }
1734        };
1735
1736        let outcome = merkle_store_with_retry(
1737            chunks,
1738            8,
1739            MERKLE_STORE_MAX_ATTEMPTS,
1740            Duration::ZERO,
1741            None,
1742            0,
1743            6,
1744            store_one,
1745        )
1746        .await
1747        .expect("quorum shortfalls must not abort the batch");
1748
1749        assert_eq!(outcome.stored, 4);
1750        assert_eq!(outcome.failed, 2);
1751        // `failed_addresses` names exactly the failing set, no stored chunks.
1752        assert_eq!(outcome.failed_addresses.len(), 2);
1753        let reported: std::collections::HashSet<[u8; 32]> =
1754            outcome.failed_addresses.iter().map(|(a, _)| *a).collect();
1755        assert_eq!(reported, failing);
1756        // Each carries a non-empty error message for the PartialUpload report.
1757        for (_, msg) in &outcome.failed_addresses {
1758            assert!(msg.contains("permanent shortfall"));
1759        }
1760    }
1761
1762    /// `failed_addresses` is empty when every chunk reaches quorum (no
1763    /// `PartialUpload` is raised by the CLI path in that case).
1764    #[tokio::test]
1765    async fn store_with_retry_failed_addresses_empty_on_full_success() {
1766        let chunks = make_chunks(4);
1767        let total = chunks.len();
1768        let store_one =
1769            |_addr: [u8; 32], _content: Bytes| async move { Ok(std::time::Instant::now()) };
1770
1771        let outcome = merkle_store_with_retry(
1772            chunks,
1773            8,
1774            MERKLE_STORE_MAX_ATTEMPTS,
1775            Duration::ZERO,
1776            None,
1777            0,
1778            total,
1779            store_one,
1780        )
1781        .await
1782        .expect("all chunks store");
1783
1784        assert_eq!(outcome.stored, total);
1785        assert_eq!(outcome.failed, 0);
1786        assert!(outcome.failed_addresses.is_empty());
1787    }
1788}