Skip to main content

ant_core/data/client/
merkle.rs

1//! Merkle batch payment support for the Autonomi client.
2//!
3//! When uploading batches of 64+ chunks, merkle payments reduce gas costs
4//! by paying for the entire batch in a single on-chain transaction instead
5//! of one transaction per chunk.
6
7use crate::data::client::adaptive::{observe_op, Outcome};
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::Client;
11use crate::data::error::{Error, Result};
12use ant_protocol::evm::{
13    Amount, MerklePaymentCandidateNode, MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree,
14    MidpointProof, PoolCommitment, CANDIDATES_PER_POOL, MAX_LEAVES,
15};
16use ant_protocol::payment::{serialize_merkle_proof, verify_merkle_candidate_signature};
17use ant_protocol::transport::PeerId;
18use ant_protocol::{
19    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
20    MerkleCandidateQuoteRequest, MerkleCandidateQuoteResponse,
21};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use rand::Rng;
25use std::collections::{HashMap, VecDeque};
26use std::time::Duration;
27use tokio::sync::mpsc;
28use tracing::{debug, info, warn};
29use xor_name::XorName;
30
/// Default threshold: use merkle payments when chunk count >= this value.
///
/// Consulted by [`should_use_merkle`] when the mode is [`PaymentMode::Auto`].
pub const DEFAULT_MERKLE_THRESHOLD: usize = 64;
33
/// Payment mode for uploads.
///
/// Variant names are serialized in `snake_case`; [`PaymentMode::Auto`] is the
/// default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PaymentMode {
    /// Automatically choose: merkle for batches of at least
    /// [`DEFAULT_MERKLE_THRESHOLD`] chunks, single otherwise.
    #[default]
    Auto,
    /// Force merkle batch payment regardless of batch size (min 2 chunks).
    Merkle,
    /// Force single-node payment (one tx per chunk).
    Single,
}
46
/// Result of a merkle batch payment.
///
/// Serializable so it can be persisted across runs for resume after a
/// partial-upload failure. See `crate::data::client::cached_merkle`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MerkleBatchPaymentResult {
    /// Map of `XorName` to serialized tagged proof bytes (ready to use in PUT requests).
    pub proofs: HashMap<[u8; 32], Vec<u8>>,
    /// Number of chunks in the batch.
    pub chunk_count: usize,
    /// Total storage cost in atto (token smallest unit), kept as the string
    /// form of the amount.
    pub storage_cost_atto: String,
    /// Total gas cost in wei.
    pub gas_cost_wei: u128,
    /// Unix timestamp (seconds) used for the on-chain merkle payment.
    /// Persisted so resume can check whether the on-chain payment has
    /// aged out beyond the merkle expiration window and the cached
    /// receipt must be discarded.
    ///
    /// `#[serde(default)]` lets a serialized result that lacks this field
    /// still deserialize, with the timestamp falling back to `0`.
    #[serde(default)]
    pub merkle_payment_timestamp: u64,
}
68
/// Prepared merkle batch ready for external payment.
///
/// Contains everything needed to submit the on-chain merkle payment
/// and then finalize proof generation without a wallet.
///
/// The public fields are the inputs of the on-chain call; the private fields
/// are retained only so proofs can be generated once the payment is made.
pub struct PreparedMerkleBatch {
    /// Merkle tree depth (needed for the on-chain call).
    pub depth: u8,
    /// Pool commitments for the on-chain call.
    pub pool_commitments: Vec<PoolCommitment>,
    /// Timestamp used for the merkle payment (seconds since the Unix epoch).
    pub merkle_payment_timestamp: u64,
    /// Internal: candidate pools (needed for proof generation after payment).
    candidate_pools: Vec<MerklePaymentCandidatePool>,
    /// Internal: the merkle tree (needed for proof generation).
    tree: MerkleTree,
    /// Internal: chunk addresses in order.
    addresses: Vec<[u8; 32]>,
}
87
/// Outcome of the pre-payment storage check for a merkle upload batch.
#[derive(Debug, Clone, Default)]
pub(crate) struct MerkleUploadPlan {
    /// Addresses of chunks already confirmed by their close group.
    pub already_stored: Vec<[u8; 32]>,
    /// Addresses of chunks that still need payment and storage.
    pub to_upload: Vec<[u8; 32]>,
    /// Combined byte size of the chunks listed in `to_upload`.
    to_upload_total_bytes: u64,
}

