Skip to main content

ant_core/data/client/
merkle.rs

1//! Merkle batch payment support for the Autonomi client.
2//!
3//! When uploading batches of 64+ chunks, merkle payments reduce gas costs
4//! by paying for the entire batch in a single on-chain transaction instead
5//! of one transaction per chunk.
6
7use crate::data::client::adaptive::{observe_op, Outcome};
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13    Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14    MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20    MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Default threshold: use merkle payments when chunk count >= this value.
///
/// Only consulted for [`PaymentMode::Auto`]; `PaymentMode::Merkle` uses a
/// fixed minimum of 2 chunks instead (see [`should_use_merkle`]).
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
34/// Payment mode for uploads.
35#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum PaymentMode {
38    /// Automatically choose: merkle for batches >= threshold, single otherwise.
39    #[default]
40    Auto,
41    /// Force merkle batch payment regardless of batch size (min 2 chunks).
42    Merkle,
43    /// Force single-node payment (one tx per chunk).
44    Single,
45}
46
/// Result of a merkle batch payment.
///
/// Serializable so it can be persisted across runs for resume after a
/// partial-upload failure. See `crate::data::client::cached_merkle`.
///
/// NOTE(review): if the cache is written with a positional encoding (e.g.
/// MessagePack arrays via `rmp_serde`), field order is part of the on-disk
/// format. Avoid reordering or inserting fields before existing ones.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MerkleBatchPaymentResult {
    /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests).
    ///
    /// Keyed by the raw 32-byte chunk address.
    pub proofs: HashMap<[u8; 32], Vec<u8>>,
    /// Number of chunks in the batch.
    ///
    /// When a multi-sub-batch payment stops early and returns a partial
    /// result, this is the number of proofs returned rather than the number
    /// of addresses requested.
    pub chunk_count: usize,
    /// Total storage cost in atto (token smallest unit).
    ///
    /// Holds the `to_string()` form of an `Amount`; sub-batch totals are read
    /// back with `parse::<Amount>()` when summing.
    pub storage_cost_atto: String,
    /// Total gas cost in wei (saturating sum across sub-batches).
    pub gas_cost_wei: u128,
    /// Unix timestamp (seconds) used for the on-chain merkle payment.
    /// Persisted so resume can check whether the on-chain payment has
    /// aged out beyond the merkle expiration window and the cached
    /// receipt must be discarded.
    ///
    /// For multi-sub-batch payments this is the oldest sub-batch timestamp.
    /// `#[serde(default)]` makes a serialized record that lacks this field
    /// deserialize with `0` here (presumably receipts cached before the field
    /// existed; verify against `cached_merkle`).
    #[serde(default)]
    pub merkle_payment_timestamp: u64,
}
68
/// Prepared merkle batch ready for external payment.
///
/// Contains everything needed to submit the on-chain merkle payment
/// and then finalize proof generation without a wallet.
///
/// Built by `Client::prepare_merkle_batch_external`. The public fields are
/// the arguments of the on-chain payment call. The private fields are
/// consumed by `finalize_merkle_batch` together with the winning pool hash.
/// The `Debug` impl prints collection lengths only and omits the tree.
pub struct PreparedMerkleBatch {
    /// Merkle tree depth (needed for the on-chain call).
    pub depth: u8,
    /// Pool commitments for the on-chain call, one per candidate pool and in
    /// the same order as `candidate_pools`.
    pub pool_commitments: Vec<PoolCommitment>,
    /// Timestamp used for the merkle payment (Unix seconds). Candidate quotes
    /// were requested and validated against this same value.
    pub merkle_payment_timestamp: u64,
    /// Internal: candidate pools (needed for proof generation after payment).
    candidate_pools: Vec<MerklePaymentCandidatePool>,
    /// Internal: the merkle tree (needed for proof generation).
    tree: MerkleTree,
    /// Internal: chunk addresses in order.
    addresses: Vec<[u8; 32]>,
}
82    /// Internal: the merkle tree (needed for proof generation).
83    tree: MerkleTree,
84    /// Internal: chunk addresses in order.
85    addresses: Vec<[u8; 32]>,
86}
87
88/// Result of checking a merkle upload batch before payment.
89#[derive(Debug, Clone, Default)]
90pub(crate) struct MerkleUploadPlan {
91    /// Chunks already confirmed by their close group.
92    pub already_stored: Vec<[u8; 32]>,
93    /// Chunks that still need payment and storage.
94    pub to_upload: Vec<[u8; 32]>,
95    /// Total byte size of chunks in `to_upload`.
96    to_upload_total_bytes: u64,
97}
98
99impl MerkleUploadPlan {
100    /// Average byte size of chunks that still need upload.
101    #[must_use]
102    pub fn to_upload_avg_size(&self) -> u64 {
103        if self.to_upload.is_empty() {
104            return 0;
105        }
106
107        self.to_upload_total_bytes / self.to_upload.len() as u64
108    }
109}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("PreparedMerkleBatch")
114            .field("depth", &self.depth)
115            .field("pool_commitments", &self.pool_commitments.len())
116            .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117            .field("candidate_pools", &self.candidate_pools.len())
118            .field("addresses", &self.addresses.len())
119            .finish()
120    }
121}
122
123/// Select chunk contents that correspond to `addresses`, preserving address order.
124///
125/// Extra chunk contents are ignored; missing contents for any requested address
126/// are treated as corrupted upload state.
127pub(crate) fn chunk_contents_for_upload_addresses(
128    chunk_contents: Vec<Bytes>,
129    addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131    if addresses.is_empty() {
132        return Ok(Vec::new());
133    }
134
135    let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136    for address in addresses {
137        *needed_by_address.entry(*address).or_default() += 1;
138    }
139
140    let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141        HashMap::with_capacity(needed_by_address.len());
142    let mut remaining = addresses.len();
143    for chunk in chunk_contents {
144        let address = compute_address(&chunk);
145        if let Some(needed) = needed_by_address.get_mut(&address) {
146            if *needed > 0 {
147                chunks_by_address
148                    .entry(address)
149                    .or_default()
150                    .push_back(chunk);
151                *needed -= 1;
152                remaining -= 1;
153                if remaining == 0 {
154                    break;
155                }
156            }
157        }
158    }
159
160    for (address, needed) in &needed_by_address {
161        if *needed == 0 {
162            continue;
163        }
164
165        if chunks_by_address.contains_key(address) {
166            return Err(Error::InvalidData(format!(
167                "missing duplicate chunk content for merkle address {}",
168                hex::encode(address)
169            )));
170        }
171
172        return Err(Error::InvalidData(format!(
173            "missing chunk content for merkle address {}",
174            hex::encode(address)
175        )));
176    }
177
178    let mut selected = Vec::with_capacity(addresses.len());
179    for address in addresses {
180        let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181            Error::InvalidData(format!(
182                "missing chunk content for merkle address {}",
183                hex::encode(address)
184            ))
185        })?;
186        let chunk = chunks.pop_front().ok_or_else(|| {
187            Error::InvalidData(format!(
188                "missing duplicate chunk content for merkle address {}",
189                hex::encode(address)
190            ))
191        })?;
192        selected.push(chunk);
193    }
194
195    Ok(selected)
196}
197
198/// Map a per-chunk quote-collection result to its merkle-preflight stored
199/// status, degrading gracefully on transient failures.
200///
201/// The preflight only needs to answer "is this chunk already on the network?"
202/// so it can be skipped before payment — it is an optimisation, never a gate.
203/// Therefore:
204/// - `Ok(_)` (quotes gathered) → `Ok(false)`: chunk is not stored, upload it.
205/// - `Err(AlreadyStored)` → `Ok(true)`: skip it.
206/// - a transient failure (timeout / insufficient peers / transport error) →
207///   `Ok(false)`: we could not confirm, so queue it for upload rather than
208///   aborting the whole batch. Re-storing an existing chunk is idempotent.
209/// - any other (application) error → propagate; it would recur on a healthy
210///   link and is not something the preflight should paper over.
211///
212/// Without this, a single chunk's transient quote timeout failed the entire
213/// upload — fatal for forced `--merkle` uploads, which (unlike `Auto`) have no
214/// wave-batch fallback, and catastrophic for large files via the multiplicative
215/// per-chunk effect.
216fn preflight_stored_status<T>(result: Result<T>) -> Result<bool> {
217    match result {
218        Ok(_) => Ok(false),
219        Err(Error::AlreadyStored) => Ok(true),
220        Err(e) if matches!(classify_error(&e), Outcome::Timeout | Outcome::NetworkError) => {
221            Ok(false)
222        }
223        Err(e) => Err(e),
224    }
225}
226
227/// Determine whether to use merkle payments for a given batch size.
228/// Free function — no Client needed.
229#[must_use]
230pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
231    match mode {
232        PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
233        PaymentMode::Merkle => chunk_count >= 2,
234        PaymentMode::Single => false,
235    }
236}
237
238impl Client {
239    /// Determine whether to use merkle payments for a given batch size.
240    #[must_use]
241    pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
242        should_use_merkle(chunk_count, mode)
243    }
244
245    /// Pay for a batch of chunks using merkle batch payment.
246    ///
247    /// Builds a merkle tree, collects candidate pools, pays on-chain in one tx,
248    /// and returns per-chunk proofs. Splits into sub-batches if > `MAX_LEAVES`.
249    ///
250    /// This low-level helper assumes the caller has already selected the
251    /// addresses that need payment. User-facing upload paths first run the
252    /// merkle upload planner to skip chunks already stored on the network.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the batch is too small, candidate collection fails,
257    /// on-chain payment fails, or proof generation fails.
258    pub async fn pay_for_merkle_batch(
259        &self,
260        addresses: &[[u8; 32]],
261        data_type: u32,
262        data_size: u64,
263    ) -> Result<MerkleBatchPaymentResult> {
264        let chunk_count = addresses.len();
265        if chunk_count < 2 {
266            return Err(Error::Payment(
267                "Merkle batch payment requires at least 2 chunks".to_string(),
268            ));
269        }
270
271        if chunk_count > MAX_LEAVES {
272            return self
273                .pay_for_merkle_multi_batch(addresses, data_type, data_size)
274                .await;
275        }
276
277        self.pay_for_merkle_single_batch(addresses, data_type, data_size)
278            .await
279    }
280
281    /// Check which chunks in a merkle upload still need payment/storage.
282    ///
283    /// Uses the normal per-chunk quote path because it already has the
284    /// close-group majority rule for `AlreadyStored`. Non-stored chunks only
285    /// use the quote response as a probe; their actual payment still happens
286    /// through the merkle batch.
287    ///
288    /// `chunks` contains `(address, data_size)` pairs.
289    pub(crate) async fn plan_merkle_upload(
290        &self,
291        chunks: Vec<([u8; 32], u64)>,
292        data_type: u32,
293        progress: Option<&mpsc::Sender<UploadEvent>>,
294    ) -> Result<MerkleUploadPlan> {
295        let total_chunks = chunks.len();
296        if total_chunks == 0 {
297            return Ok(MerkleUploadPlan::default());
298        }
299
300        info!("Checking {total_chunks} merkle chunks for existing storage before payment");
301
302        let quote_limiter = self.controller().quote.clone();
303        let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
304        let mut check_stream = stream::iter(chunks.into_iter().enumerate())
305            .map(|(index, (address, data_size))| {
306                let limiter = quote_limiter.clone();
307                async move {
308                    let result = observe_op(
309                        &limiter,
310                        || async move {
311                            self.chunk_already_stored_for_merkle(&address, data_type, data_size)
312                                .await
313                        },
314                        classify_error,
315                    )
316                    .await;
317                    (index, address, data_size, result)
318                }
319            })
320            .buffer_unordered(quote_concurrency);
321
322        let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
323        let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
324        let mut checked = 0usize;
325
326        while let Some((index, address, data_size, result)) = check_stream.next().await {
327            let is_already_stored = result?;
328            checked += 1;
329
330            if let Some(tx) = progress {
331                let _ = tx.try_send(UploadEvent::ChunkQuoted {
332                    quoted: checked,
333                    total: total_chunks,
334                });
335            }
336
337            if is_already_stored {
338                debug!(
339                    "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
340                    hex::encode(address)
341                );
342                already_stored.push((index, address));
343                if let Some(tx) = progress {
344                    let _ = tx.try_send(UploadEvent::ChunkStored {
345                        stored: already_stored.len(),
346                        total: total_chunks,
347                    });
348                }
349            } else {
350                debug!(
351                    "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
352                    hex::encode(address)
353                );
354                to_upload.push((index, address, data_size));
355            }
356        }
357
358        already_stored.sort_by_key(|(index, _)| *index);
359        to_upload.sort_by_key(|(index, _, _)| *index);
360
361        let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
362            acc.saturating_add(*data_size)
363        });
364
365        let already_stored = already_stored
366            .into_iter()
367            .map(|(_, address)| address)
368            .collect::<Vec<_>>();
369        let to_upload = to_upload
370            .into_iter()
371            .map(|(_, address, _)| address)
372            .collect::<Vec<_>>();
373
374        info!(
375            "Merkle preflight complete: {} already stored, {} need upload",
376            already_stored.len(),
377            to_upload.len()
378        );
379
380        Ok(MerkleUploadPlan {
381            already_stored,
382            to_upload,
383            to_upload_total_bytes,
384        })
385    }
386
387    async fn chunk_already_stored_for_merkle(
388        &self,
389        address: &[u8; 32],
390        data_type: u32,
391        data_size: u64,
392    ) -> Result<bool> {
393        let result = self
394            .get_store_quotes_with_fault_tolerance(address, data_size, data_type)
395            .await;
396        if let Err(e) = &result {
397            if matches!(classify_error(e), Outcome::Timeout | Outcome::NetworkError) {
398                debug!(
399                    "Merkle preflight: could not determine stored status for {} ({e}); \
400                     treating as not stored and queuing for upload",
401                    hex::encode(address)
402                );
403            }
404        }
405        preflight_stored_status(result)
406    }
407
408    /// Phase 1 of external-signer merkle payment: prepare batch without paying.
409    ///
410    /// Builds the merkle tree, collects candidate pools from the network,
411    /// and returns the data needed for the on-chain payment call.
412    /// Requires `EvmNetwork` but NOT a wallet.
413    pub async fn prepare_merkle_batch_external(
414        &self,
415        addresses: &[[u8; 32]],
416        data_type: u32,
417        data_size: u64,
418    ) -> Result<PreparedMerkleBatch> {
419        let chunk_count = addresses.len();
420        let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
421
422        debug!("Building merkle tree for {chunk_count} chunks");
423
424        // 1. Build merkle tree
425        let tree = MerkleTree::from_xornames(xornames)
426            .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
427
428        let depth = tree.depth();
429        let merkle_payment_timestamp = std::time::SystemTime::now()
430            .duration_since(std::time::UNIX_EPOCH)
431            .map_err(|e| Error::Payment(format!("System time error: {e}")))?
432            .as_secs();
433
434        debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
435
436        // 2. Get reward candidates (midpoint proofs)
437        let midpoint_proofs = tree
438            .reward_candidates(merkle_payment_timestamp)
439            .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
440
441        debug!(
442            "Collecting candidate pools from {} midpoints (concurrent)",
443            midpoint_proofs.len()
444        );
445
446        // 3. Collect candidate pools from the network (all pools in parallel)
447        let candidate_pools = self
448            .build_candidate_pools(
449                &midpoint_proofs,
450                data_type,
451                data_size,
452                merkle_payment_timestamp,
453            )
454            .await?;
455
456        // 4. Build pool commitments for on-chain payment
457        let pool_commitments: Vec<PoolCommitment> = candidate_pools
458            .iter()
459            .map(MerklePaymentCandidatePool::to_commitment)
460            .collect();
461
462        Ok(PreparedMerkleBatch {
463            depth,
464            pool_commitments,
465            merkle_payment_timestamp,
466            candidate_pools,
467            tree,
468            addresses: addresses.to_vec(),
469        })
470    }
471
472    /// Pay for a single batch (up to `MAX_LEAVES` chunks).
473    async fn pay_for_merkle_single_batch(
474        &self,
475        addresses: &[[u8; 32]],
476        data_type: u32,
477        data_size: u64,
478    ) -> Result<MerkleBatchPaymentResult> {
479        let wallet = self.require_wallet()?;
480        let prepared = self
481            .prepare_merkle_batch_external(addresses, data_type, data_size)
482            .await?;
483
484        info!(
485            "Submitting merkle batch payment on-chain (depth={})",
486            prepared.depth
487        );
488        let (winner_pool_hash, amount, gas_info) = wallet
489            .pay_for_merkle_tree(
490                prepared.depth,
491                prepared.pool_commitments.clone(),
492                prepared.merkle_payment_timestamp,
493            )
494            .await
495            .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
496
497        info!(
498            "Merkle payment succeeded: winner pool {}",
499            hex::encode(winner_pool_hash)
500        );
501
502        let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
503        result.storage_cost_atto = amount.to_string();
504        result.gas_cost_wei = gas_info.gas_cost_wei;
505        Ok(result)
506    }
507
508    /// Handle batches larger than `MAX_LEAVES` by splitting into sub-batches.
509    async fn pay_for_merkle_multi_batch(
510        &self,
511        addresses: &[[u8; 32]],
512        data_type: u32,
513        data_size: u64,
514    ) -> Result<MerkleBatchPaymentResult> {
515        let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
516        let total_sub_batches = sub_batches.len();
517        let mut all_proofs = HashMap::with_capacity(addresses.len());
518        let mut total_storage = Amount::ZERO;
519        let mut total_gas: u128 = 0;
520        // Track the oldest sub-batch timestamp so the overall receipt
521        // expires when the *first* sub-batch's on-chain payment ages
522        // out (worst case for resume).
523        let mut oldest_ts: u64 = 0;
524
525        for (i, chunk) in sub_batches.into_iter().enumerate() {
526            match self
527                .pay_for_merkle_single_batch(chunk, data_type, data_size)
528                .await
529            {
530                Ok(sub_result) => {
531                    if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
532                        total_storage += cost;
533                    }
534                    total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
535                    if oldest_ts == 0
536                        || (sub_result.merkle_payment_timestamp > 0
537                            && sub_result.merkle_payment_timestamp < oldest_ts)
538                    {
539                        oldest_ts = sub_result.merkle_payment_timestamp;
540                    }
541                    all_proofs.extend(sub_result.proofs);
542                }
543                Err(e) => {
544                    if all_proofs.is_empty() {
545                        // First sub-batch failed, nothing paid yet -- propagate directly.
546                        return Err(e);
547                    }
548                    // Return partial result so caller can still store already-paid chunks.
549                    warn!(
550                        "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
551                         Returning {} proofs from prior sub-batches",
552                        i + 1,
553                        all_proofs.len()
554                    );
555                    return Ok(MerkleBatchPaymentResult {
556                        chunk_count: all_proofs.len(),
557                        proofs: all_proofs,
558                        storage_cost_atto: total_storage.to_string(),
559                        gas_cost_wei: total_gas,
560                        merkle_payment_timestamp: oldest_ts,
561                    });
562                }
563            }
564        }
565
566        Ok(MerkleBatchPaymentResult {
567            chunk_count: addresses.len(),
568            proofs: all_proofs,
569            storage_cost_atto: total_storage.to_string(),
570            gas_cost_wei: total_gas,
571            merkle_payment_timestamp: oldest_ts,
572        })
573    }
574
575    /// Build candidate pools by querying the network for each midpoint (concurrently).
576    async fn build_candidate_pools(
577        &self,
578        midpoint_proofs: &[MidpointProof],
579        data_type: u32,
580        data_size: u64,
581        merkle_payment_timestamp: u64,
582    ) -> Result<Vec<MerklePaymentCandidatePool>> {
583        let mut pool_futures = FuturesUnordered::new();
584
585        for midpoint_proof in midpoint_proofs {
586            let pool_address = midpoint_proof.address();
587            let mp = midpoint_proof.clone();
588            pool_futures.push(async move {
589                let candidate_nodes = self
590                    .get_merkle_candidate_pool(
591                        &pool_address.0,
592                        data_type,
593                        data_size,
594                        merkle_payment_timestamp,
595                    )
596                    .await?;
597                Ok::<_, Error>(MerklePaymentCandidatePool {
598                    midpoint_proof: mp,
599                    candidate_nodes,
600                })
601            });
602        }
603
604        let mut pools = Vec::with_capacity(midpoint_proofs.len());
605        while let Some(result) = pool_futures.next().await {
606            pools.push(result?);
607        }
608
609        Ok(pools)
610    }
611
612    /// Collect `CANDIDATES_PER_POOL` (16) merkle candidate quotes from the network.
613    #[allow(clippy::too_many_lines)]
614    async fn get_merkle_candidate_pool(
615        &self,
616        address: &[u8; 32],
617        data_type: u32,
618        data_size: u64,
619        merkle_payment_timestamp: u64,
620    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
621        let node = self.network().node();
622        let timeout = Duration::from_secs(self.config().quote_timeout_secs);
623
624        // Query extra peers to handle validation failures (bad sigs, wrong type, etc.)
625        let query_count = CANDIDATES_PER_POOL * 2;
626        let mut remote_peers = self
627            .network()
628            .find_closest_peers(address, query_count)
629            .await?;
630
631        // If DHT closest-nodes didn't return enough, supplement with connected peers.
632        // On small networks the DHT iterative lookup may not discover enough peers
633        // close to a random pool address, but we know more peers via direct connections.
634        if remote_peers.len() < CANDIDATES_PER_POOL {
635            let connected = self.network().connected_peers().await;
636            for peer in connected {
637                if !remote_peers.iter().any(|(id, _)| *id == peer) {
638                    remote_peers.push((peer, vec![]));
639                }
640            }
641        }
642
643        if remote_peers.len() < CANDIDATES_PER_POOL {
644            return Err(Error::InsufficientPeers(format!(
645                "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
646                 Use --no-merkle or a larger network.",
647                remote_peers.len()
648            )));
649        }
650
651        let mut candidate_futures = FuturesUnordered::new();
652
653        for (peer_id, peer_addrs) in &remote_peers {
654            let request_id = self.next_request_id();
655            let request = MerkleCandidateQuoteRequest {
656                address: *address,
657                data_type,
658                data_size,
659                merkle_payment_timestamp,
660            };
661            let message = ChunkMessage {
662                request_id,
663                body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
664            };
665
666            let message_bytes = match message.encode() {
667                Ok(bytes) => bytes,
668                Err(e) => {
669                    warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
670                    continue;
671                }
672            };
673
674            let peer_id_clone = *peer_id;
675            let addrs_clone = peer_addrs.clone();
676            let node_clone = node.clone();
677
678            let fut = async move {
679                let result = send_and_await_chunk_response(
680                    &node_clone,
681                    &peer_id_clone,
682                    message_bytes,
683                    request_id,
684                    timeout,
685                    &addrs_clone,
686                    |body| match body {
687                        ChunkMessageBody::MerkleCandidateQuoteResponse(
688                            MerkleCandidateQuoteResponse::Success { candidate_node },
689                        ) => {
690                            match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
691                                &candidate_node,
692                            ) {
693                                Ok(node) => Some(Ok(node)),
694                                Err(e) => Some(Err(Error::Serialization(format!(
695                                    "Failed to deserialize candidate node from {peer_id_clone}: {e}"
696                                )))),
697                            }
698                        }
699                        ChunkMessageBody::MerkleCandidateQuoteResponse(
700                            MerkleCandidateQuoteResponse::Error(e),
701                        ) => Some(Err(Error::Protocol(format!(
702                            "Merkle quote error from {peer_id_clone}: {e}"
703                        )))),
704                        _ => None,
705                    },
706                    |e| {
707                        Error::Network(format!(
708                            "Failed to send merkle candidate request to {peer_id_clone}: {e}"
709                        ))
710                    },
711                    || {
712                        Error::Timeout(format!(
713                            "Timeout waiting for merkle candidate from {peer_id_clone}"
714                        ))
715                    },
716                )
717                .await;
718
719                (peer_id_clone, result)
720            };
721
722            candidate_futures.push(fut);
723        }
724
725        self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
726            .await
727    }
728
729    /// Collect and validate merkle candidate responses, then return the
730    /// `CANDIDATES_PER_POOL` valid responders that are XOR-closest to the
731    /// pool midpoint.
732    ///
733    /// Why distance-sort instead of "first N to respond":
734    /// the storing-node verifier re-runs a network closest-peers lookup of
735    /// the pool midpoint and rejects the pool if fewer than 13 of the 16
736    /// candidate `pub_keys` appear in that authoritative close-set. Pools
737    /// built from the fastest-to-respond quoters fail this check whenever
738    /// truly-close peers are slower (NAT/relay paths) than farther peers.
739    async fn collect_validated_candidates(
740        &self,
741        futures: &mut FuturesUnordered<
742            impl std::future::Future<
743                Output = (
744                    PeerId,
745                    std::result::Result<MerklePaymentCandidateNode, Error>,
746                ),
747            >,
748        >,
749        target_address: &[u8; 32],
750        merkle_payment_timestamp: u64,
751    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
752        let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
753        let mut failures: Vec<String> = Vec::new();
754
755        while let Some((peer_id, result)) = futures.next().await {
756            match result {
757                Ok(candidate) => {
758                    if !verify_merkle_candidate_signature(&candidate) {
759                        warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
760                        failures.push(format!("{peer_id}: invalid signature"));
761                        continue;
762                    }
763                    if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
764                        warn!("Timestamp mismatch from merkle candidate {peer_id}");
765                        failures.push(format!("{peer_id}: timestamp mismatch"));
766                        continue;
767                    }
768                    valid.push((peer_id, candidate));
769                }
770                Err(e) => {
771                    debug!("Failed to get merkle candidate from {peer_id}: {e}");
772                    failures.push(format!("{peer_id}: {e}"));
773                }
774            }
775        }
776
777        if valid.len() < CANDIDATES_PER_POOL {
778            return Err(Error::InsufficientPeers(format!(
779                "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
780                valid.len(),
781                failures.join("; ")
782            )));
783        }
784
785        let target_peer = PeerId::from_bytes(*target_address);
786        valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
787
788        let candidates: Vec<MerklePaymentCandidateNode> = valid
789            .into_iter()
790            .take(CANDIDATES_PER_POOL)
791            .map(|(_, candidate)| candidate)
792            .collect();
793
794        candidates
795            .try_into()
796            .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
797    }
798
799    /// Upload chunks using pre-computed merkle proofs from a batch payment.
800    ///
801    /// Each chunk is matched to its proof from `batch_result.proofs`, then
802    /// stored to its close group concurrently. A per-chunk quorum shortfall
803    /// (`InsufficientPeers`) does **not** abort the file: such chunks are
804    /// collected and retried — with the same reusable proof and a freshly
805    /// re-collected close group — for up to [`MERKLE_STORE_MAX_ATTEMPTS`]
806    /// attempts. `stored_offset` carries chunks already confirmed by an earlier
807    /// preflight (used for progress numbering and the returned `stored` count),
808    /// and `total_chunks` is the whole-file total for progress events.
809    ///
810    /// Returns how many chunks are stored (including `stored_offset`), how many
811    /// remained short of quorum after all retries, and the aggregate store
812    /// stats.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error only for non-quorum failures (e.g. a missing proof, or a
817    /// chunk-count/address mismatch); quorum shortfalls are reported via
818    /// [`MerkleStoreOutcome::failed`].
819    pub(crate) async fn merkle_upload_chunks(
820        &self,
821        chunk_contents: Vec<Bytes>,
822        addresses: Vec<[u8; 32]>,
823        batch_result: &MerkleBatchPaymentResult,
824        progress: Option<&mpsc::Sender<UploadEvent>>,
825        stored_offset: usize,
826        total_chunks: usize,
827    ) -> Result<MerkleStoreOutcome> {
828        let store_limiter = self.controller().store.clone();
829        // Clamp fan-out to batch size — partial batches should not
830        // pay for unused slots (see PERF-RESULTS.md).
831        let batch_size = chunk_contents.len();
832        if batch_size != addresses.len() {
833            return Err(Error::InvalidData(format!(
834                "merkle upload has {batch_size} chunk contents but {} addresses",
835                addresses.len()
836            )));
837        }
838        let store_concurrency = store_limiter.current().min(batch_size.max(1));
839
840        let chunks: Vec<([u8; 32], Bytes)> = addresses.into_iter().zip(chunk_contents).collect();
841
842        // Store one chunk to its (freshly re-collected) close group. Called
843        // once per chunk per attempt, so a retry round naturally lands on a
844        // converged routing table. Only `InsufficientPeers` is recoverable;
845        // a missing proof stays fatal.
846        let store_one = |addr: [u8; 32], content: Bytes| {
847            let limiter = store_limiter.clone();
848            let proof_bytes = batch_result.proofs.get(&addr).cloned();
849            async move {
850                let started = std::time::Instant::now();
851                let proof = proof_bytes.ok_or_else(|| {
852                    Error::Payment(format!(
853                        "Missing merkle proof for chunk {}",
854                        hex::encode(addr)
855                    ))
856                })?;
857                let peers = self.close_group_peers(&addr).await?;
858                observe_op(
859                    &limiter,
860                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
861                    classify_error,
862                )
863                .await
864                .map(|_| started)
865            }
866        };
867
868        let outcome = merkle_store_with_retry(
869            chunks,
870            store_concurrency,
871            MERKLE_STORE_MAX_ATTEMPTS,
872            MERKLE_RETRY_BACKOFF,
873            progress,
874            stored_offset,
875            total_chunks,
876            store_one,
877        )
878        .await?;
879
880        // The external-signer path treats a non-quorum error as terminal (it
881        // returns a single all-or-nothing `FileUploadResult`), so re-raise the
882        // fatal that `merkle_store_with_retry` now carries in the outcome. The
883        // CLI/spill paths, which can surface `PartialUpload`, read `fatal`
884        // directly instead.
885        if let Some(e) = outcome.fatal {
886            return Err(e);
887        }
888        Ok(outcome)
889    }
890}
891
/// Total store-attempt budget for a merkle batch: one initial attempt plus
/// three retries.
///
/// This matches the wave path's contract (`batch.rs` iterates
/// `0..=MAX_RETRIES` with `MAX_RETRIES = 3`). It also matches the four-slot
/// [`WaveAggregateStats::retries_histogram`], so a chunk that lands on the
/// final retry is counted in `retries_histogram[3]`.
///
/// A chunk's close group can transiently reject its `winner_pool` midpoint
/// while some nodes' routing tables disagree about that midpoint. The network
/// converges within minutes. Per-chunk proofs are reusable, so retrying with
/// the same proof after a short backoff recovers these shortfalls at no extra
/// cost: no re-payment and no new pool.
pub(crate) const MERKLE_STORE_MAX_ATTEMPTS: usize = 1 + 3;