impl MerkleUploadPlan {
    /// Mean byte size of the chunks still awaiting upload, rounded down.
    ///
    /// Returns `0` when there is nothing left to upload.
    #[must_use]
    pub fn to_upload_avg_size(&self) -> u64 {
        let pending = self.to_upload.len() as u64;
        // `checked_div` yields `None` only for a zero divisor, i.e. an empty
        // `to_upload` list.
        self.to_upload_total_bytes
            .checked_div(pending)
            .unwrap_or(0)
    }
}
110
111impl std::fmt::Debug for PreparedMerkleBatch {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("PreparedMerkleBatch")
114            .field("depth", &self.depth)
115            .field("pool_commitments", &self.pool_commitments.len())
116            .field("merkle_payment_timestamp", &self.merkle_payment_timestamp)
117            .field("candidate_pools", &self.candidate_pools.len())
118            .field("addresses", &self.addresses.len())
119            .finish()
120    }
121}
122
123/// Select chunk contents that correspond to `addresses`, preserving address order.
124///
125/// Extra chunk contents are ignored; missing contents for any requested address
126/// are treated as corrupted upload state.
127pub(crate) fn chunk_contents_for_upload_addresses(
128    chunk_contents: Vec<Bytes>,
129    addresses: &[[u8; 32]],
130) -> Result<Vec<Bytes>> {
131    if addresses.is_empty() {
132        return Ok(Vec::new());
133    }
134
135    let mut needed_by_address: HashMap<[u8; 32], usize> = HashMap::new();
136    for address in addresses {
137        *needed_by_address.entry(*address).or_default() += 1;
138    }
139
140    let mut chunks_by_address: HashMap<[u8; 32], VecDeque<Bytes>> =
141        HashMap::with_capacity(needed_by_address.len());
142    let mut remaining = addresses.len();
143    for chunk in chunk_contents {
144        let address = compute_address(&chunk);
145        if let Some(needed) = needed_by_address.get_mut(&address) {
146            if *needed > 0 {
147                chunks_by_address
148                    .entry(address)
149                    .or_default()
150                    .push_back(chunk);
151                *needed -= 1;
152                remaining -= 1;
153                if remaining == 0 {
154                    break;
155                }
156            }
157        }
158    }
159
160    for (address, needed) in &needed_by_address {
161        if *needed == 0 {
162            continue;
163        }
164
165        if chunks_by_address.contains_key(address) {
166            return Err(Error::InvalidData(format!(
167                "missing duplicate chunk content for merkle address {}",
168                hex::encode(address)
169            )));
170        }
171
172        return Err(Error::InvalidData(format!(
173            "missing chunk content for merkle address {}",
174            hex::encode(address)
175        )));
176    }
177
178    let mut selected = Vec::with_capacity(addresses.len());
179    for address in addresses {
180        let chunks = chunks_by_address.get_mut(address).ok_or_else(|| {
181            Error::InvalidData(format!(
182                "missing chunk content for merkle address {}",
183                hex::encode(address)
184            ))
185        })?;
186        let chunk = chunks.pop_front().ok_or_else(|| {
187            Error::InvalidData(format!(
188                "missing duplicate chunk content for merkle address {}",
189                hex::encode(address)
190            ))
191        })?;
192        selected.push(chunk);
193    }
194
195    Ok(selected)
196}
197
198/// Map a per-chunk quote-collection result to its merkle-preflight stored
199/// status, degrading gracefully on transient failures.
200///
201/// The preflight only needs to answer "is this chunk already on the network?"
202/// so it can be skipped before payment — it is an optimisation, never a gate.
203/// Therefore:
204/// - `Ok(_)` (quotes gathered) → `Ok(false)`: chunk is not stored, upload it.
205/// - `Err(AlreadyStored)` → `Ok(true)`: skip it.
206/// - a transient failure (timeout / insufficient peers / transport error) →
207///   `Ok(false)`: we could not confirm, so queue it for upload rather than
208///   aborting the whole batch. Re-storing an existing chunk is idempotent.
209/// - any other (application) error → propagate; it would recur on a healthy
210///   link and is not something the preflight should paper over.
211///
212/// Without this, a single chunk's transient quote timeout failed the entire
213/// upload — fatal for forced `--merkle` uploads, which (unlike `Auto`) have no
214/// wave-batch fallback, and catastrophic for large files via the multiplicative
215/// per-chunk effect.
216fn preflight_stored_status<T>(result: Result<T>) -> Result<bool> {
217    match result {
218        Ok(_) => Ok(false),
219        Err(Error::AlreadyStored) => Ok(true),
220        Err(e) if matches!(classify_error(&e), Outcome::Timeout | Outcome::NetworkError) => {
221            Ok(false)
222        }
223        Err(e) => Err(e),
224    }
225}
226
227/// Determine whether to use merkle payments for a given batch size.
228/// Free function — no Client needed.
229#[must_use]
230pub fn should_use_merkle(chunk_count: usize, mode: PaymentMode) -> bool {
231    match mode {
232        PaymentMode::Auto => chunk_count >= DEFAULT_MERKLE_THRESHOLD,
233        PaymentMode::Merkle => chunk_count >= 2,
234        PaymentMode::Single => false,
235    }
236}
237
238impl Client {
    /// Determine whether to use merkle payments for a given batch size.
    ///
    /// Convenience wrapper that forwards to the module-level
    /// `should_use_merkle` function; no client state is consulted.
    #[must_use]
    pub fn should_use_merkle(&self, chunk_count: usize, mode: PaymentMode) -> bool {
        should_use_merkle(chunk_count, mode)
    }
244
245    /// Pay for a batch of chunks using merkle batch payment.
246    ///
247    /// Builds a merkle tree, collects candidate pools, pays on-chain in one tx,
248    /// and returns per-chunk proofs. Splits into sub-batches if > `MAX_LEAVES`.
249    ///
250    /// This low-level helper assumes the caller has already selected the
251    /// addresses that need payment. User-facing upload paths first run the
252    /// merkle upload planner to skip chunks already stored on the network.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if the batch is too small, candidate collection fails,
257    /// on-chain payment fails, or proof generation fails.
258    pub async fn pay_for_merkle_batch(
259        &self,
260        addresses: &[[u8; 32]],
261        data_type: u32,
262        data_size: u64,
263    ) -> Result<MerkleBatchPaymentResult> {
264        let chunk_count = addresses.len();
265        if chunk_count < 2 {
266            return Err(Error::Payment(
267                "Merkle batch payment requires at least 2 chunks".to_string(),
268            ));
269        }
270
271        if chunk_count > MAX_LEAVES {
272            return self
273                .pay_for_merkle_multi_batch(addresses, data_type, data_size)
274                .await;
275        }
276
277        self.pay_for_merkle_single_batch(addresses, data_type, data_size)
278            .await
279    }
280
281    /// Check which chunks in a merkle upload still need payment/storage.
282    ///
283    /// Uses the normal per-chunk quote path because it already has the
284    /// close-group majority rule for `AlreadyStored`. Non-stored chunks only
285    /// use the quote response as a probe; their actual payment still happens
286    /// through the merkle batch.
287    ///
288    /// `chunks` contains `(address, data_size)` pairs.
289    pub(crate) async fn plan_merkle_upload(
290        &self,
291        chunks: Vec<([u8; 32], u64)>,
292        data_type: u32,
293        progress: Option<&mpsc::Sender<UploadEvent>>,
294    ) -> Result<MerkleUploadPlan> {
295        let total_chunks = chunks.len();
296        if total_chunks == 0 {
297            return Ok(MerkleUploadPlan::default());
298        }
299
300        info!("Checking {total_chunks} merkle chunks for existing storage before payment");
301
302        let quote_limiter = self.controller().quote.clone();
303        let quote_concurrency = quote_limiter.current().min(total_chunks.max(1));
304        let mut check_stream = stream::iter(chunks.into_iter().enumerate())
305            .map(|(index, (address, data_size))| {
306                let limiter = quote_limiter.clone();
307                async move {
308                    let result = observe_op(
309                        &limiter,
310                        || async move {
311                            self.chunk_already_stored_for_merkle(&address, data_type, data_size)
312                                .await
313                        },
314                        classify_error,
315                    )
316                    .await;
317                    (index, address, data_size, result)
318                }
319            })
320            .buffer_unordered(quote_concurrency);
321
322        let mut already_stored: Vec<(usize, [u8; 32])> = Vec::new();
323        let mut to_upload: Vec<(usize, [u8; 32], u64)> = Vec::new();
324        let mut checked = 0usize;
325
326        while let Some((index, address, data_size, result)) = check_stream.next().await {
327            let is_already_stored = result?;
328            checked += 1;
329
330            if let Some(tx) = progress {
331                let _ = tx.try_send(UploadEvent::ChunkQuoted {
332                    quoted: checked,
333                    total: total_chunks,
334                });
335            }
336
337            if is_already_stored {
338                debug!(
339                    "Merkle preflight {checked}/{total_chunks}: chunk {} already stored",
340                    hex::encode(address)
341                );
342                already_stored.push((index, address));
343                if let Some(tx) = progress {
344                    let _ = tx.try_send(UploadEvent::ChunkStored {
345                        stored: already_stored.len(),
346                        total: total_chunks,
347                    });
348                }
349            } else {
350                debug!(
351                    "Merkle preflight {checked}/{total_chunks}: chunk {} needs upload",
352                    hex::encode(address)
353                );
354                to_upload.push((index, address, data_size));
355            }
356        }
357
358        already_stored.sort_by_key(|(index, _)| *index);
359        to_upload.sort_by_key(|(index, _, _)| *index);
360
361        let to_upload_total_bytes = to_upload.iter().fold(0u64, |acc, (_, _, data_size)| {
362            acc.saturating_add(*data_size)
363        });
364
365        let already_stored = already_stored
366            .into_iter()
367            .map(|(_, address)| address)
368            .collect::<Vec<_>>();
369        let to_upload = to_upload
370            .into_iter()
371            .map(|(_, address, _)| address)
372            .collect::<Vec<_>>();
373
374        info!(
375            "Merkle preflight complete: {} already stored, {} need upload",
376            already_stored.len(),
377            to_upload.len()
378        );
379
380        Ok(MerkleUploadPlan {
381            already_stored,
382            to_upload,
383            to_upload_total_bytes,
384        })
385    }
386
387    async fn chunk_already_stored_for_merkle(
388        &self,
389        address: &[u8; 32],
390        data_type: u32,
391        data_size: u64,
392    ) -> Result<bool> {
393        let result = self
394            .get_store_quotes_with_fault_tolerance(address, data_size, data_type)
395            .await;
396        if let Err(e) = &result {
397            if matches!(classify_error(e), Outcome::Timeout | Outcome::NetworkError) {
398                debug!(
399                    "Merkle preflight: could not determine stored status for {} ({e}); \
400                     treating as not stored and queuing for upload",
401                    hex::encode(address)
402                );
403            }
404        }
405        preflight_stored_status(result)
406    }
407
408    /// Phase 1 of external-signer merkle payment: prepare batch without paying.
409    ///
410    /// Builds the merkle tree, collects candidate pools from the network,
411    /// and returns the data needed for the on-chain payment call.
412    /// Requires `EvmNetwork` but NOT a wallet.
413    pub async fn prepare_merkle_batch_external(
414        &self,
415        addresses: &[[u8; 32]],
416        data_type: u32,
417        data_size: u64,
418    ) -> Result<PreparedMerkleBatch> {
419        let chunk_count = addresses.len();
420        let xornames: Vec<XorName> = addresses.iter().map(|a| XorName(*a)).collect();
421
422        debug!("Building merkle tree for {chunk_count} chunks");
423
424        // 1. Build merkle tree
425        let tree = MerkleTree::from_xornames(xornames)
426            .map_err(|e| Error::Payment(format!("Failed to build merkle tree: {e}")))?;
427
428        let depth = tree.depth();
429        let merkle_payment_timestamp = std::time::SystemTime::now()
430            .duration_since(std::time::UNIX_EPOCH)
431            .map_err(|e| Error::Payment(format!("System time error: {e}")))?
432            .as_secs();
433
434        debug!("Merkle tree: depth={depth}, leaves={chunk_count}, ts={merkle_payment_timestamp}");
435
436        // 2. Get reward candidates (midpoint proofs)
437        let midpoint_proofs = tree
438            .reward_candidates(merkle_payment_timestamp)
439            .map_err(|e| Error::Payment(format!("Failed to generate reward candidates: {e}")))?;
440
441        debug!(
442            "Collecting candidate pools from {} midpoints (concurrent)",
443            midpoint_proofs.len()
444        );
445
446        // 3. Collect candidate pools from the network (all pools in parallel)
447        let candidate_pools = self
448            .build_candidate_pools(
449                &midpoint_proofs,
450                data_type,
451                data_size,
452                merkle_payment_timestamp,
453            )
454            .await?;
455
456        // 4. Build pool commitments for on-chain payment
457        let pool_commitments: Vec<PoolCommitment> = candidate_pools
458            .iter()
459            .map(MerklePaymentCandidatePool::to_commitment)
460            .collect();
461
462        Ok(PreparedMerkleBatch {
463            depth,
464            pool_commitments,
465            merkle_payment_timestamp,
466            candidate_pools,
467            tree,
468            addresses: addresses.to_vec(),
469        })
470    }
471
472    /// Pay for a single batch (up to `MAX_LEAVES` chunks).
473    async fn pay_for_merkle_single_batch(
474        &self,
475        addresses: &[[u8; 32]],
476        data_type: u32,
477        data_size: u64,
478    ) -> Result<MerkleBatchPaymentResult> {
479        let wallet = self.require_wallet()?;
480        let prepared = self
481            .prepare_merkle_batch_external(addresses, data_type, data_size)
482            .await?;
483
484        info!(
485            "Submitting merkle batch payment on-chain (depth={})",
486            prepared.depth
487        );
488        let (winner_pool_hash, amount, gas_info) = wallet
489            .pay_for_merkle_tree(
490                prepared.depth,
491                prepared.pool_commitments.clone(),
492                prepared.merkle_payment_timestamp,
493            )
494            .await
495            .map_err(|e| Error::Payment(format!("Merkle batch payment failed: {e}")))?;
496
497        info!(
498            "Merkle payment succeeded: winner pool {}",
499            hex::encode(winner_pool_hash)
500        );
501
502        let mut result = finalize_merkle_batch(prepared, winner_pool_hash)?;
503        result.storage_cost_atto = amount.to_string();
504        result.gas_cost_wei = gas_info.gas_cost_wei;
505        Ok(result)
506    }
507
508    /// Handle batches larger than `MAX_LEAVES` by splitting into sub-batches.
509    async fn pay_for_merkle_multi_batch(
510        &self,
511        addresses: &[[u8; 32]],
512        data_type: u32,
513        data_size: u64,
514    ) -> Result<MerkleBatchPaymentResult> {
515        let sub_batches: Vec<&[[u8; 32]]> = addresses.chunks(MAX_LEAVES).collect();
516        let total_sub_batches = sub_batches.len();
517        let mut all_proofs = HashMap::with_capacity(addresses.len());
518        let mut total_storage = Amount::ZERO;
519        let mut total_gas: u128 = 0;
520        // Track the oldest sub-batch timestamp so the overall receipt
521        // expires when the *first* sub-batch's on-chain payment ages
522        // out (worst case for resume).
523        let mut oldest_ts: u64 = 0;
524
525        for (i, chunk) in sub_batches.into_iter().enumerate() {
526            match self
527                .pay_for_merkle_single_batch(chunk, data_type, data_size)
528                .await
529            {
530                Ok(sub_result) => {
531                    if let Ok(cost) = sub_result.storage_cost_atto.parse::<Amount>() {
532                        total_storage += cost;
533                    }
534                    total_gas = total_gas.saturating_add(sub_result.gas_cost_wei);
535                    if oldest_ts == 0
536                        || (sub_result.merkle_payment_timestamp > 0
537                            && sub_result.merkle_payment_timestamp < oldest_ts)
538                    {
539                        oldest_ts = sub_result.merkle_payment_timestamp;
540                    }
541                    all_proofs.extend(sub_result.proofs);
542                }
543                Err(e) => {
544                    if all_proofs.is_empty() {
545                        // First sub-batch failed, nothing paid yet -- propagate directly.
546                        return Err(e);
547                    }
548                    // Return partial result so caller can still store already-paid chunks.
549                    warn!(
550                        "Merkle sub-batch {}/{total_sub_batches} failed: {e}. \
551                         Returning {} proofs from prior sub-batches",
552                        i + 1,
553                        all_proofs.len()
554                    );
555                    return Ok(MerkleBatchPaymentResult {
556                        chunk_count: all_proofs.len(),
557                        proofs: all_proofs,
558                        storage_cost_atto: total_storage.to_string(),
559                        gas_cost_wei: total_gas,
560                        merkle_payment_timestamp: oldest_ts,
561                    });
562                }
563            }
564        }
565
566        Ok(MerkleBatchPaymentResult {
567            chunk_count: addresses.len(),
568            proofs: all_proofs,
569            storage_cost_atto: total_storage.to_string(),
570            gas_cost_wei: total_gas,
571            merkle_payment_timestamp: oldest_ts,
572        })
573    }
574
575    /// Build candidate pools by querying the network for each midpoint (concurrently).
576    async fn build_candidate_pools(
577        &self,
578        midpoint_proofs: &[MidpointProof],
579        data_type: u32,
580        data_size: u64,
581        merkle_payment_timestamp: u64,
582    ) -> Result<Vec<MerklePaymentCandidatePool>> {
583        let mut pool_futures = FuturesUnordered::new();
584
585        for midpoint_proof in midpoint_proofs {
586            let pool_address = midpoint_proof.address();
587            let mp = midpoint_proof.clone();
588            pool_futures.push(async move {
589                let candidate_nodes = self
590                    .get_merkle_candidate_pool(
591                        &pool_address.0,
592                        data_type,
593                        data_size,
594                        merkle_payment_timestamp,
595                    )
596                    .await?;
597                Ok::<_, Error>(MerklePaymentCandidatePool {
598                    midpoint_proof: mp,
599                    candidate_nodes,
600                })
601            });
602        }
603
604        let mut pools = Vec::with_capacity(midpoint_proofs.len());
605        while let Some(result) = pool_futures.next().await {
606            pools.push(result?);
607        }
608
609        Ok(pools)
610    }
611
612    /// Collect `CANDIDATES_PER_POOL` (16) merkle candidate quotes from the network.
613    #[allow(clippy::too_many_lines)]
614    async fn get_merkle_candidate_pool(
615        &self,
616        address: &[u8; 32],
617        data_type: u32,
618        data_size: u64,
619        merkle_payment_timestamp: u64,
620    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
621        let node = self.network().node();
622        let timeout = Duration::from_secs(self.config().quote_timeout_secs);
623
624        // Query extra peers to handle validation failures (bad sigs, wrong type, etc.)
625        let query_count = CANDIDATES_PER_POOL * 2;
626        let mut remote_peers = self
627            .network()
628            .find_closest_peers(address, query_count)
629            .await?;
630
631        // If DHT closest-nodes didn't return enough, supplement with connected peers.
632        // On small networks the DHT iterative lookup may not discover enough peers
633        // close to a random pool address, but we know more peers via direct connections.
634        if remote_peers.len() < CANDIDATES_PER_POOL {
635            let connected = self.network().connected_peers().await;
636            for peer in connected {
637                if !remote_peers.iter().any(|(id, _)| *id == peer) {
638                    remote_peers.push((peer, vec![]));
639                }
640            }
641        }
642
643        if remote_peers.len() < CANDIDATES_PER_POOL {
644            return Err(Error::InsufficientPeers(format!(
645                "Found {} peers, need {CANDIDATES_PER_POOL} for merkle candidate pool. \
646                 Use --no-merkle or a larger network.",
647                remote_peers.len()
648            )));
649        }
650
651        let mut candidate_futures = FuturesUnordered::new();
652
653        for (peer_id, peer_addrs) in &remote_peers {
654            let request_id = self.next_request_id();
655            let request = MerkleCandidateQuoteRequest {
656                address: *address,
657                data_type,
658                data_size,
659                merkle_payment_timestamp,
660            };
661            let message = ChunkMessage {
662                request_id,
663                body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
664            };
665
666            let message_bytes = match message.encode() {
667                Ok(bytes) => bytes,
668                Err(e) => {
669                    warn!("Failed to encode merkle candidate request for {peer_id}: {e}");
670                    continue;
671                }
672            };
673
674            let peer_id_clone = *peer_id;
675            let addrs_clone = peer_addrs.clone();
676            let node_clone = node.clone();
677
678            let fut = async move {
679                let result = send_and_await_chunk_response(
680                    &node_clone,
681                    &peer_id_clone,
682                    message_bytes,
683                    request_id,
684                    timeout,
685                    &addrs_clone,
686                    |body| match body {
687                        ChunkMessageBody::MerkleCandidateQuoteResponse(
688                            MerkleCandidateQuoteResponse::Success { candidate_node },
689                        ) => {
690                            match rmp_serde::from_slice::<MerklePaymentCandidateNode>(
691                                &candidate_node,
692                            ) {
693                                Ok(node) => Some(Ok(node)),
694                                Err(e) => Some(Err(Error::Serialization(format!(
695                                    "Failed to deserialize candidate node from {peer_id_clone}: {e}"
696                                )))),
697                            }
698                        }
699                        ChunkMessageBody::MerkleCandidateQuoteResponse(
700                            MerkleCandidateQuoteResponse::Error(e),
701                        ) => Some(Err(Error::Protocol(format!(
702                            "Merkle quote error from {peer_id_clone}: {e}"
703                        )))),
704                        _ => None,
705                    },
706                    |e| {
707                        Error::Network(format!(
708                            "Failed to send merkle candidate request to {peer_id_clone}: {e}"
709                        ))
710                    },
711                    || {
712                        Error::Timeout(format!(
713                            "Timeout waiting for merkle candidate from {peer_id_clone}"
714                        ))
715                    },
716                )
717                .await;
718
719                (peer_id_clone, result)
720            };
721
722            candidate_futures.push(fut);
723        }
724
725        self.collect_validated_candidates(&mut candidate_futures, address, merkle_payment_timestamp)
726            .await
727    }
728
729    /// Collect and validate merkle candidate responses, then return the
730    /// `CANDIDATES_PER_POOL` valid responders that are XOR-closest to the
731    /// pool midpoint.
732    ///
733    /// Why distance-sort instead of "first N to respond":
734    /// the storing-node verifier re-runs a network closest-peers lookup of
735    /// the pool midpoint and rejects the pool if fewer than 13 of the 16
736    /// candidate `pub_keys` appear in that authoritative close-set. Pools
737    /// built from the fastest-to-respond quoters fail this check whenever
738    /// truly-close peers are slower (NAT/relay paths) than farther peers.
739    async fn collect_validated_candidates(
740        &self,
741        futures: &mut FuturesUnordered<
742            impl std::future::Future<
743                Output = (
744                    PeerId,
745                    std::result::Result<MerklePaymentCandidateNode, Error>,
746                ),
747            >,
748        >,
749        target_address: &[u8; 32],
750        merkle_payment_timestamp: u64,
751    ) -> Result<[MerklePaymentCandidateNode; CANDIDATES_PER_POOL]> {
752        let mut valid: Vec<(PeerId, MerklePaymentCandidateNode)> = Vec::new();
753        let mut failures: Vec<String> = Vec::new();
754
755        while let Some((peer_id, result)) = futures.next().await {
756            match result {
757                Ok(candidate) => {
758                    if !verify_merkle_candidate_signature(&candidate) {
759                        warn!("Invalid ML-DSA-65 signature from merkle candidate {peer_id}");
760                        failures.push(format!("{peer_id}: invalid signature"));
761                        continue;
762                    }
763                    if candidate.merkle_payment_timestamp != merkle_payment_timestamp {
764                        warn!("Timestamp mismatch from merkle candidate {peer_id}");
765                        failures.push(format!("{peer_id}: timestamp mismatch"));
766                        continue;
767                    }
768                    valid.push((peer_id, candidate));
769                }
770                Err(e) => {
771                    debug!("Failed to get merkle candidate from {peer_id}: {e}");
772                    failures.push(format!("{peer_id}: {e}"));
773                }
774            }
775        }
776
777        if valid.len() < CANDIDATES_PER_POOL {
778            return Err(Error::InsufficientPeers(format!(
779                "Got {} merkle candidates, need {CANDIDATES_PER_POOL}. Failures: [{}]",
780                valid.len(),
781                failures.join("; ")
782            )));
783        }
784
785        let target_peer = PeerId::from_bytes(*target_address);
786        valid.sort_by_key(|(peer_id, _)| peer_id.xor_distance(&target_peer));
787
788        let candidates: Vec<MerklePaymentCandidateNode> = valid
789            .into_iter()
790            .take(CANDIDATES_PER_POOL)
791            .map(|(_, candidate)| candidate)
792            .collect();
793
794        candidates
795            .try_into()
796            .map_err(|_| Error::Payment("Failed to convert candidates to fixed array".to_string()))
797    }
798
799    /// Upload chunks using pre-computed merkle proofs from a batch payment.
800    ///
801    /// Each chunk is matched to its proof from `batch_result.proofs`, then
802    /// stored to its close group concurrently. A per-chunk quorum shortfall
803    /// (`InsufficientPeers`) does **not** abort the file: such chunks are
804    /// collected and retried — with the same reusable proof and a freshly
805    /// re-collected close group — for up to [`MERKLE_STORE_MAX_ATTEMPTS`]
806    /// attempts. `stored_offset` carries chunks already confirmed by an earlier
807    /// preflight (used for progress numbering and the returned `stored` count),
808    /// and `total_chunks` is the whole-file total for progress events.
809    ///
810    /// Returns how many chunks are stored (including `stored_offset`), how many
811    /// remained short of quorum after all retries, and the aggregate store
812    /// stats.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error only for non-quorum failures (e.g. a missing proof, or a
817    /// chunk-count/address mismatch); quorum shortfalls are reported via
818    /// [`MerkleStoreOutcome::failed`].
819    pub(crate) async fn merkle_upload_chunks(
820        &self,
821        chunk_contents: Vec<Bytes>,
822        addresses: Vec<[u8; 32]>,
823        batch_result: &MerkleBatchPaymentResult,
824        progress: Option<&mpsc::Sender<UploadEvent>>,
825        stored_offset: usize,
826        total_chunks: usize,
827    ) -> Result<MerkleStoreOutcome> {
828        let store_limiter = self.controller().store.clone();
829        // Clamp fan-out to batch size — partial batches should not
830        // pay for unused slots (see PERF-RESULTS.md).
831        let batch_size = chunk_contents.len();
832        if batch_size != addresses.len() {
833            return Err(Error::InvalidData(format!(
834                "merkle upload has {batch_size} chunk contents but {} addresses",
835                addresses.len()
836            )));
837        }
838        // Cap closure re-read per scheduler refill so mid-flight limiter growth
839        // is applied to the rest of the batch (V2-554). Clamped to batch size —
840        // partial batches should not pay for unused slots (see PERF-RESULTS.md).
841        let cap = || store_limiter.current().min(batch_size.max(1));
842
843        // External-signer path: the chunk bodies are already resident in memory,
844        // so `store_one` clones each from this map on demand. (The whole-file
845        // path instead reads bodies from the on-disk spill — same `store_one(addr)`
846        // shape, so the store scheduler never carries a resident set of bodies.)
847        let bodies: std::collections::HashMap<[u8; 32], Bytes> =
848            addresses.iter().copied().zip(chunk_contents).collect();
849        let addrs = addresses;
850
851        // Store one chunk to its (freshly re-collected) close group. Called
852        // once per chunk per attempt, so a retry round naturally lands on a
853        // converged routing table. Only `InsufficientPeers` is recoverable;
854        // a missing proof stays fatal.
855        let store_one = |addr: [u8; 32]| {
856            let limiter = store_limiter.clone();
857            let content = bodies.get(&addr).cloned();
858            let proof_bytes = batch_result.proofs.get(&addr).cloned();
859            async move {
860                let started = std::time::Instant::now();
861                let content = content.ok_or_else(|| {
862                    Error::InvalidData(format!("missing chunk body for {}", hex::encode(addr)))
863                })?;
864                let proof = proof_bytes.ok_or_else(|| {
865                    Error::Payment(format!(
866                        "Missing merkle proof for chunk {}",
867                        hex::encode(addr)
868                    ))
869                })?;
870                let peers = self.put_target_peers(&addr).await?;
871                observe_op(
872                    &limiter,
873                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
874                    classify_error,
875                )
876                .await
877                .map(|_| started)
878            }
879        };
880
881        let outcome = merkle_store_with_retry(
882            addrs,
883            cap,
884            MERKLE_STORE_MAX_ATTEMPTS,
885            MERKLE_RETRY_BACKOFF,
886            progress,
887            stored_offset,
888            total_chunks,
889            store_one,
890        )
891        .await?;
892
893        // The external-signer path treats a non-quorum error as terminal (it
894        // returns a single all-or-nothing `FileUploadResult`), so re-raise the
895        // fatal that `merkle_store_with_retry` now carries in the outcome. The
896        // CLI/spill paths, which can surface `PartialUpload`, read `fatal`
897        // directly instead.
898        if let Some(e) = outcome.fatal {
899            return Err(e);
900        }
901        Ok(outcome)
902    }
903}
904
/// Total store-attempt budget for a merkle batch: the initial attempt plus up
/// to three retries. Chosen to match the wave path's contract
/// (`batch.rs` iterates `0..=MAX_RETRIES` with `MAX_RETRIES = 3`) and the
/// four-slot [`WaveAggregateStats::retries_histogram`], so a chunk that lands
/// on the final retry is recorded in `retries_histogram[3]`.
///
/// A chunk's close group can transiently reject its `winner_pool` midpoint
/// while a few nodes' routing tables disagree about that midpoint; the network
/// converges within minutes. Per-chunk proofs are reusable, so retrying the
/// same proof after a short backoff recovers these shortfalls for free — no
/// re-payment and no new pool.
///
/// Passed as `max_attempts` to [`merkle_store_with_retry`] by
/// `Client::merkle_upload_chunks`.
pub(crate) const MERKLE_STORE_MAX_ATTEMPTS: usize = 4;
917
/// Base backoff between merkle store attempts. The routing-table divergence
/// that causes `InsufficientPeers` resolves on the order of minutes, so a short
/// sleep between rounds is enough to land on a converged close group. The
/// actual wait is jittered by [`MERKLE_RETRY_JITTER`] so a large failed set
/// does not re-fire against the same divergent nodes in lockstep.
///
/// [`merkle_store_with_retry`] sleeps only when the backoff it is given is
/// greater than `Duration::ZERO`; a zero backoff skips both the sleep and the
/// jitter computation.
pub(crate) const MERKLE_RETRY_BACKOFF: Duration = Duration::from_secs(30);
924
/// Fractional jitter applied to [`MERKLE_RETRY_BACKOFF`] (±10%), spreading the
/// retry wave so convergent nodes are not all probed at the same instant.
///
/// [`merkle_store_with_retry`] multiplies the backoff by `1.0 + j`, where `j`
/// is sampled with `gen_range` from the inclusive range
/// `-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER`.
const MERKLE_RETRY_JITTER: f64 = 0.1;
928
/// Outcome of storing a merkle batch: how many chunks landed, how many
/// remained short of quorum after all retries, and the aggregate store stats.
///
/// Produced by [`merkle_store_with_retry`], which always returns it inside
/// `Ok(..)`; a non-quorum failure is carried in [`MerkleStoreOutcome::fatal`]
/// rather than as `Err`.
#[derive(Debug, Default)]
pub(crate) struct MerkleStoreOutcome {
    /// Chunks that reached quorum, including any `stored_offset` carried in
    /// from a preflight (counted once, even if they needed retries).
    pub stored: usize,
    /// Addresses confirmed stored by this call (excludes the `stored_offset`
    /// preflight carry-in — those have no address here). The caller appends
    /// these to the file's stored set; using the explicit set (rather than
    /// inferring "input minus failed") keeps accounting correct even when a
    /// `fatal` error aborts the pass mid-flight, leaving some input chunks
    /// neither stored nor in `failed_addresses`.
    pub stored_addresses: Vec<[u8; 32]>,
    /// Chunks still short of quorum after [`MERKLE_STORE_MAX_ATTEMPTS`].
    /// Always set together with `failed_addresses` and equal to its length,
    /// so on a `fatal` abort it also counts the chunk that raised the fatal
    /// error.
    pub failed: usize,
    /// Addresses (and the last error message) of chunks still short of quorum
    /// after all retries. Empty when `failed == 0`. Used by the CLI path to
    /// build [`crate::data::Error::PartialUpload`]; the external-signer path
    /// only reads the counts.
    ///
    /// When `fatal` is set this holds the quorum shortfalls recorded in the
    /// aborted round plus the chunk whose store raised the fatal error.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Set when a non-quorum (fatal) store error aborted the pass. Successes
    /// completed before the abort are still recorded in `stored`/
    /// `stored_addresses`; the chunks that had already failed quorum are in
    /// `failed_addresses`; chunks still in flight when the abort hit are in
    /// neither (the caller treats input-minus-stored as failed). Callers that
    /// want the old "fatal aborts everything" contract re-raise this as `Err`.
    pub fatal: Option<Error>,
    /// Aggregate store stats (durations, attempts, per-round retry histogram).
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
960
961/// Drive a set of merkle chunk stores with bounded retry of quorum shortfalls.
962///
963/// Runs `store_one(addr)` over all `addrs` concurrently, keeping up to `cap()`
964/// stores in flight and RE-READING `cap()` as each slot frees — so mid-flight
965/// adaptive growth is applied to the rest of the round instead of being frozen
966/// at a per-round snapshot (V2-554). `store_one` acquires the chunk body itself
967/// (e.g. reads it from the on-disk spill), so only the ≤`cap()` in-flight stores
968/// hold a body in memory — the caller passes addresses, never a resident set of
969/// bodies, which is what lets the whole-file store run as one cap-bounded
970/// fan-out instead of memory-bounded waves. Collects quorum shortfalls
971/// (`InsufficientPeers`, `CloseGroupShortfall`, `RemotePut`) rather than
972/// aborting. Failed chunks are retried — `store_one` re-collects their close
973/// group on each call, so a converged routing table can yield a fresh group —
974/// for up to `max_attempts` rounds, sleeping a jittered `backoff` between
975/// rounds. A chunk's success is counted once and recorded in the retry round it
976/// landed on (`retries_histogram[round]`). `stored_offset` seeds the returned
977/// `stored` count and the progress numbering; `total` is the whole-file total
978/// reported in progress events.
979///
980/// A non-quorum error stops the pass but does **not** discard progress: the
981/// successes already completed this pass stay in `stored`/`stored_addresses`,
982/// the quorum shortfalls so far stay in `failed_addresses`, and the error is
983/// returned in [`MerkleStoreOutcome::fatal`] (as `Ok(outcome)`, not `Err`).
984/// Callers that want the old abort-everything behaviour re-raise `fatal` as
985/// `Err`; CLI callers fold it into `PartialUpload` while keeping the stores.
986#[allow(clippy::too_many_arguments)]
987pub(crate) async fn merkle_store_with_retry<F, Fut, C>(
988    addrs: Vec<[u8; 32]>,
989    cap: C,
990    max_attempts: usize,
991    backoff: Duration,
992    progress: Option<&mpsc::Sender<UploadEvent>>,
993    stored_offset: usize,
994    total: usize,
995    store_one: F,
996) -> Result<MerkleStoreOutcome>
997where
998    F: Fn([u8; 32]) -> Fut,
999    Fut: std::future::Future<Output = Result<std::time::Instant>>,
1000    C: Fn() -> usize,
1001{
1002    let attempts = max_attempts.max(1);
1003    let mut outcome = MerkleStoreOutcome {
1004        stored: stored_offset,
1005        ..MerkleStoreOutcome::default()
1006    };
1007    let mut pending = addrs;
1008
1009    for attempt in 0..attempts {
1010        // Carries the failing address forward for the next round plus the last
1011        // quorum-shortfall message, so an exhausted set can report per-chunk
1012        // errors via `failed_addresses`. The chunk BODY is not carried — each
1013        // `store_one` re-reads it on demand, so at most `cap` bodies are ever
1014        // resident regardless of the total chunk count.
1015        let mut next_failed: Vec<([u8; 32], String)> = Vec::new();
1016
1017        // Rolling scheduler: keep up to `cap()` stores in flight, re-reading the
1018        // cap as each slot frees so limiter growth mid-round is applied to the
1019        // remaining chunks (V2-554). Iterator exhaustion bounds the launch count
1020        // to the pending set, so no explicit clamp to `pending.len()` is needed.
1021        let mut pending_iter = pending.into_iter();
1022        let mut in_flight = FuturesUnordered::new();
1023        loop {
1024            let slots = cap().max(1);
1025            while in_flight.len() < slots {
1026                match pending_iter.next() {
1027                    Some(addr) => {
1028                        let fut = store_one(addr);
1029                        in_flight.push(async move { (addr, fut.await) });
1030                    }
1031                    None => break,
1032                }
1033            }
1034            let Some((addr, result)) = in_flight.next().await else {
1035                break;
1036            };
1037            outcome.stats.chunk_attempts_total =
1038                outcome.stats.chunk_attempts_total.saturating_add(1);
1039            match result {
1040                Ok(started) => {
1041                    let duration_ms =
1042                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
1043                    outcome.stats.store_durations_ms.push(duration_ms);
1044                    let idx = attempt.min(outcome.stats.retries_histogram.len().saturating_sub(1));
1045                    outcome.stats.retries_histogram[idx] =
1046                        outcome.stats.retries_histogram[idx].saturating_add(1);
1047                    outcome.stored += 1;
1048                    outcome.stored_addresses.push(addr);
1049                    if let Some(tx) = progress {
1050                        let _ = tx.try_send(UploadEvent::ChunkStored {
1051                            stored: outcome.stored,
1052                            total,
1053                        });
1054                    }
1055                }
1056                // A quorum shortfall — whether a timeout-bearing capacity
1057                // shortfall (`InsufficientPeers`), a pure dial-churn shortfall
1058                // (`CloseGroupShortfall`, V2-554), or an app-only rejection
1059                // (`RemotePut`, e.g. pool-rejected / quote-stale / disk-full,
1060                // which are transient) — is recoverable: defer and retry the
1061                // chunk rather than aborting the whole upload (V2-468 / V2-554).
1062                Err(
1063                    e @ (Error::InsufficientPeers(_)
1064                    | Error::CloseGroupShortfall(_)
1065                    | Error::RemotePut { .. }),
1066                ) => {
1067                    next_failed.push((addr, e.to_string()));
1068                }
1069                Err(e) => {
1070                    // Non-quorum error: fatal. Stop consuming the stream but do
1071                    // NOT discard the outcome — successes already completed this
1072                    // pass stay recorded in `stored`/`stored_addresses`. Record
1073                    // the fatal chunk itself (and any quorum shortfalls seen so
1074                    // far) as failed; anything still in flight is left for the
1075                    // caller to treat as not-stored (input minus
1076                    // `stored_addresses`).
1077                    next_failed.push((addr, e.to_string()));
1078                    outcome.fatal = Some(e);
1079                    break;
1080                }
1081            }
1082        }
1083
1084        if outcome.fatal.is_some() {
1085            outcome.failed = next_failed.len();
1086            outcome.failed_addresses = next_failed;
1087            return Ok(outcome);
1088        }
1089
1090        if next_failed.is_empty() {
1091            break;
1092        }
1093
1094        if attempt + 1 < attempts {
1095            warn!(
1096                failed = next_failed.len(),
1097                attempt = attempt + 1,
1098                "merkle chunks short of quorum, retrying after backoff"
1099            );
1100            pending = next_failed.into_iter().map(|(addr, _msg)| addr).collect();
1101            if backoff > Duration::ZERO {
1102                // Jitter the wait (±MERKLE_RETRY_JITTER) so a large failed set
1103                // does not re-probe the same divergent nodes in lockstep.
1104                // `thread_rng` is !Send, so the value is computed and the rng
1105                // dropped before the await to keep this future Send.
1106                let wait = {
1107                    let mut rng = rand::thread_rng();
1108                    let factor = 1.0 + rng.gen_range(-MERKLE_RETRY_JITTER..=MERKLE_RETRY_JITTER);
1109                    backoff.mul_f64(factor)
1110                };
1111                tokio::time::sleep(wait).await;
1112            }
1113        } else {
1114            outcome.failed = next_failed.len();
1115            outcome.failed_addresses = next_failed;
1116            break;
1117        }
1118    }
1119
1120    Ok(outcome)
1121}
1122
/// Round delays (seconds) for the merkle upload deferred-retry pass. Round 0
/// fires immediately — most quorum shortfalls on a healthy network are
/// momentary close-group divergence that clears in well under a second, and
/// serializing them behind mandatory sleeps was the single biggest throughput
/// sink in the wave path (one bad chunk parked the other 63 slots for minutes).
/// Only chunks that survive a round get a longer back-off before the next, so a
/// genuinely saturated/diverged group still gets time to settle. Mirrors the
/// download path's `DEFERRED_ROUND_DELAYS_SECS`.
///
/// Shaped for the `round_delays_secs` argument of [`merkle_deferred_retry`]:
/// there, each entry is the sleep taken *before* its round (skipped when 0),
/// and the number of entries is the number of rounds.
pub(crate) const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
1132
/// Histogram slot for a deferred-retry round's successes.
///
/// The wave first pass lands in slot 0; deferred round `round` (0-indexed)
/// lands in slot `round + 1`, clamped to the last slot so the four-slot
/// [`WaveAggregateStats::retries_histogram`] keeps recording "which round a
/// chunk landed on" under the post-wave deferred structure.
///
/// `hist_len` is the number of slots in the histogram. The increment
/// saturates, so even `round == usize::MAX` clamps to the last slot instead of
/// overflowing. For `hist_len == 0` the result is 0, which is not a valid index
/// into an empty histogram — callers must pass the length of a non-empty one.
pub(crate) fn deferred_round_histogram_slot(round: usize, hist_len: usize) -> usize {
    let last_slot = hist_len.saturating_sub(1);
    round.saturating_add(1).min(last_slot)
}
1142
/// Outcome of the post-wave deferred-retry pass.
///
/// Produced by [`merkle_deferred_retry`], which returns it inside `Ok(..)`
/// both on normal completion and when a round hit a fatal store error (see
/// `fatal`).
#[derive(Debug, Default)]
pub(crate) struct DeferredRetryOutcome {
    /// Running total of stored chunks, seeded with the `stored_offset` passed in
    /// (i.e. everything the wave passes already stored) and advanced by each
    /// deferred round's successes.
    pub stored: usize,
    /// Addresses that reached quorum during the deferred rounds (to be appended
    /// to the file's `stored` set).
    pub stored_addresses: Vec<[u8; 32]>,
    /// Count of chunks still short of quorum after the final deferred round.
    /// Always equal to `failed_addresses.len()`.
    pub failed: usize,
    /// Addresses (and last quorum-shortfall message) still short after the final
    /// round. When `fatal` is set, this instead holds what the aborted round
    /// had recorded as failed up to that point: its quorum shortfalls plus the
    /// chunk that raised the fatal error. Chunks that were still in flight or
    /// not yet launched in that round are NOT listed here; the caller has to
    /// reconcile them against the full address list.
    pub failed_addresses: Vec<([u8; 32], String)>,
    /// Set when a deferred round hit a non-quorum (fatal) store error. The
    /// caller surfaces this as `PartialUpload` preserving everything stored so
    /// far, mirroring the wave path's fatal handling.
    pub fatal: Option<String>,
    /// Aggregate store stats merged across rounds, with each round's successes
    /// already mapped into its [`deferred_round_histogram_slot`].
    pub stats: crate::data::client::batch::WaveAggregateStats,
}
1167
1168/// Retry a file-level set of quorum-short merkle chunks in concurrent rounds.
1169///
1170/// This is the upload analogue of the download path's deferred-retry loop. The
1171/// whole-file store pass hands its quorum-short chunks here. Each round stores
1172/// all still-pending chunks in a single cap-bounded pass via
1173/// [`merkle_store_with_retry`] at `concurrency_for(len)` — `store_one(addr)`
1174/// re-reads each body on demand (from the spill), so only the ≤`concurrency_for`
1175/// in-flight stores hold a body and peak resident memory stays bounded
1176/// regardless of the deferred-chunk count. Survivors carry to the next round
1177/// after a `round_delays_secs` sleep. Chunks still short after the final round
1178/// become `failed_addresses`; a non-quorum store error stops the pass and is
1179/// reported via `fatal` (with the quorum shortfalls seen so far recorded as
1180/// `failed_addresses`) so the caller can surface `PartialUpload` — reconciled
1181/// against the full address list — without discarding earlier progress.
1182///
1183/// `store_one`, `progress`, `stored_offset` and `total` mirror
1184/// [`merkle_store_with_retry`].
1185#[allow(clippy::too_many_arguments)]
1186pub(crate) async fn merkle_deferred_retry<CF, SF, Fut>(
1187    deferred: Vec<([u8; 32], String)>,
1188    round_delays_secs: &[u64],
1189    concurrency_for: CF,
1190    progress: Option<&mpsc::Sender<UploadEvent>>,
1191    stored_offset: usize,
1192    total: usize,
1193    store_one: SF,
1194) -> Result<DeferredRetryOutcome>
1195where
1196    CF: Fn(usize) -> usize,
1197    SF: Fn([u8; 32]) -> Fut,
1198    Fut: std::future::Future<Output = Result<std::time::Instant>>,
1199{
1200    let mut outcome = DeferredRetryOutcome {
1201        stored: stored_offset,
1202        ..DeferredRetryOutcome::default()
1203    };
1204    let mut remaining = deferred;
1205    let rounds = round_delays_secs.len();
1206
1207    for (round, &delay_secs) in round_delays_secs.iter().enumerate() {
1208        if remaining.is_empty() {
1209            break;
1210        }
1211        if delay_secs > 0 {
1212            tokio::time::sleep(Duration::from_secs(delay_secs)).await;
1213        }
1214        info!(
1215            "Deferred merkle retry round {}/{}: {} chunk(s) short of quorum",
1216            round + 1,
1217            rounds,
1218            remaining.len(),
1219        );
1220
1221        // Store this round's whole pending set in one cap-bounded pass. A
1222        // single-pass round records its successes in histogram slot 0, so
1223        // redirect them into the round's own slot.
1224        let slot = deferred_round_histogram_slot(round, outcome.stats.retries_histogram.len());
1225        let round_addrs: Vec<[u8; 32]> = std::mem::take(&mut remaining)
1226            .into_iter()
1227            .map(|(addr, _msg)| addr)
1228            .collect();
1229        let round_len = round_addrs.len();
1230        // Re-read the cap per scheduler refill (V2-554) via `concurrency_for`,
1231        // which re-samples the store limiter clamped to this round's size.
1232        let cap = || concurrency_for(round_len);
1233
1234        let round_outcome = merkle_store_with_retry(
1235            round_addrs,
1236            cap,
1237            1,
1238            Duration::ZERO,
1239            progress,
1240            outcome.stored,
1241            total,
1242            &store_one,
1243        )
1244        .await?;
1245
1246        outcome.stored = round_outcome.stored;
1247        outcome
1248            .stored_addresses
1249            .extend(round_outcome.stored_addresses);
1250
1251        // Merge stats, redirecting this round's successes to its slot.
1252        outcome.stats.chunk_attempts_total = outcome
1253            .stats
1254            .chunk_attempts_total
1255            .saturating_add(round_outcome.stats.chunk_attempts_total);
1256        outcome
1257            .stats
1258            .store_durations_ms
1259            .extend(round_outcome.stats.store_durations_ms);
1260        let landed: usize = round_outcome.stats.retries_histogram.iter().sum();
1261        outcome.stats.retries_histogram[slot] =
1262            outcome.stats.retries_histogram[slot].saturating_add(landed);
1263
1264        if let Some(fatal) = round_outcome.fatal {
1265            // Fatal mid-pass: confirmed stores are preserved above. The store
1266            // helper left this round's quorum shortfalls in `failed_addresses`;
1267            // chunks still in flight / not yet launched are reconciled against
1268            // the full address list by the caller's `partial_upload_after_fatal`.
1269            outcome.fatal = Some(fatal.to_string());
1270            outcome.failed = round_outcome.failed_addresses.len();
1271            outcome.failed_addresses = round_outcome.failed_addresses;
1272            return Ok(outcome);
1273        }
1274
1275        // Quorum-short chunks from this round survive to the next.
1276        remaining = round_outcome.failed_addresses;
1277    }
1278
1279    outcome.failed = remaining.len();
1280    outcome.failed_addresses = remaining;
1281    Ok(outcome)
1282}
1283
1284/// Phase 2 of external-signer merkle payment: generate proofs from winner.
1285///
1286/// Takes the prepared batch and the winner pool hash returned by the
1287/// on-chain payment transaction. Generates per-chunk merkle proofs.
1288pub fn finalize_merkle_batch(
1289    prepared: PreparedMerkleBatch,
1290    winner_pool_hash: [u8; 32],
1291) -> Result<MerkleBatchPaymentResult> {
1292    let chunk_count = prepared.addresses.len();
1293    let xornames: Vec<XorName> = prepared.addresses.iter().map(|a| XorName(*a)).collect();
1294
1295    // Find the winner pool
1296    let winner_pool = prepared
1297        .candidate_pools
1298        .iter()
1299        .find(|pool| pool.hash() == winner_pool_hash)
1300        .ok_or_else(|| {
1301            Error::Payment(format!(
1302                "Winner pool {} not found in candidate pools",
1303                hex::encode(winner_pool_hash)
1304            ))
1305        })?;
1306
1307    // Generate proofs for each chunk
1308    info!("Generating merkle proofs for {chunk_count} chunks");
1309    let mut proofs = HashMap::with_capacity(chunk_count);
1310
1311    for (i, xorname) in xornames.iter().enumerate() {
1312        let address_proof = prepared
1313            .tree
1314            .generate_address_proof(i, *xorname)
1315            .map_err(|e| {
1316                Error::Payment(format!(
1317                    "Failed to generate address proof for chunk {i}: {e}"
1318                ))
1319            })?;
1320
1321        let merkle_proof = MerklePaymentProof::new(*xorname, address_proof, winner_pool.clone());
1322
1323        let tagged_bytes = serialize_merkle_proof(&merkle_proof)
1324            .map_err(|e| Error::Serialization(format!("Failed to serialize merkle proof: {e}")))?;
1325
1326        proofs.insert(prepared.addresses[i], tagged_bytes);
1327    }
1328
1329    info!("Merkle batch payment complete: {chunk_count} proofs generated");
1330
1331    Ok(MerkleBatchPaymentResult {
1332        proofs,
1333        chunk_count,
1334        storage_cost_atto: "0".to_string(),
1335        gas_cost_wei: 0,
1336        merkle_payment_timestamp: prepared.merkle_payment_timestamp,
1337    })
1338}
1339
/// Compile-time assertions that merkle method futures are Send.
///
/// The functions here are not `#[test]`s and are not called from within this
/// module; they only need to type-check. If a future stops being `Send`, the
/// `_assert_send` call fails to compile.
#[cfg(test)]
mod send_assertions {
    use super::*;
    use crate::data::client::Client;

    /// Accepts a reference to any `Send` value; passing a non-`Send` value is a
    /// compile error, which is the whole assertion.
    fn _assert_send<T: Send>(_: &T) {}

    // Lints silenced on purpose: the function is never called (`dead_code`),
    // and `todo!()` diverges, which makes everything after it unreachable and
    // leaves the bindings unused as far as the lints are concerned.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _merkle_upload_chunks_is_send(client: &Client) {
        // `todo!()` stands in for a value that is never needed at runtime; only
        // the type annotation matters.
        let batch_result: MerkleBatchPaymentResult = todo!();
        // Build the future without awaiting it, then require it to be `Send`.
        let fut = client.merkle_upload_chunks(Vec::new(), Vec::new(), &batch_result, None, 0, 0);
        _assert_send(&fut);
    }
}
1360
1361#[cfg(test)]
1362#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
1363mod tests {
1364    use super::*;
1365    use ant_protocol::evm::{Amount, MerkleTree, RewardsAddress, CANDIDATES_PER_POOL};
1366
1367    // =========================================================================
1368    // should_use_merkle (free function, no Client needed)
1369    // =========================================================================
1370
1371    #[test]
1372    fn test_auto_below_threshold() {
1373        assert!(!should_use_merkle(1, PaymentMode::Auto));
1374        assert!(!should_use_merkle(10, PaymentMode::Auto));
1375        assert!(!should_use_merkle(63, PaymentMode::Auto));
1376    }
1377
1378    #[test]
1379    fn test_auto_at_and_above_threshold() {
1380        assert!(should_use_merkle(64, PaymentMode::Auto));
1381        assert!(should_use_merkle(65, PaymentMode::Auto));
1382        assert!(should_use_merkle(1000, PaymentMode::Auto));
1383    }
1384
1385    #[test]
1386    fn test_merkle_mode_forces_at_2() {
1387        assert!(!should_use_merkle(1, PaymentMode::Merkle));
1388        assert!(should_use_merkle(2, PaymentMode::Merkle));
1389        assert!(should_use_merkle(3, PaymentMode::Merkle));
1390    }
1391
1392    #[test]
1393    fn test_single_mode_always_false() {
1394        assert!(!should_use_merkle(0, PaymentMode::Single));
1395        assert!(!should_use_merkle(64, PaymentMode::Single));
1396        assert!(!should_use_merkle(1000, PaymentMode::Single));
1397    }
1398
1399    #[test]
1400    fn test_default_mode_is_auto() {
1401        assert_eq!(PaymentMode::default(), PaymentMode::Auto);
1402    }
1403
1404    #[test]
1405    fn test_threshold_value() {
1406        assert_eq!(DEFAULT_MERKLE_THRESHOLD, 64);
1407    }
1408
1409    // =========================================================================
1410    // preflight_stored_status — must degrade gracefully, never abort the batch
1411    // =========================================================================
1412
1413    #[test]
1414    fn test_preflight_quotes_gathered_means_not_stored() {
1415        assert!(matches!(preflight_stored_status(Ok(())), Ok(false)));
1416    }
1417
1418    #[test]
1419    fn test_preflight_already_stored_is_stored() {
1420        let r: Result<()> = Err(Error::AlreadyStored);
1421        assert!(matches!(preflight_stored_status(r), Ok(true)));
1422    }
1423
1424    /// The regression: a transient quote-quorum failure during the preflight
1425    /// must NOT propagate (which would abort the whole forced-merkle upload).
1426    /// It is treated as "not known to be stored" → queue the chunk for upload.
1427    #[test]
1428    fn test_preflight_transient_quote_failure_does_not_abort() {
1429        // The exact error STG-01 hit: couldn't gather a 7-quote quorum.
1430        let insufficient: Result<()> =
1431            Err(Error::InsufficientPeers("Got 5 quotes, need 7".to_string()));
1432        assert!(
1433            matches!(preflight_stored_status(insufficient), Ok(false)),
1434            "insufficient-peers during preflight must degrade to not-stored, not error"
1435        );
1436
1437        let timeout: Result<()> = Err(Error::Timeout("Timeout waiting for quote".to_string()));
1438        assert!(matches!(preflight_stored_status(timeout), Ok(false)));
1439
1440        let network: Result<()> = Err(Error::Network("connection reset".to_string()));
1441        assert!(matches!(preflight_stored_status(network), Ok(false)));
1442    }
1443
1444    /// Genuine application errors still propagate — the preflight should not
1445    /// silently swallow problems that would recur on a healthy link.
1446    #[test]
1447    fn test_preflight_application_error_propagates() {
1448        let payment: Result<()> = Err(Error::Payment("bad payment".to_string()));
1449        assert!(matches!(
1450            preflight_stored_status(payment),
1451            Err(Error::Payment(_))
1452        ));
1453    }
1454
1455    #[test]
1456    fn chunk_contents_for_upload_addresses_preserves_requested_order() {
1457        let first = Bytes::from_static(b"first");
1458        let second = Bytes::from_static(b"second");
1459        let first_addr = compute_address(&first);
1460        let second_addr = compute_address(&second);
1461
1462        let selected = chunk_contents_for_upload_addresses(
1463            vec![first.clone(), second.clone()],
1464            &[second_addr, first_addr],
1465        )
1466        .unwrap();
1467
1468        assert_eq!(selected, vec![second, first]);
1469    }
1470
1471    #[test]
1472    fn chunk_contents_for_upload_addresses_preserves_duplicate_requests() {
1473        let repeated = Bytes::from_static(b"same-content");
1474        let other = Bytes::from_static(b"other-content");
1475        let repeated_addr = compute_address(&repeated);
1476
1477        let selected = chunk_contents_for_upload_addresses(
1478            vec![repeated.clone(), other, repeated.clone()],
1479            &[repeated_addr, repeated_addr],
1480        )
1481        .unwrap();
1482
1483        assert_eq!(selected, vec![repeated.clone(), repeated]);
1484    }
1485
1486    #[test]
1487    fn chunk_contents_for_upload_addresses_ignores_unrequested_duplicates() {
1488        let requested = Bytes::from_static(b"requested-content");
1489        let unrequested = Bytes::from_static(b"unrequested-content");
1490        let requested_addr = compute_address(&requested);
1491
1492        let selected = chunk_contents_for_upload_addresses(
1493            vec![
1494                unrequested.clone(),
1495                requested.clone(),
1496                unrequested.clone(),
1497                unrequested,
1498            ],
1499            &[requested_addr],
1500        )
1501        .unwrap();
1502
1503        assert_eq!(selected, vec![requested]);
1504    }
1505
1506    #[test]
1507    fn chunk_contents_for_upload_addresses_errors_for_missing_content() {
1508        let present = Bytes::from_static(b"present-content");
1509        let missing = Bytes::from_static(b"missing-content");
1510        let missing_addr = compute_address(&missing);
1511
1512        let result = chunk_contents_for_upload_addresses(vec![present], &[missing_addr]);
1513
1514        assert!(matches!(result, Err(Error::InvalidData(_))));
1515    }
1516
1517    // =========================================================================
1518    // MerkleTree construction and proof generation (pure, no network)
1519    // =========================================================================
1520
1521    fn make_test_addresses(count: usize) -> Vec<[u8; 32]> {
1522        (0..count)
1523            .map(|i| {
1524                let xn = XorName::from_content(&i.to_le_bytes());
1525                xn.0
1526            })
1527            .collect()
1528    }
1529
1530    #[test]
1531    fn test_tree_depth_for_known_sizes() {
1532        let cases = [(2, 1), (4, 2), (16, 4), (100, 7), (256, 8)];
1533        for (count, expected_depth) in cases {
1534            let addrs = make_test_addresses(count);
1535            let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1536            let tree = MerkleTree::from_xornames(xornames).unwrap();
1537            assert_eq!(
1538                tree.depth(),
1539                expected_depth,
1540                "depth mismatch for {count} leaves"
1541            );
1542        }
1543    }
1544
1545    #[test]
1546    fn test_proof_generation_and_verification_for_all_leaves() {
1547        let addrs = make_test_addresses(16);
1548        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1549        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1550
1551        for (i, xn) in xornames.iter().enumerate() {
1552            let proof = tree.generate_address_proof(i, *xn).unwrap();
1553            assert!(proof.verify(), "proof for leaf {i} should verify");
1554            assert_eq!(proof.depth(), tree.depth() as usize);
1555        }
1556    }
1557
1558    #[test]
1559    fn test_proof_fails_for_wrong_address() {
1560        let addrs = make_test_addresses(8);
1561        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1562        let tree = MerkleTree::from_xornames(xornames).unwrap();
1563
1564        let wrong = XorName::from_content(b"wrong");
1565        let proof = tree.generate_address_proof(0, wrong).unwrap();
1566        assert!(!proof.verify(), "proof with wrong address should fail");
1567    }
1568
1569    #[test]
1570    fn test_tree_too_few_leaves() {
1571        let xornames = vec![XorName::from_content(b"only_one")];
1572        let result = MerkleTree::from_xornames(xornames);
1573        assert!(result.is_err());
1574    }
1575
1576    #[test]
1577    fn test_tree_at_max_leaves() {
1578        let addrs = make_test_addresses(MAX_LEAVES);
1579        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1580        let tree = MerkleTree::from_xornames(xornames).unwrap();
1581        assert_eq!(tree.leaf_count(), MAX_LEAVES);
1582    }
1583
1584    // =========================================================================
1585    // Proof serialization round-trip
1586    // =========================================================================
1587
1588    #[test]
1589    fn test_merkle_proof_serialize_deserialize_roundtrip() {
1590        use ant_protocol::evm::{Amount, MerklePaymentCandidateNode, RewardsAddress};
1591        use ant_protocol::payment::{deserialize_merkle_proof, serialize_merkle_proof};
1592
1593        let addrs = make_test_addresses(4);
1594        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1595        let tree = MerkleTree::from_xornames(xornames.clone()).unwrap();
1596
1597        let timestamp = std::time::SystemTime::now()
1598            .duration_since(std::time::UNIX_EPOCH)
1599            .unwrap()
1600            .as_secs();
1601
1602        let candidates = tree.reward_candidates(timestamp).unwrap();
1603        let midpoint = candidates.first().unwrap().clone();
1604
1605        // Build candidate nodes (with dummy signatures — not ML-DSA, just for serialization test)
1606        #[allow(clippy::cast_possible_truncation)]
1607        let candidate_nodes: [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] =
1608            std::array::from_fn(|i| MerklePaymentCandidateNode {
1609                pub_key: vec![i as u8; 32],
1610                price: Amount::from(1024u64),
1611                reward_address: RewardsAddress::new([i as u8; 20]),
1612                merkle_payment_timestamp: timestamp,
1613                signature: vec![i as u8; 64],
1614            });
1615
1616        let pool = MerklePaymentCandidatePool {
1617            midpoint_proof: midpoint,
1618            candidate_nodes,
1619        };
1620
1621        let address_proof = tree.generate_address_proof(0, xornames[0]).unwrap();
1622        let merkle_proof = MerklePaymentProof::new(xornames[0], address_proof, pool);
1623
1624        let tagged = serialize_merkle_proof(&merkle_proof).unwrap();
1625        assert_eq!(
1626            tagged.first().copied(),
1627            Some(0x02),
1628            "tag should be PROOF_TAG_MERKLE"
1629        );
1630
1631        let deserialized = deserialize_merkle_proof(&tagged).unwrap();
1632        assert_eq!(deserialized.address, merkle_proof.address);
1633        assert_eq!(
1634            deserialized.winner_pool.candidate_nodes.len(),
1635            CANDIDATES_PER_POOL
1636        );
1637    }
1638
1639    // =========================================================================
1640    // Candidate validation logic
1641    // =========================================================================
1642
1643    #[test]
1644    fn test_candidate_wrong_timestamp_rejected() {
1645        // Simulates what collect_validated_candidates checks
1646        let candidate = MerklePaymentCandidateNode {
1647            pub_key: vec![0u8; 32],
1648            price: ant_protocol::evm::Amount::ZERO,
1649            reward_address: ant_protocol::evm::RewardsAddress::new([0u8; 20]),
1650            merkle_payment_timestamp: 1000,
1651            signature: vec![0u8; 64],
1652        };
1653
1654        // Timestamp check: 1000 != 2000
1655        assert_ne!(candidate.merkle_payment_timestamp, 2000);
1656    }
1657
1658    // =========================================================================
1659    // finalize_merkle_batch (external signer)
1660    // =========================================================================
1661
1662    fn make_dummy_candidate_nodes(
1663        timestamp: u64,
1664    ) -> [MerklePaymentCandidateNode; CANDIDATES_PER_POOL] {
1665        std::array::from_fn(|i| MerklePaymentCandidateNode {
1666            pub_key: vec![i as u8; 32],
1667            price: Amount::from(1024u64),
1668            reward_address: RewardsAddress::new([i as u8; 20]),
1669            merkle_payment_timestamp: timestamp,
1670            signature: vec![i as u8; 64],
1671        })
1672    }
1673
1674    fn make_prepared_merkle_batch(count: usize) -> PreparedMerkleBatch {
1675        let addrs = make_test_addresses(count);
1676        let xornames: Vec<XorName> = addrs.iter().map(|a| XorName(*a)).collect();
1677        let tree = MerkleTree::from_xornames(xornames).unwrap();
1678
1679        let timestamp = std::time::SystemTime::now()
1680            .duration_since(std::time::UNIX_EPOCH)
1681            .unwrap()
1682            .as_secs();
1683
1684        let midpoints = tree.reward_candidates(timestamp).unwrap();
1685
1686        let candidate_pools: Vec<MerklePaymentCandidatePool> = midpoints
1687            .into_iter()
1688            .map(|mp| MerklePaymentCandidatePool {
1689                midpoint_proof: mp,
1690                candidate_nodes: make_dummy_candidate_nodes(timestamp),
1691            })
1692            .collect();
1693
1694        let pool_commitments = candidate_pools
1695            .iter()
1696            .map(MerklePaymentCandidatePool::to_commitment)
1697            .collect();
1698
1699        PreparedMerkleBatch {
1700            depth: tree.depth(),
1701            pool_commitments,
1702            merkle_payment_timestamp: timestamp,
1703            candidate_pools,
1704            tree,
1705            addresses: addrs,
1706        }
1707    }
1708
1709    #[test]
1710    fn test_finalize_merkle_batch_with_valid_winner() {
1711        let prepared = make_prepared_merkle_batch(4);
1712        let winner_hash = prepared.candidate_pools[0].hash();
1713
1714        let result = finalize_merkle_batch(prepared, winner_hash);
1715        assert!(
1716            result.is_ok(),
1717            "should succeed with valid winner: {result:?}"
1718        );
1719
1720        let batch = result.unwrap();
1721        assert_eq!(batch.chunk_count, 4);
1722        assert_eq!(batch.proofs.len(), 4);
1723
1724        // Every proof should be non-empty
1725        for proof_bytes in batch.proofs.values() {
1726            assert!(!proof_bytes.is_empty());
1727        }
1728    }
1729
1730    #[test]
1731    fn test_finalize_merkle_batch_with_invalid_winner() {
1732        let prepared = make_prepared_merkle_batch(4);
1733        let bad_hash = [0xFF; 32];
1734
1735        let result = finalize_merkle_batch(prepared, bad_hash);
1736        assert!(result.is_err());
1737        let err = result.unwrap_err().to_string();
1738        assert!(err.contains("not found in candidate pools"), "got: {err}");
1739    }
1740
1741    #[test]
1742    fn test_finalize_merkle_batch_proofs_are_deserializable() {
1743        use ant_protocol::payment::deserialize_merkle_proof;
1744
1745        let prepared = make_prepared_merkle_batch(8);
1746        let winner_hash = prepared.candidate_pools[0].hash();
1747
1748        let batch = finalize_merkle_batch(prepared, winner_hash).unwrap();
1749
1750        for (addr, proof_bytes) in &batch.proofs {
1751            let proof = deserialize_merkle_proof(proof_bytes);
1752            assert!(
1753                proof.is_ok(),
1754                "proof for {} should deserialize: {:?}",
1755                hex::encode(addr),
1756                proof.err()
1757            );
1758        }
1759    }
1760
1761    // =========================================================================
1762    // Batch splitting edge cases
1763    // =========================================================================
1764
1765    #[test]
1766    fn test_batch_split_calculation() {
1767        // MAX_LEAVES chunks should fit in 1 batch
1768        let addrs = make_test_addresses(MAX_LEAVES);
1769        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 1);
1770
1771        // MAX_LEAVES + 1 should split into 2
1772        let addrs = make_test_addresses(MAX_LEAVES + 1);
1773        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 2);
1774
1775        // 3 * MAX_LEAVES should split into 3
1776        let addrs = make_test_addresses(3 * MAX_LEAVES);
1777        assert_eq!(addrs.chunks(MAX_LEAVES).count(), 3);
1778    }
1779
1780    // =========================================================================
1781    // merkle_store_with_retry: collect-not-abort + bounded retry (C2.1 / C2.2)
1782    // =========================================================================
1783
1784    use std::sync::{Arc, Mutex};
1785
    /// Build `count` chunk addresses for the store helper.
    ///
    /// This is a thin alias for `make_test_addresses`. Bodies are read on
    /// demand by `store_one`, so the store tests pass addresses only.
    fn make_addrs(count: usize) -> Vec<[u8; 32]> {
        make_test_addresses(count)
    }