/// Base wait between merkle store attempts (30 seconds).
///
/// With [`MERKLE_STORE_MAX_ATTEMPTS`] attempts there are at most three such
/// waits, so a chunk that keeps falling short is retried over roughly 90
/// seconds. That gives diverged routing tables time to converge on a
/// consistent close group. Each wait is scaled by [`MERKLE_RETRY_JITTER`] so a
/// large failed set does not re-fire against the same divergent nodes in
/// lockstep.
pub(crate) const MERKLE_RETRY_BACKOFF: Duration = Duration::from_millis(30_000);

/// Relative jitter (±10%) applied to [`MERKLE_RETRY_BACKOFF`]. It spreads the
/// retry wave so converging nodes are not all probed at the same instant.
const MERKLE_RETRY_JITTER: f64 = 0.10;
915
/// Outcome of storing a merkle batch with `merkle_store_with_retry`.
///
/// Records how many chunks landed, which did not, and the aggregate store
/// stats.
#[derive(Debug, Default)]
pub(crate) struct MerkleStoreOutcome {
    /// Chunks that reached quorum, including any `stored_offset` carried in
    /// from a preflight. A chunk is counted once, even if it needed retries.
    pub stored: usize,
    /// Addresses confirmed stored by this call. This excludes the
    /// `stored_offset` preflight carry-in, which has no address here.
    ///
    /// The caller appends these to the file's stored set. Using this explicit
    /// set, rather than inferring "input minus failed", keeps accounting
    /// correct even when a `fatal` error aborts the pass mid-flight. In that
    /// case some input chunks end up neither stored nor in `failed_addresses`.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Number of entries in `failed_addresses`.
    pub failed: usize,
    /// Addresses (and the last error message) of chunks recorded as not
    /// stored. Empty when `failed == 0`.
    ///
    /// - Without `fatal`: the chunks still short of quorum after the final
    ///   attempt.
    /// - With `fatal`: the transient shortfalls seen so far in the aborted
    ///   round, plus the chunk whose non-quorum error caused the abort.
    ///
    /// Used by the CLI path to build
    /// [`crate::data::Error::PartialUpload`]; the external-signer path only
    /// reads the counts.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Set when a non-quorum (fatal) store error aborted the pass.
    ///
    /// - Successes completed before the abort are still recorded in `stored`
    ///   and `stored_addresses`.
    /// - The failed chunks are listed in `failed_addresses` as described
    ///   there.
    /// - Chunks still in flight or not yet started when the abort hit are in
    ///   neither list. The caller treats input-minus-stored as failed.
    ///
    /// Callers that want the old "fatal aborts everything" contract re-raise
    /// this as `Err`.
    pub fatal: Option<Error>,
    /// Aggregate store stats: durations, attempts, and the per-round retry
    /// histogram.
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
947
948/// Drive a set of merkle chunk stores with bounded retry of quorum shortfalls.
949///
950/// Runs `store_one` over all `chunks` concurrently (up to `store_concurrency`),
951/// collecting any `InsufficientPeers` failures rather than aborting. Failed
952/// chunks are retried — `store_one` re-collects their close group on each call,
953/// so a converged routing table can yield a fresh group — for up to
954/// `max_attempts` rounds, sleeping a jittered `backoff` between rounds. A
955/// chunk's success is counted once and recorded in the retry round it landed on
956/// (`retries_histogram[round]`). `stored_offset` seeds the returned `stored`
957/// count and the progress numbering; `total` is the whole-file total reported
958/// in progress events.
959///
960/// A non-quorum error stops the pass but does **not** discard progress: the
961/// successes already completed this pass stay in `stored`/`stored_addresses`,
962/// the quorum shortfalls so far stay in `failed_addresses`, and the error is
963/// returned in [`MerkleStoreOutcome::fatal`] (as `Ok(outcome)`, not `Err`).
964/// Callers that want the old abort-everything behaviour re-raise `fatal` as
965/// `Err`; CLI callers fold it into `PartialUpload` while keeping the stores.
966#[allow(clippy::too_many_arguments)]
967pub(crate) async fn merkle_store_with_retry<F, Fut>(
968    chunks: Vec<([u8; 32], Bytes)>,
969    store_concurrency: usize,
970    max_attempts: usize,
971    backoff: Duration,
972    progress: Option<&mpsc::Sender<UploadEvent>>,
973    stored_offset: usize,
974    total: usize,
975    store_one: F,
976) -> Result<MerkleStoreOutcome>
977where
978    F: Fn([u8; 32], Bytes) -> Fut,
979    Fut: std::future::Future<Output = Result<std::time::Instant>>,
980{
981    let attempts = max_attempts.max(1);
982    let mut outcome = MerkleStoreOutcome {
983        stored: stored_offset,
984        ..MerkleStoreOutcome::default()
985    };
986    let mut pending = chunks;
987
988    for attempt in 0..attempts {
989        let concurrency = store_concurrency.min(pending.len().max(1)).max(1);
990        // Carries the chunk body forward for the next round plus the last
991        // quorum-shortfall message, so an exhausted set can report per-chunk
992        // errors via `failed_addresses`.
993        let mut next_failed: Vec<([u8; 32], Bytes, String)> = Vec::new();
994
995        let mut upload_stream = stream::iter(pending.into_iter().map(|(addr, content)| {
996            let fut = store_one(addr, content.clone());
997            async move { (addr, content, fut.await) }
998        }))
999        .buffer_unordered(concurrency);
1000
1001        while let Some((addr, content, result)) = upload_stream.next().await {
1002            outcome.stats.chunk_attempts_total =
1003                outcome.stats.chunk_attempts_total.saturating_add(1);
1004            match result {
1005                Ok(started) => {
1006                    let duration_ms =
1007                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
1008                    outcome.stats.store_durations_ms.push(duration_ms);
1009                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
1010                    outcome.stats.retries_histogram[idx] =
1011                        outcome.stats.retries_histogram[idx].saturating_add(1);
1012                    outcome.stored += 1;
1013                    outcome.stored_addresses.push(addr);
1014                    if let Some(tx) = progress {
1015                        let _ = tx.try_send(UploadEvent::ChunkStored {
1016                            stored: outcome.stored,
1017                            total,
1018                        });
1019                    }
1020                }
1021                // A quorum shortfall — whether reported as a transport
1022                // shortfall (`InsufficientPeers`) or an app-only rejection
1023                // (`RemotePut`, e.g. pool-rejected / quote-stale / disk-full,
1024                // which are transient) — is recoverable: defer and retry the
1025                // chunk rather than aborting the whole upload (V2-468).
1026                Err(e @ (Error::InsufficientPeers(_) | Error::RemotePut { .. })) => {
1027                    next_failed.push((addr, content, e.to_string()));
1028                }
1029                Err(e) => {
1030                    // Non-quorum error: fatal. Stop consuming the stream but do
1031                    // NOT discard the outcome — successes already completed this
1032                    // pass stay recorded in `stored`/`stored_addresses`. Record
1033                    // the fatal chunk itself (and any quorum shortfalls seen so
1034                    // far) as failed; anything still in flight is left for the
1035                    // caller to treat as not-stored (input minus
1036                    // `stored_addresses`).
1037                    next_failed.push((addr, content, e.to_string()));
1038                    outcome.fatal = Some(e);
1039                    break;
1040                }
1041            }
1042        }
1043
1044        if outcome.fatal.is_some() {
1045            outcome.failed = next_failed.len();
1046            outcome.failed_addresses = next_failed
1047                .into_iter()
1048                .map(|(addr, _content, msg)| (addr, msg))
1049                .collect();
1050            return Ok(outcome);
1051        }
1052
1053        if next_failed.is_empty() {
1054            break;
1055        }
1056
1057        if attempt + 1 < attempts {
1058            warn!(
1059                failed = next_failed.len(),
1060                attempt = attempt + 1,
1061                "merkle chunks short of quorum, retrying after backoff"
1062            );
1063            pending = next_failed
1064                .into_iter()
1065                .map(|(addr, content, _msg)| (addr, content))
1066                .collect();
1067            if backoff > Duration::ZERO {
1068                // Jitter the wait (±MERKLE_RETRY_JITTER) so a large failed set
1069                // does not re-probe the same divergent nodes in lockstep.
1070                // `thread_rng` is !Send, so the value is computed and the rng
1071                // dropped before the await to keep this future Send.
1072                let wait = {
1073                    let mut rng = rand::thread_rng();
1074                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
1075                    backoff.mul_f64(factor)
1076                };
1077                tokio::time::sleep(wait).await;
1078            }
1079        } else {
1080            outcome.failed = next_failed.len();
1081            outcome.failed_addresses = next_failed
1082                .into_iter()
1083                .map(|(addr, _content, msg)| (addr, msg))
1084                .collect();
1085            break;
1086        }
1087    }
1088
1089    Ok(outcome)
1090}
1091
/// Per-round delays, in seconds, for the merkle upload deferred-retry pass.
///
/// Round 0 fires immediately. Most quorum shortfalls on a healthy network are
/// momentary close-group divergence that clears in well under a second.
/// Serializing them behind mandatory sleeps was the single biggest throughput
/// sink in the wave path: one bad chunk parked the other 63 slots for minutes.
///
/// Only chunks that survive a round wait longer before the next one (15 s,
/// then 45 s), so a genuinely saturated or diverged group still gets time to
/// settle. Mirrors the download path's `DEFERRED_ROUND_DELAYS_SECS`.
pub(crate) const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];