1791
1792    /// C2.1: a per-chunk `InsufficientPeers` is collected, not propagated —
1793    /// the whole batch must NOT abort. With a single attempt, the failing
1794    /// subset is reported via `failed` and the rest are `stored`.
1795    #[tokio::test]
1796    async fn store_with_retry_collects_failures_instead_of_aborting() {
1797        let chunks = make_addrs(6);
1798        let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
1799        let failing_for_closure = failing.clone();
1800
1801        let store_one = move |addr: [u8; 32]| {
1802            let fail = failing_for_closure.contains(&addr);
1803            async move {
1804                if fail {
1805                    Err(Error::InsufficientPeers("test shortfall".into()))
1806                } else {
1807                    Ok(std::time::Instant::now())
1808                }
1809            }
1810        };
1811
1812        let outcome =
1813            merkle_store_with_retry(chunks, || 8, 1, Duration::ZERO, None, 0, 6, store_one)
1814                .await
1815                .expect("quorum shortfalls must not abort the batch");
1816
1817        assert_eq!(outcome.stored, 4);
1818        assert_eq!(outcome.failed, 2);
1819        // Single attempt → all successes recorded in round 0.
1820        assert_eq!(outcome.stats.retries_histogram[0], 4);
1821        assert_eq!(outcome.stats.chunk_attempts_total, 6);
1822    }
1823
1824    /// V2-554: the store scheduler must RE-READ the cap as each slot frees
1825    /// (rolling), not snapshot it once like `buffer_unordered`. A snapshot would
1826    /// invoke the cap closure once per attempt; the rolling scheduler invokes it
1827    /// once per drained slot, so mid-flight limiter growth reaches the rest of
1828    /// the round. Proven here by counting cap-closure invocations.
1829    #[tokio::test]
1830    async fn store_with_retry_rereads_cap_per_slot() {
1831        let count = 6;
1832        let chunks = make_addrs(count);
1833        let cap_calls = Arc::new(Mutex::new(0usize));
1834        let cap_calls_for_closure = cap_calls.clone();
1835        let cap = move || {
1836            *cap_calls_for_closure.lock().expect("cap counter poisoned") += 1;
1837            2
1838        };
1839        let store_one = move |_addr: [u8; 32]| async move { Ok(std::time::Instant::now()) };
1840
1841        let outcome =
1842            merkle_store_with_retry(chunks, cap, 1, Duration::ZERO, None, 0, count, store_one)
1843                .await
1844                .expect("all stores succeed");
1845
1846        assert_eq!(outcome.stored, count);
1847        let calls = *cap_calls.lock().expect("cap counter poisoned");
1848        assert!(
1849            calls >= count,
1850            "cap must be re-read per drained slot (rolling), not snapshotted once — \
1851             expected >= {count} invocations, got {calls}",
1852        );
1853    }
1854
1855    /// The whole-file store pass is a single cap-bounded fan-out with NO wave
1856    /// barrier: one slow straggler (a chunk whose peers take a long time) must
1857    /// not block the rest of the file, and must be driven to completion
1858    /// concurrently by the fast chunks. If the scheduler serialized (a barrier),
1859    /// the fast chunks could not run until the straggler returned — a deadlock
1860    /// the surrounding timeout would catch.
1861    #[tokio::test]
1862    async fn store_pass_has_no_barrier() {
1863        use std::sync::atomic::{AtomicUsize, Ordering};
1864        let count = 8;
1865        let addrs = make_addrs(count);
1866        let slow = addrs[0];
1867        let fast_completed = Arc::new(AtomicUsize::new(0));
1868        let release_slow = Arc::new(tokio::sync::Notify::new());
1869
1870        let store_one = move |addr: [u8; 32]| {
1871            let fast_completed = fast_completed.clone();
1872            let release_slow = release_slow.clone();
1873            async move {
1874                if addr == slow {
1875                    // Block until every fast chunk has finished. This can only
1876                    // resolve if the fast chunks run WHILE this one is parked —
1877                    // i.e. there is no barrier serializing the store pass.
1878                    release_slow.notified().await;
1879                } else if fast_completed.fetch_add(1, Ordering::SeqCst) + 1 == count - 1 {
1880                    release_slow.notify_one();
1881                }
1882                Ok(std::time::Instant::now())
1883            }
1884        };
1885
1886        let outcome = tokio::time::timeout(
1887            Duration::from_secs(5),
1888            merkle_store_with_retry(addrs, || 8, 1, Duration::ZERO, None, 0, count, store_one),
1889        )
1890        .await
1891        .expect("store pass must not deadlock — a slow chunk must not block the others")
1892        .expect("all stores succeed");
1893
1894        assert_eq!(outcome.stored, count);
1895    }
1896
1897    /// Peak memory is bounded by the store cap, not the file size: `store_one`
1898    /// reads each body on demand, so the scheduler holds at most `cap` stores
1899    /// (hence at most `cap` bodies) in flight at once — the property that lets
1900    /// the whole file store as one fan-out without the old 64-chunk waves.
1901    #[tokio::test]
1902    async fn store_pass_keeps_at_most_cap_in_flight() {
1903        use std::sync::atomic::{AtomicUsize, Ordering};
1904        let count = 40;
1905        let cap = 4;
1906        let addrs = make_addrs(count);
1907        let in_flight = Arc::new(AtomicUsize::new(0));
1908        let max_in_flight = Arc::new(AtomicUsize::new(0));
1909        let max_in_flight_for_closure = max_in_flight.clone();
1910
1911        let store_one = move |_addr: [u8; 32]| {
1912            let in_flight = in_flight.clone();
1913            let max_in_flight = max_in_flight_for_closure.clone();
1914            async move {
1915                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
1916                max_in_flight.fetch_max(now, Ordering::SeqCst);
1917                // Yield so sibling stores get a chance to start concurrently,
1918                // maximising the observed in-flight count.
1919                tokio::task::yield_now().await;
1920                in_flight.fetch_sub(1, Ordering::SeqCst);
1921                Ok(std::time::Instant::now())
1922            }
1923        };
1924
1925        let outcome = merkle_store_with_retry(
1926            addrs,
1927            move || cap,
1928            1,
1929            Duration::ZERO,
1930            None,
1931            0,
1932            count,
1933            store_one,
1934        )
1935        .await
1936        .expect("all stores succeed");
1937
1938        assert_eq!(outcome.stored, count);
1939        let peak = max_in_flight.load(Ordering::SeqCst);
1940        assert!(
1941            peak <= cap,
1942            "at most `cap` bodies may be in flight (memory bound), got peak {peak} > cap {cap}",
1943        );
1944        assert!(
1945            peak > 1,
1946            "the pass must actually run concurrently, not serialize (peak {peak})",
1947        );
1948    }
1949
1950    /// V2-468: an app-only quorum shortfall surfaces as `Error::RemotePut`
1951    /// (pool-rejected / quote-stale / disk-full — transient), which must be
1952    /// treated as recoverable just like `InsufficientPeers`: collected and
1953    /// retried, never aborting the whole batch.
1954    #[tokio::test]
1955    async fn store_with_retry_treats_remote_put_as_recoverable() {
1956        let chunks = make_addrs(6);
1957        let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
1958        let failing_for_closure = failing.clone();
1959
1960        let store_one = move |addr: [u8; 32]| {
1961            let fail = failing_for_closure.contains(&addr);
1962            async move {
1963                if fail {
1964                    Err(Error::RemotePut {
1965                        address: hex::encode(addr),
1966                        source: ant_protocol::ProtocolError::StorageFailed(
1967                            "insufficient disk space".into(),
1968                        ),
1969                    })
1970                } else {
1971                    Ok(std::time::Instant::now())
1972                }
1973            }
1974        };
1975
1976        let outcome =
1977            merkle_store_with_retry(chunks, || 8, 1, Duration::ZERO, None, 0, 6, store_one)
1978                .await
1979                .expect("remote app-rejections must not abort the batch");
1980
1981        assert_eq!(outcome.stored, 4);
1982        assert_eq!(outcome.failed, 2);
1983    }
1984
1985    /// A non-quorum error (e.g. a missing proof) is captured in `fatal` rather
1986    /// than discarded — the call returns `Ok(outcome)` so the caller can decide
1987    /// whether to re-raise it or fold it into `PartialUpload`.
1988    #[tokio::test]
1989    async fn store_with_retry_reports_non_quorum_errors_as_fatal() {
1990        let chunks = make_addrs(3);
1991        let store_one = |_addr: [u8; 32]| async move {
1992            Err::<std::time::Instant, _>(Error::Payment("missing proof".into()))
1993        };
1994
1995        let outcome =
1996            merkle_store_with_retry(chunks, || 8, 3, Duration::ZERO, None, 0, 3, store_one)
1997                .await
1998                .expect("fatal is carried in the outcome, not returned as Err");
1999        assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
2000    }
2001
2002    /// A fatal error mid-pass preserves the successes that already completed in
2003    /// the same pass — they are not discarded with the abort. Concurrency 1
2004    /// makes ordering deterministic: the first five chunks store, then the sixth
2005    /// aborts fatally.
2006    #[tokio::test]
2007    async fn store_with_retry_fatal_preserves_same_pass_successes() {
2008        let chunks = make_addrs(6);
2009        let bad = chunks[5];
2010        let store_one = move |addr: [u8; 32]| async move {
2011            if addr == bad {
2012                Err(Error::Payment("fatal".into()))
2013            } else {
2014                Ok(std::time::Instant::now())
2015            }
2016        };
2017
2018        let outcome =
2019            merkle_store_with_retry(chunks, || 1, 1, Duration::ZERO, None, 0, 6, store_one)
2020                .await
2021                .expect("fatal carried in outcome, not returned as Err");
2022        assert!(matches!(outcome.fatal, Some(Error::Payment(_))));
2023        // The five chunks stored before the abort are preserved, not lost.
2024        assert_eq!(outcome.stored, 5);
2025        assert_eq!(outcome.stored_addresses.len(), 5);
2026        assert!(!outcome.stored_addresses.contains(&bad));
2027        // The fatal chunk is reported as failed (not silently dropped).
2028        assert!(outcome.failed_addresses.iter().any(|(a, _)| *a == bad));
2029    }
2030
2031    /// C2.2: only the chunks that failed the previous round are retried.
2032    #[tokio::test]
2033    async fn store_with_retry_retries_only_the_failed_set() {
2034        let chunks = make_addrs(5);
2035        let total = chunks.len();
2036        let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
2037        let failing_for_closure = failing.clone();
2038
2039        // Record every (addr) the store op was invoked with, in call order.
2040        let calls = Arc::new(Mutex::new(Vec::<[u8; 32]>::new()));
2041        let calls_for_closure = calls.clone();
2042
2043        let store_one = move |addr: [u8; 32]| {
2044            let calls = calls_for_closure.clone();
2045            // Fails the first round only; succeeds thereafter.
2046            let already_seen = calls.lock().unwrap().iter().filter(|&&a| a == addr).count();
2047            let fail = failing_for_closure.contains(&addr) && already_seen == 0;
2048            calls.lock().unwrap().push(addr);
2049            async move {
2050                if fail {
2051                    Err(Error::InsufficientPeers("round-1 shortfall".into()))
2052                } else {
2053                    Ok(std::time::Instant::now())
2054                }
2055            }
2056        };
2057
2058        let outcome =
2059            merkle_store_with_retry(chunks, || 8, 3, Duration::ZERO, None, 0, total, store_one)
2060                .await
2061                .expect("should converge after retry");
2062
2063        assert_eq!(outcome.stored, total);
2064        assert_eq!(outcome.failed, 0);
2065
2066        // Round 1 drains fully before round 2 starts, so the call log is
2067        // segmented: first `total` calls = round 1 (all chunks), the rest =
2068        // the retry round, which must contain ONLY the failing set.
2069        let calls = calls.lock().unwrap();
2070        assert_eq!(calls.len(), total + failing.len());
2071        let round_two: std::collections::HashSet<[u8; 32]> =
2072            calls[total..].iter().copied().collect();
2073        assert_eq!(round_two, failing);
2074    }
2075
2076    /// C2.2: a chunk that fails attempt 1 and succeeds attempt 2 is counted
2077    /// once as stored and recorded as one retry in `retries_histogram[1]`.
2078    #[tokio::test]
2079    async fn store_with_retry_counts_retry_success_once_in_histogram() {
2080        let chunks = make_addrs(4);
2081        let total = chunks.len();
2082        let flaky_addr = chunks[0];
2083
2084        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2085        let attempts_for_closure = attempts.clone();
2086
2087        let store_one = move |addr: [u8; 32]| {
2088            let attempts = attempts_for_closure.clone();
2089            let n = {
2090                let mut m = attempts.lock().unwrap();
2091                let entry = m.entry(addr).or_insert(0);
2092                *entry += 1;
2093                *entry
2094            };
2095            let fail = addr == flaky_addr && n == 1;
2096            async move {
2097                if fail {
2098                    Err(Error::InsufficientPeers("transient".into()))
2099                } else {
2100                    Ok(std::time::Instant::now())
2101                }
2102            }
2103        };
2104
2105        let outcome =
2106            merkle_store_with_retry(chunks, || 8, 3, Duration::ZERO, None, 0, total, store_one)
2107                .await
2108                .expect("flaky chunk should recover on retry");
2109
2110        assert_eq!(outcome.stored, total);
2111        assert_eq!(outcome.failed, 0);
2112        // 3 chunks landed on the first attempt, 1 on the first retry.
2113        assert_eq!(outcome.stats.retries_histogram[0], total - 1);
2114        assert_eq!(outcome.stats.retries_histogram[1], 1);
2115        // One extra store attempt for the flaky chunk.
2116        assert_eq!(outcome.stats.chunk_attempts_total, total + 1);
2117    }
2118
2119    /// C2.2: when every chunk stays short of quorum through the whole attempt
2120    /// budget, the helper still returns `Ok` (collect-not-abort) with the full
2121    /// batch reported as `failed`, having tried each chunk exactly
2122    /// `MERKLE_STORE_MAX_ATTEMPTS` times.
2123    #[tokio::test]
2124    async fn store_with_retry_reports_all_failed_when_retries_exhausted() {
2125        let chunks = make_addrs(3);
2126        let total = chunks.len();
2127
2128        let store_one = |_addr: [u8; 32]| async move {
2129            Err::<std::time::Instant, _>(Error::InsufficientPeers("never converges".into()))
2130        };
2131
2132        let outcome = merkle_store_with_retry(
2133            chunks,
2134            || 8,
2135            MERKLE_STORE_MAX_ATTEMPTS,
2136            Duration::ZERO,
2137            None,
2138            0,
2139            total,
2140            store_one,
2141        )
2142        .await
2143        .expect("an exhausted retry budget is reported, not propagated as Err");
2144
2145        assert_eq!(outcome.stored, 0);
2146        assert_eq!(outcome.failed, total);
2147        // Every chunk was attempted once per round across the full budget.
2148        assert_eq!(
2149            outcome.stats.chunk_attempts_total,
2150            total * MERKLE_STORE_MAX_ATTEMPTS
2151        );
2152        // No successes, so the histogram stays empty.
2153        assert_eq!(outcome.stats.retries_histogram, [0; 4]);
2154    }
2155
2156    /// D (CLI path): when retries are exhausted, `failed_addresses` names
2157    /// exactly the still-short-of-quorum chunks (with their last error message)
2158    /// and excludes the ones that stored. This is what `upload_merkle_from_spill`
2159    /// uses to build `PartialUpload`.
2160    #[tokio::test]
2161    async fn store_with_retry_records_failed_addresses_when_exhausted() {
2162        let chunks = make_addrs(6);
2163        let failing: std::collections::HashSet<[u8; 32]> = chunks.iter().take(2).copied().collect();
2164        let failing_for_closure = failing.clone();
2165
2166        let store_one = move |addr: [u8; 32]| {
2167            let fail = failing_for_closure.contains(&addr);
2168            async move {
2169                if fail {
2170                    Err(Error::InsufficientPeers("permanent shortfall".into()))
2171                } else {
2172                    Ok(std::time::Instant::now())
2173                }
2174            }
2175        };
2176
2177        let outcome = merkle_store_with_retry(
2178            chunks,
2179            || 8,
2180            MERKLE_STORE_MAX_ATTEMPTS,
2181            Duration::ZERO,
2182            None,
2183            0,
2184            6,
2185            store_one,
2186        )
2187        .await
2188        .expect("quorum shortfalls must not abort the batch");
2189
2190        assert_eq!(outcome.stored, 4);
2191        assert_eq!(outcome.failed, 2);
2192        // `failed_addresses` names exactly the failing set, no stored chunks.
2193        assert_eq!(outcome.failed_addresses.len(), 2);
2194        let reported: std::collections::HashSet<[u8; 32]> =
2195            outcome.failed_addresses.iter().map(|(a, _)| *a).collect();
2196        assert_eq!(reported, failing);
2197        // Each carries a non-empty error message for the PartialUpload report.
2198        for (_, msg) in &outcome.failed_addresses {
2199            assert!(msg.contains("permanent shortfall"));
2200        }
2201    }
2202
2203    /// `failed_addresses` is empty when every chunk reaches quorum (no
2204    /// `PartialUpload` is raised by the CLI path in that case).
2205    #[tokio::test]
2206    async fn store_with_retry_failed_addresses_empty_on_full_success() {
2207        let chunks = make_addrs(4);
2208        let total = chunks.len();
2209        let store_one = |_addr: [u8; 32]| async move { Ok(std::time::Instant::now()) };
2210
2211        let outcome = merkle_store_with_retry(
2212            chunks,
2213            || 8,
2214            MERKLE_STORE_MAX_ATTEMPTS,
2215            Duration::ZERO,
2216            None,
2217            0,
2218            total,
2219            store_one,
2220        )
2221        .await
2222        .expect("all chunks store");
2223
2224        assert_eq!(outcome.stored, total);
2225        assert_eq!(outcome.failed, 0);
2226        assert!(outcome.failed_addresses.is_empty());
2227    }
2228
2229    // =========================================================================
2230    // merkle_deferred_retry: download-style concurrent post-wave retry (V2-466)
2231    // =========================================================================
2232
/// Histogram slot mapping for deferred rounds: slot 0 belongs to the wave's
/// first pass, deferred round `r` lands in slot `r + 1`, and anything past the
/// end of the histogram is clamped to the last slot.
#[test]
fn deferred_round_histogram_slot_maps_and_clamps() {
    // (deferred round, expected slot) for a four-slot histogram.
    let cases = [
        (0, 1),
        (1, 2),
        (2, 3),
        // Beyond the histogram width, clamp to the final slot.
        (3, 3),
        (9, 3),
    ];

    for &(round, expected_slot) in &cases {
        assert_eq!(deferred_round_histogram_slot(round, 4), expected_slot);
    }
}
2244
2245    fn deferred_set(count: usize) -> Vec<([u8; 32], String)> {
2246        make_test_addresses(count)
2247            .into_iter()
2248            .map(|addr| (addr, "short of quorum".to_string()))
2249            .collect()
2250    }
2251
2252    /// A chunk that is quorum-short on early rounds but succeeds on a later
2253    /// round is stored exactly once, recorded in that round's histogram slot,
2254    /// and reported with no failures.
2255    #[tokio::test]
2256    async fn deferred_retry_succeeds_on_a_later_round() {
2257        let deferred = deferred_set(3);
2258        // Each chunk fails its first attempt (round 0) and succeeds the second
2259        // (round 1 → histogram slot 2).
2260        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2261        let attempts_for_closure = attempts.clone();
2262        let store_one = move |addr: [u8; 32]| {
2263            let attempts = attempts_for_closure.clone();
2264            async move {
2265                let n = {
2266                    let mut map = attempts.lock().unwrap();
2267                    let e = map.entry(addr).or_insert(0);
2268                    *e += 1;
2269                    *e
2270                };
2271                if n < 2 {
2272                    Err(Error::InsufficientPeers("still short".into()))
2273                } else {
2274                    Ok(std::time::Instant::now())
2275                }
2276            }
2277        };
2278
2279        let outcome = merkle_deferred_retry(
2280            deferred,
2281            &[0, 0, 0],
2282            |n: usize| n.max(1),
2283            None,
2284            0,
2285            3,
2286            store_one,
2287        )
2288        .await
2289        .expect("deferred retry must not abort on quorum shortfalls");
2290
2291        assert_eq!(outcome.stored, 3, "all three land by round 1");
2292        assert_eq!(outcome.stored_addresses.len(), 3);
2293        assert_eq!(outcome.failed, 0);
2294        assert!(outcome.failed_addresses.is_empty());
2295        assert!(outcome.fatal.is_none());
2296        // Round 1 → slot 2; round 0 (slot 1) saw zero successes.
2297        assert_eq!(outcome.stats.retries_histogram[1], 0);
2298        assert_eq!(outcome.stats.retries_histogram[2], 3);
2299        // Each chunk attempted twice: one failed round + one success round.
2300        assert_eq!(outcome.stats.chunk_attempts_total, 6);
2301    }
2302
2303    /// Chunks still short of quorum after the final deferred round become
2304    /// `failed`, not silently dropped, and no fatal error is set.
2305    #[tokio::test]
2306    async fn deferred_retry_leftovers_become_failed() {
2307        let deferred = deferred_set(2);
2308        let store_one = |_addr: [u8; 32]| async move {
2309            Err::<std::time::Instant, _>(Error::InsufficientPeers("always short".into()))
2310        };
2311
2312        let outcome = merkle_deferred_retry(
2313            deferred,
2314            &[0, 0, 0],
2315            |n: usize| n.max(1),
2316            None,
2317            0,
2318            2,
2319            store_one,
2320        )
2321        .await
2322        .expect("exhausted retries report failures, not an error");
2323
2324        assert_eq!(outcome.stored, 0);
2325        assert!(outcome.stored_addresses.is_empty());
2326        assert_eq!(outcome.failed, 2);
2327        assert_eq!(outcome.failed_addresses.len(), 2);
2328        assert!(outcome.fatal.is_none());
2329        // Three rounds × two chunks, all failing.
2330        assert_eq!(outcome.stats.chunk_attempts_total, 6);
2331    }
2332
2333    /// A non-quorum (fatal) error during a deferred round stops the pass, is
2334    /// surfaced via `fatal`, and preserves an earlier round's success in
2335    /// `stored`/`stored_addresses` while the still-pending chunk is reported as
2336    /// failed.
2337    #[tokio::test]
2338    async fn deferred_retry_fatal_error_preserves_prior_progress() {
2339        let addrs = make_test_addresses(2);
2340        let good = addrs[0];
2341        let bad = addrs[1];
2342        let deferred = vec![(good, "short".to_string()), (bad, "short".to_string())];
2343
2344        // `good` succeeds on round 0; `bad` is quorum-short on round 0, then
2345        // hits a fatal Payment error on round 1.
2346        let attempts = Arc::new(Mutex::new(HashMap::<[u8; 32], usize>::new()));
2347        let attempts_for_closure = attempts.clone();
2348        let store_one = move |addr: [u8; 32]| {
2349            let attempts = attempts_for_closure.clone();
2350            async move {
2351                let n = {
2352                    let mut map = attempts.lock().unwrap();
2353                    let e = map.entry(addr).or_insert(0);
2354                    *e += 1;
2355                    *e
2356                };
2357                if addr == good {
2358                    Ok(std::time::Instant::now())
2359                } else if n == 1 {
2360                    Err(Error::InsufficientPeers("short".into()))
2361                } else {
2362                    Err(Error::Payment("fatal on retry".into()))
2363                }
2364            }
2365        };
2366
2367        let outcome = merkle_deferred_retry(
2368            deferred,
2369            &[0, 0, 0],
2370            |n: usize| n.max(1),
2371            None,
2372            0,
2373            2,
2374            store_one,
2375        )
2376        .await
2377        .expect("a fatal round error is reported via `fatal`, not as Err");
2378
2379        assert!(outcome.fatal.is_some(), "fatal error must be captured");
2380        assert_eq!(outcome.stored, 1, "round-0 success preserved");
2381        assert_eq!(outcome.stored_addresses, vec![good]);
2382        assert_eq!(outcome.failed, 1);
2383        assert_eq!(outcome.failed_addresses.len(), 1);
2384        assert_eq!(outcome.failed_addresses[0].0, bad);
2385    }
2386
2387    /// An empty deferred set is a no-op: no rounds run, nothing stored or failed.
2388    #[tokio::test]
2389    async fn deferred_retry_empty_set_is_a_noop() {
2390        let store_one = |_addr: [u8; 32]| async move {
2391            Err::<std::time::Instant, _>(Error::InsufficientPeers("unused".into()))
2392        };
2393
2394        let outcome = merkle_deferred_retry(
2395            Vec::new(),
2396            &DEFERRED_ROUND_DELAYS_SECS,
2397            |n: usize| n.max(1),
2398            None,
2399            7,
2400            7,
2401            store_one,
2402        )
2403        .await
2404        .expect("empty deferred set is a no-op");
2405
2406        assert_eq!(outcome.stored, 7, "stored_offset carried through unchanged");
2407        assert_eq!(outcome.failed, 0);
2408        assert!(outcome.stored_addresses.is_empty());
2409        assert!(outcome.failed_addresses.is_empty());
2410        assert!(outcome.fatal.is_none());
2411    }
2412}