/// Histogram slot that records a deferred-retry round's successes.
///
/// The wave's first pass owns slot 0, so deferred round `r` (0-indexed) maps
/// to slot `r + 1`. Rounds beyond the histogram all share its last slot. This
/// keeps the four-slot [`WaveAggregateStats::retries_histogram`] meaning
/// "which round a chunk landed on" under the post-wave deferred structure.
/// A `hist_len` of 0 yields slot 0.
pub(crate) fn deferred_round_histogram_slot(round: usize, hist_len: usize) -> usize {
    let last_slot = hist_len.saturating_sub(1);
    let wanted = round + 1;
    if wanted < last_slot {
        wanted
    } else {
        last_slot
    }
}
1111
/// Outcome of the post-wave deferred-retry pass (`merkle_deferred_retry`).
#[derive(Debug, Default)]
pub(crate) struct DeferredRetryOutcome {
    /// Running total of stored chunks. It is seeded with the `stored_offset`
    /// passed in (i.e. everything the wave passes already stored) and advanced
    /// by each deferred round's successes.
    pub stored: usize,
    /// Addresses that reached quorum during the deferred rounds, to be
    /// appended to the file's `stored` set. Does not include the
    /// `stored_offset` carry-in.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Number of entries in `failed_addresses`.
    pub failed: usize,
    /// Addresses (and last error message) of chunks that were not stored.
    /// These are the chunks still short after the final round or, when
    /// `fatal` is set, the chunks that were still pending when a non-quorum
    /// error aborted the pass.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Display text of the non-quorum (fatal) store error that stopped a
    /// deferred round, if any. The caller surfaces this as `PartialUpload`,
    /// preserving everything stored so far and mirroring the wave path's
    /// fatal handling.
    pub fatal: Option<String>,
    /// Aggregate store stats merged across rounds. Each round's successes are
    /// already mapped into its [`deferred_round_histogram_slot`].
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
1136
1137/// Retry a file-level set of quorum-short merkle chunks in concurrent rounds.
1138///
1139/// This is the upload analogue of the download path's deferred-retry loop. The
1140/// wave passes store each wave in a single pass (no in-wave backoff barrier) and
1141/// hand their quorum-short chunks here. Each round processes the still-pending
1142/// chunks in **bounded batches of `batch_size`**: it re-reads only one batch of
1143/// bodies at a time via `read_bodies` (from the spill file), so peak resident
1144/// memory stays at the wave path's `batch_size × MAX_CHUNK_SIZE` bound rather
1145/// than scaling with the whole file's deferred-chunk count. Each batch is stored
1146/// concurrently at `concurrency_for(len)` via the single-pass
1147/// [`merkle_store_with_retry`] primitive, and survivors carry to the next round
1148/// after a `round_delays_secs` sleep. Chunks still short after the final round
1149/// become `failed_addresses`; a non-quorum store error stops the pass and is
1150/// reported via `fatal` (with every not-yet-stored chunk recorded as
1151/// `failed_addresses`) so the caller can surface `PartialUpload` without
1152/// discarding earlier progress.
1153///
1154/// `store_one`, `progress`, `stored_offset` and `total` mirror
1155/// [`merkle_store_with_retry`].
1156#[allow(clippy::too_many_arguments)]
1157pub(crate) async fn merkle_deferred_retry<RB, CF, SF, Fut>(
1158    deferred: Vec<([u8; 32], String)>,
1159    round_delays_secs: &[u64],
1160    batch_size: usize,
1161    read_bodies: RB,
1162    concurrency_for: CF,
1163    progress: Option<&mpsc::Sender<UploadEvent>>,
1164    stored_offset: usize,
1165    total: usize,
1166    store_one: SF,
1167) -> Result<DeferredRetryOutcome>
1168where
1169    RB: Fn(&[[u8; 32]]) -> Result<Vec<([u8; 32], Bytes)>>,
1170    CF: Fn(usize) -> usize,
1171    SF: Fn([u8; 32], Bytes) -> Fut,
1172    Fut: std::future::Future<Output = Result<std::time::Instant>>,
1173{
1174    let batch_size = batch_size.max(1);
1175    let mut outcome = DeferredRetryOutcome {
1176        stored: stored_offset,
1177        ..DeferredRetryOutcome::default()
1178    };
1179    let mut remaining = deferred;
1180    let rounds = round_delays_secs.len();
1181
1182    for (round, &delay_secs) in round_delays_secs.iter().enumerate() {
1183        if remaining.is_empty() {
1184            break;
1185        }
1186        if delay_secs > 0 {
1187            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
1188        }
1189        info!(
1190            "Deferred merkle retry round {}/{}: {} chunk(s) short of quorum",
1191            round + 1,
1192            rounds,
1193            remaining.len(),
1194        );
1195
1196        // Drain this round's input; survivors accumulate back into `remaining`
1197        // for the next round. A single-pass batch records its successes in
1198        // histogram slot 0, so all of this round's successes redirect to one
1199        // slot.
1200        let slot = deferred_round_histogram_slot(round, outcome.stats.retries_histogram.len());
1201        let round_input = std::mem::take(&mut remaining);
1202        let mut input_iter = round_input.into_iter();
1203
1204        loop {
1205            let batch: Vec<([u8; 32], String)> = input_iter.by_ref().take(batch_size).collect();
1206            if batch.is_empty() {
1207                break;
1208            }
1209            let batch_addrs: Vec<[u8; 32]> = batch.iter().map(|(addr, _)| *addr).collect();
1210            // Re-read only this batch's bodies from the spill (≤ batch_size
1211            // resident at a time), so the deferred path keeps the wave path's
1212            // memory bound regardless of how many chunks were deferred.
1213            let chunks = read_bodies(&batch_addrs)?;
1214            let concurrency = concurrency_for(batch_addrs.len());
1215
1216            let batch_outcome = merkle_store_with_retry(
1217                chunks,
1218                concurrency,
1219                1,
1220                Duration::ZERO,
1221                progress,
1222                outcome.stored,
1223                total,
1224                &store_one,
1225            )
1226            .await?;
1227
1228            outcome.stored = batch_outcome.stored;
1229            outcome
1230                .stored_addresses
1231                .extend(batch_outcome.stored_addresses);
1232
1233            // Merge stats, redirecting this round's successes to its slot.
1234            outcome.stats.chunk_attempts_total = outcome
1235                .stats
1236                .chunk_attempts_total
1237                .saturating_add(batch_outcome.stats.chunk_attempts_total);
1238            outcome
1239                .stats
1240                .store_durations_ms
1241                .extend(batch_outcome.stats.store_durations_ms);
1242            let landed: usize = batch_outcome.stats.retries_histogram.iter().sum();
1243            outcome.stats.retries_histogram[slot] =
1244                outcome.stats.retries_histogram[slot].saturating_add(landed);
1245
1246            if let Some(fatal) = batch_outcome.fatal {
1247                // Fatal mid-pass: confirmed stores are preserved above. Report
1248                // everything not stored as failed — this batch's quorum
1249                // shortfalls, the remaining unprocessed batches in this round,
1250                // and any survivors already carried from earlier batches.
1251                outcome.fatal = Some(fatal.to_string());
1252                let mut failed = batch_outcome.failed_addresses;
1253                failed.extend(input_iter);
1254                failed.extend(std::mem::take(&mut remaining));
1255                outcome.failed = failed.len();
1256                outcome.failed_addresses = failed;
1257                return Ok(outcome);
1258            }
1259
1260            // Quorum-short chunks from this batch survive to the next round.
1261            remaining.extend(batch_outcome.failed_addresses);
1262        }
1263    }
1264
1265    outcome.failed = remaining.len();
1266    outcome.failed_addresses = remaining;
1267    Ok(outcome)
1268}
1269
1270/// Phase 2 of external-signer merkle payment: generate proofs from winner.
1271///
1272/// Takes the prepared batch and the winner pool hash returned by the
1273/// on-chain payment transaction. Generates per-chunk merkle proofs.
1274pub fn finalize_merkle_batch(
1275    prepared: PreparedMerkleBatch,
1276    winner_pool_hash: [u8; 32],
1277) -> Result<MerkleBatchPaymentResult> {
1278    let chunk_count = prepared.addresses.len();
1279    let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
1280
1281    // Find the winner pool
1282    let winner_pool = prepared
1283        .candidate_pools
1284        .iter()
1285        .find(|pool| pool.hash() == winner_pool_hash)
1286        .ok_or_else(|| {
1287            Error::Payment(format!(
1288                "Winner pool {} not found in candidate pools",
1289                hex::encode(winner_pool_hash)
1290            ))
1291        })?;
1292
1293    // Generate proofs for each chunk
1294    info!("Generating merkle proofs for {chunk_count} chunks");
1295    let mut proofs = HashMap::with_capacity(chunk_count);
1296
1297    for (i, xorname) in xornames.iter().enumerate() {
1298        let address_proof = prepared
1299            .tree
1300            .generate_address_proof(i, *xorname)
1301            .map_err(|e| {
1302                Error::Payment(format!(
1303                    "Failed to generate address proof for chunk {i}: {e}"
1304                ))
1305            })?;
1306
1307        let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1308
1309        let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1310            .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1311
1312        proofs.insert(prepared.addresses[i], tagged_bytes);
1313    }
1314
1315    info!("Merkle batch payment complete: {chunk_count} proofs generated");
1316
1317    Ok(MerkleBatchPaymentResult {
1318        proofs,
1319        chunk_count,
1320        storage_cost_atto: "0".to_string(),
1321        gas_cost_wei: 0,
1322        merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1323    })
1324}
1325
/// Compile-time assertions that merkle method futures are Send.
///
/// The function below is never called (hence `dead_code`). It only has to
/// type-check. `todo!()` stands in for a value that cannot be built here,
/// which is why the unreachable/diverging lints are allowed.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    /// Compiles only when `T: Send`.
    fn _assert_send<T: Send>(_: &T) {}

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        // Only the type of `fut` matters; the future is never polled.
        let batch_result: MerkleBatchPaymentResult = todo!();
        let fut = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _assert_send(&fut);
    }
}
1346
1347#[cfg(test)]
1348#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1349mod tests {
1350    use super::*;
1351    use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1352
1353    // =========================================================================
1354    // should_use_merkle (free function, no Client needed)
1355    // =========================================================================
1356
1357    #[test]
1358    fn test_auto_below_threshold() {
1359        assert!(!should_use_merkle(1, PaymentMode::Auto));
1360        assert!(!should_use_merkle(10, PaymentMode::Auto));
1361        assert!(!should_use_merkle(63, PaymentMode::Auto));
1362    }
1363
1364    #[test]
1365    fn test_auto_at_and_above_threshold() {
1366        assert!(should_use_merkle(64, PaymentMode::Auto));
1367        assert!(should_use_merkle(65, PaymentMode::Auto));
1368        assert!(should_use_merkle(1000, PaymentMode::Auto));
1369    }
1370
1371    #[test]
1372    fn test_merkle_mode_forces_at_2() {
1373        assert!(!should_use_merkle(1, PaymentMode::Merkle));
1374        assert!(should_use_merkle(2, PaymentMode::Merkle));
1375        assert!(should_use_merkle(3, PaymentMode::Merkle));
1376    }
1377
1378    #[test]
1379    fn test_single_mode_always_false() {
1380        assert!(!should_use_merkle(0, PaymentMode::Single));
1381        assert!(!should_use_merkle(64, PaymentMode::Single));
1382        assert!(!should_use_merkle(1000, PaymentMode::Single));
1383    }
1384
1385    #[test]
1386    fn test_default_mode_is_auto() {
1387        assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1388    }
1389
1390    #[test]
1391    fn test_threshold_value() {
1392        assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1393    }
1394
1395    // =========================================================================
1396    // preflight_stored_status — must degrade gracefully, never abort the batch
1397    // =========================================================================
1398
1399    #[test]
1400    fn test_preflight_quotes_gathered_means_not_stored() {
1401        assert!(matches!(preflight_stored_status(Ok(())), Ok(false)));
1402    }
1403
1404    #[test]
1405    fn test_preflight_already_stored_is_stored() {
1406        let r: Result<()> = Err(Error::AlreadyStored);
1407        assert!(matches!(preflight_stored_status(r), Ok(true)));
1408    }
1409
1410    /// The regression: a transient quote-quorum failure during the preflight
1411    /// must NOT propagate (which would abort the whole forced-merkle upload).
1412    /// It is treated as "not known to be stored" → queue the chunk for upload.
1413    #[test]
1414    fn test_preflight_transient_quote_failure_does_not_abort() {
1415        // The exact error STG-01 hit: couldn't gather a 7-quote quorum.
1416        let insufficient: Result<()> =
1417            Err(Error::InsufficientPeers("Got 5 quotes, need 7".to_string()));
1418        assert!(
1419            matches!(preflight_stored_status(insufficient), Ok(false)),
1420            "insufficient-peers during preflight must degrade to not-stored, not error"
1421        );
1422
1423        let timeout: Result<()> = Err(Error::Timeout("Timeout waiting for quote".to_string()));
1424        assert!(matches!(preflight_stored_status(timeout), Ok(false)));
1425
1426        let network: Result<()> = Err(Error::Network("connection reset".to_string()));
1427        assert!(matches!(preflight_stored_status(network), Ok(false)));
1428    }
1429
1430    /// Genuine application errors still propagate — the preflight should not
1431    /// silently swallow problems that would recur on a healthy link.
1432    #[test]
1433    fn test_preflight_application_error_propagates() {
1434        let payment: Result<()> = Err(Error::Payment("bad payment".to_string()));
1435        assert!(matches!(
1436            preflight_stored_status(payment),
1437            Err(Error::Payment(_))
1438        ));
1439    }
1440
1441    #[test]
1442    fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1443        let first = Bytes::from_static(b"first");
1444        let second = Bytes::from_static(b"second");
1445        let first_addr = compute_address(&first);
1446        let second_addr = compute_address(&second);
1447
1448        let selected = chunk_contents_for_upload_addresses(
1449            vec![first.clone(), second.clone()],
1450            &[second_addr, first_addr],
1451        )
1452        .unwrap();
1453
1454        assert_eq!(selected, vec![second, first]);
1455    }
1456
1457    #[test]
1458    fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1459        let repeated = Bytes::from_static(b"same-content");
1460        let other = Bytes::from_static(b"other-content");
1461        let repeated_addr = compute_address(&repeated);
1462
1463        let selected = chunk_contents_for_upload_addresses(
1464            vec![repeated.clone(), other, repeated.clone()],
1465            &[repeated_addr, repeated_addr],
1466        )
1467        .unwrap();
1468
1469        assert_eq!(selected, vec![repeated.clone(), repeated]);
1470    }
1471
1472    #[test]
1473    fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1474        let requested = Bytes::from_static(b"requested-content");
1475        let unrequested = Bytes::from_static(b"unrequested-content");
1476        let requested_addr = compute_address(&requested);
1477
1478        let selected = chunk_contents_for_upload_addresses(
1479            vec![
1480                unrequested.clone(),
1481                requested.clone(),
1482                unrequested.clone(),
1483                unrequested,
1484            ],
1485            &[requested_addr],
1486        )
1487        .unwrap();
1488
1489        assert_eq!(selected, vec![requested]);
1490    }
1491
1492    #[test]
1493    fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1494        let present = Bytes::from_static(b"present-content");
1495        let missing = Bytes::from_static(b"missing-content");
1496        let missing_addr = compute_address(&missing);
1497
1498        let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1499
1500        assert!(matches!(result, Err(Error::InvalidData(_))));
1501    }
1502
1503    // =========================================================================
1504    // MerkleTree construction and proof generation (pure, no network)
1505    // =========================================================================
1506
1507    fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1508        (0..count)
1509            .map(|i| {
1510                let xn = XorName::from_content(&i.to_le_bytes());
1511                xn.0
1512            })
1513            .collect()
1514    }
1515
1516    #[test]
1517    fn test_tree_depth_for_known_sizes() {
1518        let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1519        for (count, expected_depth) in cases {
1520            let addrs = make_test_addresses(count);
1521            let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1522            let tree = MerkleTree::from_xornames(xornames).unwrap();
1523            assert_eq!(
1524                tree.depth(),
1525                expected_depth,
1526                "depth mismatch for {count} leaves"
1527            );
1528        }
1529    }
1530
1531    #[test]
1532    fn test_proof_generation_and_verification_for_all_leaves() {
1533        let addrs = make_test_addresses(16);
1534        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1535        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1536
1537        for (i, xn) in xornames.iter().enumerate() {
1538            let proof = tree.generate_address_proof(i, *xn).unwrap();
1539            assert!(proof.verify(), "proof for leaf {i} should verify");
1540            assert_eq!(proof.depth(), tree.depth() as usize);
1541        }
1542    }
1543
1544    #[test]
1545    fn test_proof_fails_for_wrong_address() {
1546        let addrs = make_test_addresses(8);
1547        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1548        let tree = MerkleTree::from_xornames(xornames).unwrap();
1549
1550        let wrong = XorName::from_content(b"wrong");
1551        let proof = tree.generate_address_proof(0, wrong).unwrap();
1552        assert!(!proof.verify(), "proof with wrong address should fail");
1553    }
1554
1555    #[test]
1556    fn test_tree_too_few_leaves() {
1557        let xornames = vec![XorName::from_content(b"only_one")];
1558        let result = MerkleTree::from_xornames(xornames);
1559        assert!(result.is_err());
1560    }
1561
1562    #[test]
1563    fn test_tree_at_max_leaves() {
1564        let addrs = make_test_addresses(MAX_LEAVES);
1565        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1566        let tree = MerkleTree::from_xornames(xornames).unwrap();
1567        assert_eq!(tree.leaf_count(), MAX_LEAVES);
1568    }
1569
1570    // =========================================================================
1571    // Proof serialization round-trip
1572    // =========================================================================
1573
1574    #[test]
1575    fn test_merkle_proof_serialize_deserialize_roundtrip() {
1576        use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
1577        use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};
1578
1579        let addrs = make_test_addresses(4);
1580        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1581        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1582
1583        let timestamp = std::time::SystemTime::now()
1584            .duration_since(std::time::UNIX_EPOCH)
1585            .unwrap()
1586            .as_secs();
1587
1588        let candidates = tree.reward_candidates(timestamp).unwrap();
1589        let midpoint = candidates.first().unwrap().clone();
1590
1591        // Build candidate nodes (with dummy signatures — not ML-DSA, just for serialization test)
1592        #[allow(clippy::cast_possible_truncation)]
1593        let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
1594            std::array::from_fn(|i| MerklePaymentCandidateNode {
1595                pub_key: vec![i as u8; 32],
1596                price: Amount::from(1024u64),
1597                reward_address: RewardsAddress::new([i as u8; 20]),
1598                merkle_payment_timestamp: timestamp,
1599                signature: vec![i as u8; 64],
1600            });
1601
1602        let pool = MerklePaymentCandidatePool {
1603            midpoint_proof: midpoint,
1604            candidate_nodes,
1605        };
1606
1607        let address_proof = tree.generate_address_proof(0, xornames[0]).unwrap();
1608        let merkle_proof = MerklePaymentProof::new(xornames[0], address_proof, pool);
1609
1610        let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
1611        assert_eq!(
1612            tagged.first().copied(),
1613            Some(0x02),
1614            "tag should be PROOF_TAG_MERKLE"
1615        );
1616
1617        let deserialized = deserialize_merkle_proof(&tagged).unwrap();
1618        assert_eq!(deserialized.address, merkle_proof.address);
1619        assert_eq!(
1620            deserialized.winner_pool.candidate_nodes.len(),
1621            CANDIDATES_PER_POOL
1622        );
1623    }
1624
1625    // =========================================================================
1626    // Candidate validation logic
1627    // =========================================================================
1628
1629    #[test]
1630    fn test_candidate_wrong_timestamp_rejected() {
1631        // Simulates what collect_validated_candidates checks
1632        let candidate = MerklePaymentCandidateNode {
1633            pub_key: vec![0u8; 32],
1634            price: ant_protocol::evm::Amount::ZERO,
1635            reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
1636            merkle_payment_timestamp: 1000,
1637            signature: vec![0u8; 64],
1638        };
1639
1640        // Timestamp check: 1000 != 2000
1641        assert_ne!(candidate.merkle_payment_timestamp, 2000);
1642    }
1643
1644    // =========================================================================
1645    // finalize_merkle_batch (external signer)
1646    // =========================================================================
1647
1648    fn make_dummy_candidate_nodes(
1649        timestamp: u64,
1650    ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1651        std::array::from_fn(|i| MerklePaymentCandidateNode {
1652            pub_key: vec![i as u8; 32],
1653            price: Amount::from(1024u64),
1654            reward_address: RewardsAddress::new([i as u8; 20]),
1655            merkle_payment_timestamp: timestamp,
1656            signature: vec![i as u8; 64],
1657        })
1658    }
1659
1660    fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1661        let addrs = make_test_addresses(count);
1662        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1663        let tree = MerkleTree::from_xornames(xornames).unwrap();
1664
1665        let timestamp = std::time::SystemTime::now()
1666            .duration_since(std::time::UNIX_EPOCH)
1667            .unwrap()
1668            .as_secs();
1669
1670        let midpoints = tree.reward_candidates(timestamp).unwrap();
1671
1672        let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1673            .into_iter()
1674            .map(|mp| MerklePaymentCandidatePool {
1675                midpoint_proof: mp,
1676                candidate_nodes: make_dummy_candidate_nodes(timestamp),
1677            })
1678            .collect();
1679
1680        let pool_commitments = candidate_pools
1681            .iter()
1682            .map(MerklePaymentCandidatePool::to_commitment)
1683            .collect();
1684
1685        PreparedMerkleBatch {
1686            depth: tree.depth(),
1687            pool_commitments,
1688            merkle_payment_timestamp: timestamp,
1689            candidate_pools,
1690            tree,
1691            addresses: addrs,
1692        }
1693    }
1694
1695    #[test]
1696    fn test_finalize_merkle_batch_with_valid_winner() {
1697        let prepared = make_prepared_merkle_batch(4);
1698        let winner_hash = prepared.candidate_pools[0].hash();
1699
1700        let result = finalize_merkle_batch(prepared, winner_hash);
1701        assert!(
1702            result.is_ok(),
1703            "should succeed with valid winner: {result:?}"
1704        );
1705
1706        let batch = result.unwrap();
1707        assert_eq!(batch.chunk_count, 4);
1708        assert_eq!(batch.proofs.len(), 4);
1709
1710        // Every proof should be non-empty
1711        for proof_bytes in batch.proofs.values() {
1712            assert!(!proof_bytes.is_empty());
1713        }
1714    }
1715
1716    #[test]
1717    fn test_finalize_merkle_batch_with_invalid_winner() {
1718        let prepared = make_prepared_merkle_batch(4);
1719        let bad_hash = [0xFF; 32];
1720
1721        let result = finalize_merkle_batch(prepared, bad_hash);
1722        assert!(result.is_err());
1723        let err = result.unwrap_err().to_string();
1724        assert!(err.contains("not found in candidate pools"), "got: {err}");
1725    }
1726
1727    #[test]
1728    fn test_finalize_merkle_batch_proofs_are_deserializable() {
1729        use ant_protocol::payment::deserialize_merkle_proof;
1730
1731        let prepared = make_prepared_merkle_batch(8);
1732        let winner_hash = prepared.candidate_pools[0].hash();
1733
1734        let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();
1735
1736        for (addr, proof_bytes) in &batch.proofs {
1737            let proof = deserialize_merkle_proof(proof_bytes);
1738            assert!(
1739                proof.is_ok(),
1740                "proof for {} should deserialize: {:?}",
1741                hex::encode(addr),
1742                proof.err()
1743            );
1744        }
1745    }
1746
1747    // =========================================================================
1748    // Batch splitting edge cases
1749    // =========================================================================
1750
1751    #[test]
1752    fn test_batch_split_calculation() {
1753        // MAX_LEAVES chunks should fit in 1 batch
1754        let addrs = make_test_addresses(MAX_LEAVES);
1755        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 1);
1756
1757        // MAX_LEAVES + 1 should split into 2
1758        let addrs = make_test_addresses(MAX_LEAVES + 1);
1759        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 2);
1760
1761        // 3 * MAX_LEAVES should split into 3
1762        let addrs = make_test_addresses(3 * MAX_LEAVES);
1763        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 3);
1764    }
1765
1766    // =========================================================================
1767    // merkle_store_with_retry: collect-not-abort + bounded retry (C2.1 / C2.2)
1768    // =========================================================================
1769
1770    use std::sync::{Arc, Mutex};
1771
1772    /// Build `count` (addr, content) pairs for the retry helper.
1773    fn make_chunks(count: usize) -> Vec<([u8; 32], Bytes)> {
1774        make_test_addresses(count)
1775            .into_iter()
1776            .map(|addr| (addr, Bytes::from_static(b"chunk")))
1777            .collect()
1778    }
1779
1780    /// C2.1: a per-chunk `InsufficientPeers` is collected, not propagated —
1781    /// the whole batch must NOT abort. With a single attempt, the failing
1782    /// subset is reported via `failed` and the rest are `stored`.
1783    #[tokio::test]
1784    async fn store_with_retry_collects_failures_instead_of_aborting() {
1785        let chunks = make_chunks(6);
1786        let failing: std::collections::HashSet<[u8; 32]> =
1787            chunks.iter().take(2).map(|(a, _)| *a).collect();
1788        let failing_for_closure = failing.clone();
1789
1790        let store_one = move |addr: [u8; 32], _content: Bytes| {
1791            let fail = failing_for_closure.contains(&addr);
1792            async move {
1793                if fail {
1794                    Err(Error::InsufficientPeers("test shortfall".into()))
1795                } else {
1796                    Ok(std::time::Instant::now())
1797                }
1798            }
1799        };
1800
1801        let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1802            .await
1803            .expect("quorum shortfalls must not abort the batch");
1804
1805        assert_eq!(outcome.stored, 4);
1806        assert_eq!(outcome.failed, 2);
1807        // Single attempt → all successes recorded in round 0.
1808        assert_eq!(outcome.stats.retries_histogram[0], 4);
1809        assert_eq!(outcome.stats.chunk_attempts_total, 6);
1810    }
1811
1812    /// V2-468: an app-only quorum shortfall surfaces as `Error::RemotePut`
1813    /// (pool-rejected / quote-stale / disk-full — transient), which must be
1814    /// treated as recoverable just like `InsufficientPeers`: collected and
1815    /// retried, never aborting the whole batch.
1816    #[tokio::test]
1817    async fn store_with_retry_treats_remote_put_as_recoverable() {
1818        let chunks = make_chunks(6);
1819        let failing: std::collections::HashSet<[u8; 32]> =
1820            chunks.iter().take(2).map(|(a, _)| *a).collect();
1821        let failing_for_closure = failing.clone();
1822
1823        let store_one = move |addr: [u8; 32], _content: Bytes| {
1824            let fail = failing_for_closure.contains(&addr);
1825            async move {
1826                if fail {
1827                    Err(Error::RemotePut {
1828                        address: hex::encode(addr),
1829                        source: ant_protocol::ProtocolError::StorageFailed(
1830                            "insufficient disk space".into(),
1831                        ),
1832                    })
1833                } else {
1834                    Ok(std::time::Instant::now())
1835                }
1836            }
1837        };
1838
1839        let outcome = merkle_store_with_retry(chunks, 8, 1, Duration::ZERO, None, 0, 6, store_one)
1840            .await
1841            .expect("remote app-rejections must not abort the batch");
1842
1843        assert_eq!(outcome.stored, 4);
1844        assert_eq!(outcome.failed, 2);
1845    }
1846
1847    /// A non-quorum error (e.g. a missing proof) is captured in `fatal` rather
1848    /// than discarded — the call returns `Ok(outcome)` so the caller can decide
1849    /// whether to re-raise it or fold it into `PartialUpload`.
1850    #[tokio::test]
1851    async fn store_with_retry_reports_non_quorum_errors_as_fatal() {
1852        let chunks = make_chunks(3);
1853        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1854            Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1855        };
1856
1857        let outcome = merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, 3, store_one)
1858            .await
1859            .expect("fatal is carried in the outcome, not returned as Err");
1860        assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
1861    }
1862
1863    /// A fatal error mid-pass preserves the successes that already completed in
1864    /// the same pass — they are not discarded with the abort. Concurrency 1
1865    /// makes ordering deterministic: the first five chunks store, then the sixth
1866    /// aborts fatally.
1867    #[tokio::test]
1868    async fn store_with_retry_fatal_preserves_same_pass_successes() {
1869        let chunks = make_chunks(6);
1870        let bad = chunks[5].0;
1871        let store_one = move |addr: [u8; 32], _content: Bytes| async move {
1872            if addr == bad {
1873                Err(Error::Payment("fatal".into()))
1874            } else {
1875                Ok(std::time::Instant::now())
1876            }
1877        };
1878
1879        let outcome = merkle_store_with_retry(chunks, 1, 1, Duration::ZERO, None, 0, 6, store_one)
1880            .await
1881            .expect("fatal carried in outcome, not returned as Err");
1882        assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
1883        // The five chunks stored before the abort are preserved, not lost.
1884        assert_eq!(outcome.stored, 5);
1885        assert_eq!(outcome.stored_addresses.len(), 5);
1886        assert!(!outcome.stored_addresses.contains(&bad));
1887        // The fatal chunk is reported as failed (not silently dropped).
1888        assert!(outcome.failed_addresses.iter().any(|(a, _)| *a == bad));
1889    }
1890
1891    /// C2.2: only the chunks that failed the previous round are retried.
1892    #[tokio::test]
1893    async fn store_with_retry_retries_only_the_failed_set() {
1894        let chunks = make_chunks(5);
1895        let total = chunks.len();
1896        let failing: std::collections::HashSet<[u8; 32]> =
1897            chunks.iter().take(2).map(|(a, _)| *a).collect();
1898        let failing_for_closure = failing.clone();
1899
1900        // Record every (addr) the store op was invoked with, in call order.
1901        let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
1902        let calls_for_closure = calls.clone();
1903
1904        let store_one = move |addr: [u8; 32], _content: Bytes| {
1905            let calls = calls_for_closure.clone();
1906            // Fails the first round only; succeeds thereafter.
1907            let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
1908            let fail = failing_for_closure.contains(&addr) && already_seen == 0;
1909            calls.lock().unwrap().push(addr);
1910            async move {
1911                if fail {
1912                    Err(Error::InsufficientPeers("round-1 shortfall".into()))
1913                } else {
1914                    Ok(std::time::Instant::now())
1915                }
1916            }
1917        };
1918
1919        let outcome =
1920            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1921                .await
1922                .expect("should converge after retry");
1923
1924        assert_eq!(outcome.stored, total);
1925        assert_eq!(outcome.failed, 0);
1926
1927        // Round 1 drains fully before round 2 starts, so the call log is
1928        // segmented: first `total` calls = round 1 (all chunks), the rest =
1929        // the retry round, which must contain ONLY the failing set.
1930        let calls = calls.lock().unwrap();
1931        assert_eq!(calls.len(), total + failing.len());
1932        let round_two: std::collections::HashSet<[u8; 32]> =
1933            calls[total..].iter().copied().collect();
1934        assert_eq!(round_two, failing);
1935    }
1936
1937    /// C2.2: a chunk that fails attempt 1 and succeeds attempt 2 is counted
1938    /// once as stored and recorded as one retry in `retries_histogram[1]`.
1939    #[tokio::test]
1940    async fn store_with_retry_counts_retry_success_once_in_histogram() {
1941        let chunks = make_chunks(4);
1942        let total = chunks.len();
1943        let flaky_addr = chunks[0].0;
1944
1945        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
1946        let attempts_for_closure = attempts.clone();
1947
1948        let store_one = move |addr: [u8; 32], _content: Bytes| {
1949            let attempts = attempts_for_closure.clone();
1950            let n = {
1951                let mut m = attempts.lock().unwrap();
1952                let entry = m.entry(addr).or_insert(0);
1953                *entry += 1;
1954                *entry
1955            };
1956            let fail = addr == flaky_addr && n == 1;
1957            async move {
1958                if fail {
1959                    Err(Error::InsufficientPeers("transient".into()))
1960                } else {
1961                    Ok(std::time::Instant::now())
1962                }
1963            }
1964        };
1965
1966        let outcome =
1967            merkle_store_with_retry(chunks, 8, 3, Duration::ZERO, None, 0, total, store_one)
1968                .await
1969                .expect("flaky chunk should recover on retry");
1970
1971        assert_eq!(outcome.stored, total);
1972        assert_eq!(outcome.failed, 0);
1973        // 3 chunks landed on the first attempt, 1 on the first retry.
1974        assert_eq!(outcome.stats.retries_histogram[0], total - 1);
1975        assert_eq!(outcome.stats.retries_histogram[1], 1);
1976        // One extra store attempt for the flaky chunk.
1977        assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
1978    }
1979
1980    /// C2.2: when every chunk stays short of quorum through the whole attempt
1981    /// budget, the helper still returns `Ok` (collect-not-abort) with the full
1982    /// batch reported as `failed`, having tried each chunk exactly
1983    /// `MERKLE_STORE_MAX_ATTEMPTS` times.
1984    #[tokio::test]
1985    async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
1986        let chunks = make_chunks(3);
1987        let total = chunks.len();
1988
1989        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
1990            Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
1991        };
1992
1993        let outcome = merkle_store_with_retry(
1994            chunks,
1995            8,
1996            MERKLE_STORE_MAX_ATTEMPTS,
1997            Duration::ZERO,
1998            None,
1999            0,
2000            total,
2001            store_one,
2002        )
2003        .await
2004        .expect("an exhausted retry budget is reported, not propagated as Err");
2005
2006        assert_eq!(outcome.stored, 0);
2007        assert_eq!(outcome.failed, total);
2008        // Every chunk was attempted once per round across the full budget.
2009        assert_eq!(
2010            outcome.stats.chunk_attempts_total,
2011            total * MERKLE_STORE_MAX_ATTEMPTS
2012        );
2013        // No successes, so the histogram stays empty.
2014        assert_eq!(outcome.stats.retries_histogram, [0; 4]);
2015    }
2016
2017    /// D (CLI path): when retries are exhausted, `failed_addresses` names
2018    /// exactly the still-short-of-quorum chunks (with their last error message)
2019    /// and excludes the ones that stored. This is what `upload_waves_merkle`
2020    /// uses to build `PartialUpload`.
2021    #[tokio::test]
2022    async fn store_with_retry_records_failed_addresses_when_exhausted() {
2023        let chunks = make_chunks(6);
2024        let failing: std::collections::HashSet<[u8; 32]> =
2025            chunks.iter().take(2).map(|(a, _)| *a).collect();
2026        let failing_for_closure = failing.clone();
2027
2028        let store_one = move |addr: [u8; 32], _content: Bytes| {
2029            let fail = failing_for_closure.contains(&addr);
2030            async move {
2031                if fail {
2032                    Err(Error::InsufficientPeers("permanent shortfall".into()))
2033                } else {
2034                    Ok(std::time::Instant::now())
2035                }
2036            }
2037        };
2038
2039        let outcome = merkle_store_with_retry(
2040            chunks,
2041            8,
2042            MERKLE_STORE_MAX_ATTEMPTS,
2043            Duration::ZERO,
2044            None,
2045            0,
2046            6,
2047            store_one,
2048        )
2049        .await
2050        .expect("quorum shortfalls must not abort the batch");
2051
2052        assert_eq!(outcome.stored, 4);
2053        assert_eq!(outcome.failed, 2);
2054        // `failed_addresses` names exactly the failing set, no stored chunks.
2055        assert_eq!(outcome.failed_addresses.len(), 2);
2056        let reported: std::collections::HashSet<[u8; 32]> =
2057            outcome.failed_addresses.iter().map(|(a, _)| *a).collect();
2058        assert_eq!(reported, failing);
2059        // Each carries a non-empty error message for the PartialUpload report.
2060        for (_, msg) in &outcome.failed_addresses {
2061            assert!(msg.contains("permanent shortfall"));
2062        }
2063    }
2064
2065    /// `failed_addresses` is empty when every chunk reaches quorum (no
2066    /// `PartialUpload` is raised by the CLI path in that case).
2067    #[tokio::test]
2068    async fn store_with_retry_failed_addresses_empty_on_full_success() {
2069        let chunks = make_chunks(4);
2070        let total = chunks.len();
2071        let store_one =
2072            |_addr: [u8; 32], _content: Bytes| async move { Ok(std::time::Instant::now()) };
2073
2074        let outcome = merkle_store_with_retry(
2075            chunks,
2076            8,
2077            MERKLE_STORE_MAX_ATTEMPTS,
2078            Duration::ZERO,
2079            None,
2080            0,
2081            total,
2082            store_one,
2083        )
2084        .await
2085        .expect("all chunks store");
2086
2087        assert_eq!(outcome.stored, total);
2088        assert_eq!(outcome.failed, 0);
2089        assert!(outcome.failed_addresses.is_empty());
2090    }
2091
2092    // =========================================================================
2093    // merkle_deferred_retry: download-style concurrent post-wave retry (V2-466)
2094    // =========================================================================
2095
2096    /// The histogram slot mapping: the wave first pass is slot 0; deferred
2097    /// round `r` is slot `r + 1`, clamped to the last slot.
2098    #[test]
2099    fn deferred_round_histogram_slot_maps_and_clamps() {
2100        assert_eq!(deferred_round_histogram_slot(0, 4), 1);
2101        assert_eq!(deferred_round_histogram_slot(1, 4), 2);
2102        assert_eq!(deferred_round_histogram_slot(2, 4), 3);
2103        // Beyond the histogram width, clamp to the final slot.
2104        assert_eq!(deferred_round_histogram_slot(3, 4), 3);
2105        assert_eq!(deferred_round_histogram_slot(9, 4), 3);
2106    }
2107
2108    /// Re-read bodies for a deferred set from a fake "spill": every requested
2109    /// address is returned paired with a stub body. Zero delays so tests do not
2110    /// actually sleep between rounds.
2111    fn fake_read_bodies(addrs: &[[u8; 32]]) -> Result<Vec<([u8; 32], Bytes)>> {
2112        Ok(addrs
2113            .iter()
2114            .map(|a| (*a, Bytes::from_static(b"deferred-body")))
2115            .collect())
2116    }
2117
2118    fn deferred_set(count: usize) -> Vec<([u8; 32], String)> {
2119        make_test_addresses(count)
2120            .into_iter()
2121            .map(|addr| (addr, "short of quorum".to_string()))
2122            .collect()
2123    }
2124
2125    /// A chunk that is quorum-short on early rounds but succeeds on a later
2126    /// round is stored exactly once, recorded in that round's histogram slot,
2127    /// and reported with no failures.
2128    #[tokio::test]
2129    async fn deferred_retry_succeeds_on_a_later_round() {
2130        let deferred = deferred_set(3);
2131        // Each chunk fails its first attempt (round 0) and succeeds the second
2132        // (round 1 → histogram slot 2).
2133        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2134        let attempts_for_closure = attempts.clone();
2135        let store_one = move |addr: [u8; 32], _content: Bytes| {
2136            let attempts = attempts_for_closure.clone();
2137            async move {
2138                let n = {
2139                    let mut map = attempts.lock().unwrap();
2140                    let e = map.entry(addr).or_insert(0);
2141                    *e += 1;
2142                    *e
2143                };
2144                if n < 2 {
2145                    Err(Error::InsufficientPeers("still short".into()))
2146                } else {
2147                    Ok(std::time::Instant::now())
2148                }
2149            }
2150        };
2151
2152        let outcome = merkle_deferred_retry(
2153            deferred,
2154            &[0, 0, 0],
2155            64,
2156            fake_read_bodies,
2157            |n: usize| n.max(1),
2158            None,
2159            0,
2160            3,
2161            store_one,
2162        )
2163        .await
2164        .expect("deferred retry must not abort on quorum shortfalls");
2165
2166        assert_eq!(outcome.stored, 3, "all three land by round 1");
2167        assert_eq!(outcome.stored_addresses.len(), 3);
2168        assert_eq!(outcome.failed, 0);
2169        assert!(outcome.failed_addresses.is_empty());
2170        assert!(outcome.fatal.is_none());
2171        // Round 1 → slot 2; round 0 (slot 1) saw zero successes.
2172        assert_eq!(outcome.stats.retries_histogram[1], 0);
2173        assert_eq!(outcome.stats.retries_histogram[2], 3);
2174        // Each chunk attempted twice: one failed round + one success round.
2175        assert_eq!(outcome.stats.chunk_attempts_total, 6);
2176    }
2177
2178    /// Chunks still short of quorum after the final deferred round become
2179    /// `failed`, not silently dropped, and no fatal error is set.
2180    #[tokio::test]
2181    async fn deferred_retry_leftovers_become_failed() {
2182        let deferred = deferred_set(2);
2183        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
2184            Err::<std::time::Instant, _>(Error::InsufficientPeers("always short".into()))
2185        };
2186
2187        let outcome = merkle_deferred_retry(
2188            deferred,
2189            &[0, 0, 0],
2190            64,
2191            fake_read_bodies,
2192            |n: usize| n.max(1),
2193            None,
2194            0,
2195            2,
2196            store_one,
2197        )
2198        .await
2199        .expect("exhausted retries report failures, not an error");
2200
2201        assert_eq!(outcome.stored, 0);
2202        assert!(outcome.stored_addresses.is_empty());
2203        assert_eq!(outcome.failed, 2);
2204        assert_eq!(outcome.failed_addresses.len(), 2);
2205        assert!(outcome.fatal.is_none());
2206        // Three rounds × two chunks, all failing.
2207        assert_eq!(outcome.stats.chunk_attempts_total, 6);
2208    }
2209
2210    /// A non-quorum (fatal) error during a deferred round stops the pass, is
2211    /// surfaced via `fatal`, and preserves an earlier round's success in
2212    /// `stored`/`stored_addresses` while the still-pending chunk is reported as
2213    /// failed.
2214    #[tokio::test]
2215    async fn deferred_retry_fatal_error_preserves_prior_progress() {
2216        let addrs = make_test_addresses(2);
2217        let good = addrs[0];
2218        let bad = addrs[1];
2219        let deferred = vec![(good, "short".to_string()), (bad, "short".to_string())];
2220
2221        // `good` succeeds on round 0; `bad` is quorum-short on round 0, then
2222        // hits a fatal Payment error on round 1.
2223        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2224        let attempts_for_closure = attempts.clone();
2225        let store_one = move |addr: [u8; 32], _content: Bytes| {
2226            let attempts = attempts_for_closure.clone();
2227            async move {
2228                let n = {
2229                    let mut map = attempts.lock().unwrap();
2230                    let e = map.entry(addr).or_insert(0);
2231                    *e += 1;
2232                    *e
2233                };
2234                if addr == good {
2235                    Ok(std::time::Instant::now())
2236                } else if n == 1 {
2237                    Err(Error::InsufficientPeers("short".into()))
2238                } else {
2239                    Err(Error::Payment("fatal on retry".into()))
2240                }
2241            }
2242        };
2243
2244        let outcome = merkle_deferred_retry(
2245            deferred,
2246            &[0, 0, 0],
2247            64,
2248            fake_read_bodies,
2249            |n: usize| n.max(1),
2250            None,
2251            0,
2252            2,
2253            store_one,
2254        )
2255        .await
2256        .expect("a fatal round error is reported via `fatal`, not as Err");
2257
2258        assert!(outcome.fatal.is_some(), "fatal error must be captured");
2259        assert_eq!(outcome.stored, 1, "round-0 success preserved");
2260        assert_eq!(outcome.stored_addresses, vec![good]);
2261        assert_eq!(outcome.failed, 1);
2262        assert_eq!(outcome.failed_addresses.len(), 1);
2263        assert_eq!(outcome.failed_addresses[0].0, bad);
2264    }
2265
2266    /// An empty deferred set is a no-op: no rounds run, nothing stored or failed.
2267    #[tokio::test]
2268    async fn deferred_retry_empty_set_is_a_noop() {
2269        let store_one = |_addr: [u8; 32], _content: Bytes| async move {
2270            Err::<std::time::Instant, _>(Error::InsufficientPeers("unused".into()))
2271        };
2272
2273        let outcome = merkle_deferred_retry(
2274            Vec::new(),
2275            &DEFERRED_ROUND_DELAYS_SECS,
2276            64,
2277            fake_read_bodies,
2278            |n: usize| n.max(1),
2279            None,
2280            7,
2281            7,
2282            store_one,
2283        )
2284        .await
2285        .expect("empty deferred set is a no-op");
2286
2287        assert_eq!(outcome.stored, 7, "stored_offset carried through unchanged");
2288        assert_eq!(outcome.failed, 0);
2289        assert!(outcome.stored_addresses.is_empty());
2290        assert!(outcome.failed_addresses.is_empty());
2291        assert!(outcome.fatal.is_none());
2292    }
2293
2294    /// The memory-bound guard (V2-466 review finding 1): a deferred set far
2295    /// larger than `batch_size` is read from the spill in batches of at most
2296    /// `batch_size`, so peak resident bodies never scale with the file-wide
2297    /// deferred count. All chunks still store.
2298    #[tokio::test]
2299    async fn deferred_retry_reads_bodies_in_bounded_batches() {
2300        let deferred = deferred_set(10);
2301        let batch_size = 4;
2302        // Record the largest single read_bodies request.
2303        let max_batch = Arc::new(Mutex::new(0usize));
2304        let max_batch_for_closure = max_batch.clone();
2305        let read_bodies = move |addrs: &[[u8; 32]]| {
2306            let mut m = max_batch_for_closure.lock().unwrap();
2307            *m = (*m).max(addrs.len());
2308            Ok(addrs
2309                .iter()
2310                .map(|a| (*a, Bytes::from_static(b"body")))
2311                .collect())
2312        };
2313        let store_one =
2314            |_addr: [u8; 32], _content: Bytes| async move { Ok(std::time::Instant::now()) };
2315
2316        let outcome = merkle_deferred_retry(
2317            deferred,
2318            &[0, 0, 0],
2319            batch_size,
2320            read_bodies,
2321            |n: usize| n.max(1),
2322            None,
2323            0,
2324            10,
2325            store_one,
2326        )
2327        .await
2328        .expect("bounded-batch deferred retry stores everything");
2329
2330        assert_eq!(outcome.stored, 10);
2331        assert_eq!(outcome.stored_addresses.len(), 10);
2332        assert_eq!(outcome.failed, 0);
2333        assert!(
2334            *max_batch.lock().unwrap() <= batch_size,
2335            "read_bodies must never be handed more than batch_size addresses at once"
2336        );
2337    }
2338}