Skip to main content

ant_core/data/client/
batch.rs

1//! Batch chunk upload with wave-based pipelined EVM payments.
2//!
3//! Groups chunks into waves of 64 and pays for each
4//! wave in a single EVM transaction. Stores from wave N are pipelined
5//! with quote collection for wave N+1 via `tokio::join!`.
6
7use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, Result};
13use ant_protocol::evm::{
14    Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15    RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment};
18use ant_protocol::transport::{MultiAddr, PeerId};
19use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
20use bytes::Bytes;
21use futures::stream::{self, StreamExt};
22use std::collections::{HashMap, HashSet};
23use std::time::{Duration, Instant};
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
27/// Number of chunks per payment wave.
28const PAYMENT_WAVE_SIZE: usize = 64;
29
30/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
31#[derive(Debug)]
32pub struct PreparedChunk {
33    /// The chunk content bytes.
34    pub content: Bytes,
35    /// Content address (BLAKE3 hash).
36    pub address: XorName,
37    /// Closest peers from quote collection — PUT targets for close-group replication.
38    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
39    /// Payment structure (quotes sorted, median selected, not yet paid on-chain).
40    pub payment: SingleNodePayment,
41    /// Peer quotes for building `ProofOfPayment`.
42    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
43}
44
45/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`].
46#[derive(Debug, Clone)]
47pub struct PaidChunk {
48    /// The chunk content bytes.
49    pub content: Bytes,
50    /// Content address (BLAKE3 hash).
51    pub address: XorName,
52    /// Closest peers from quote collection — PUT targets for close-group replication.
53    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
54    /// Serialized [`PaymentProof`] bytes.
55    pub proof_bytes: Vec<u8>,
56}
57
58/// Result of storing a wave of paid chunks, with retry tracking.
59#[derive(Debug)]
60pub struct WaveResult {
61    /// Successfully stored chunk addresses.
62    pub stored: Vec<XorName>,
63    /// Chunks that failed to store after all retries.
64    pub failed: Vec<(XorName, String)>,
65    /// Sum of store-RPC attempts across all chunks in this wave (>= stored.len() + failed.len()).
66    pub chunk_attempts_total: usize,
67    /// Per-chunk wall-clock (ms) from first attempt to successful store. Only populated for stored chunks.
68    pub store_durations_ms: Vec<u64>,
69    /// Histogram of which retry-round each stored chunk succeeded on (index 0 = first attempt).
70    pub retries_per_chunk: Vec<u32>,
71}
72
73/// Aggregated retry / wall-clock stats across one or more [`WaveResult`]s.
74///
75/// Used by [`Client::batch_upload_chunks_with_events`] (which may store
76/// multiple waves per call) and surfaced upward into `FileUploadResult` so
77/// downstream tooling can record per-upload retry pressure and per-chunk
78/// store wall-clock without needing log parsing.
79#[derive(Debug, Default, Clone)]
80pub struct WaveAggregateStats {
81    /// Sum of store-RPC attempts across all waves (>= chunks_stored).
82    pub chunk_attempts_total: usize,
83    /// Per-chunk wall-clock (ms) from first attempt to successful store,
84    /// concatenated across waves.
85    pub store_durations_ms: Vec<u64>,
86    /// Count of stored chunks that succeeded on each retry round
87    /// (index 0 = first attempt, 1 = first retry, etc.). Indices match
88    /// the retry rounds emitted by `Client::store_paid_chunks_with_events`
89    /// which caps at `MAX_RETRIES = 3`, so an array of 4 suffices.
90    pub retries_histogram: [usize; 4],
91}
92
93impl WaveAggregateStats {
94    /// Fold one [`WaveResult`]'s stats into the running aggregate.
95    pub fn absorb(&mut self, wave: &WaveResult) {
96        self.chunk_attempts_total = self
97            .chunk_attempts_total
98            .saturating_add(wave.chunk_attempts_total);
99        self.store_durations_ms.extend(&wave.store_durations_ms);
100        for &r in &wave.retries_per_chunk {
101            let idx = (r as usize).min(self.retries_histogram.len() - 1);
102            self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
103        }
104    }
105}
106
107/// Compute a percentile from an unsorted slice of `u64` values.
108///
109/// `p` is in `[0.0, 1.0]`. Returns 0 for an empty slice. Uses nearest-rank;
110/// callers don't need numerical precision here — these are coarse log/metric
111/// summaries.
112fn percentile(values: &[u64], p: f64) -> u64 {
113    if values.is_empty() {
114        return 0;
115    }
116    let mut sorted = values.to_vec();
117    sorted.sort_unstable();
118    let p = p.clamp(0.0, 1.0);
119    // Nearest-rank: ceil(p * n) - 1, clamped to [0, n-1].
120    let n = sorted.len();
121    #[allow(
122        clippy::cast_possible_truncation,
123        clippy::cast_sign_loss,
124        clippy::cast_precision_loss
125    )]
126    let rank = ((p * n as f64).ceil() as usize)
127        .saturating_sub(1)
128        .min(n - 1);
129    sorted[rank]
130}
131
132/// Payment data for external signing.
133///
134/// Contains the information needed to construct and submit the on-chain
135/// payment transaction without requiring a local wallet or private key.
136#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
137pub struct PaymentIntent {
138    /// Individual payment entries: (quote_hash, rewards_address, amount).
139    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
140    /// Total amount across all payments.
141    pub total_amount: Amount,
142}
143
144impl PaymentIntent {
145    /// Build from a set of prepared chunks.
146    ///
147    /// Collects all non-zero payment entries and computes the total.
148    pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
149        let mut payments = Vec::new();
150        let mut total = Amount::ZERO;
151        for chunk in prepared {
152            for info in &chunk.payment.quotes {
153                if !info.amount.is_zero() {
154                    payments.push((info.quote_hash, info.rewards_address, info.amount));
155                    total += info.amount;
156                }
157            }
158        }
159        Self {
160            payments,
161            total_amount: total,
162        }
163    }
164}
165
166/// Build [`PaidChunk`]s from prepared chunks and externally-provided transaction hashes.
167///
168/// Shared by [`Client::batch_pay`] (wallet flow) and [`finalize_batch_payment`] (external signer).
169///
170/// Returns an error if any non-zero-amount quote hash is missing from `tx_hash_map`,
171/// since chunks uploaded without valid proofs would be rejected by the network.
172fn build_paid_chunks(
173    prepared: Vec<PreparedChunk>,
174    tx_hash_map: &HashMap<QuoteHash, TxHash>,
175) -> Result<Vec<PaidChunk>> {
176    let mut paid_chunks = Vec::with_capacity(prepared.len());
177    for chunk in prepared {
178        let mut tx_hashes = Vec::new();
179        for info in &chunk.payment.quotes {
180            if !info.amount.is_zero() {
181                let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
182                    Error::Payment(format!(
183                        "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
184                        hex::encode(info.quote_hash)
185                    ))
186                })?;
187                tx_hashes.push(tx_hash);
188            }
189        }
190
191        let proof = PaymentProof {
192            proof_of_payment: ProofOfPayment {
193                peer_quotes: chunk.peer_quotes,
194            },
195            tx_hashes,
196        };
197
198        let proof_bytes = serialize_single_node_proof(&proof)
199            .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
200
201        paid_chunks.push(PaidChunk {
202            content: chunk.content,
203            address: chunk.address,
204            quoted_peers: chunk.quoted_peers,
205            proof_bytes,
206        });
207    }
208    Ok(paid_chunks)
209}
210
211/// Finalize a batch payment using externally-provided transaction hashes.
212///
213/// Takes prepared chunks and a map of `quote_hash -> tx_hash` from the
214/// external signer. Builds per-chunk `PaymentProof` bytes without needing a wallet.
215pub fn finalize_batch_payment(
216    prepared: Vec<PreparedChunk>,
217    tx_hash_map: &HashMap<QuoteHash, TxHash>,
218) -> Result<Vec<PaidChunk>> {
219    build_paid_chunks(prepared, tx_hash_map)
220}
221
222impl Client {
223    /// Prepare a single chunk for batch payment.
224    ///
225    /// Collects quotes and uses node-reported prices without making any
226    /// on-chain transaction. Returns `Ok(None)` if the chunk is already
227    /// stored on the network.
228    ///
229    /// # Errors
230    ///
231    /// Returns an error if quote collection or payment construction fails.
232    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
233        let address = compute_address(&content);
234        let data_size = u64::try_from(content.len())
235            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
236
237        let quotes_with_peers = match self
238            .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
239            .await
240        {
241            Ok(quotes) => quotes,
242            Err(Error::AlreadyStored) => {
243                debug!("Chunk {} already stored, skipping", hex::encode(address));
244                return Ok(None);
245            }
246            Err(e) => return Err(e),
247        };
248
249        // Capture all quoted peers for close-group replication.
250        let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
251            .iter()
252            .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
253            .collect();
254
255        // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment.
256        // Use node-reported prices directly — no contract price fetch needed.
257        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
258        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
259
260        for (peer_id, _addrs, quote, price) in quotes_with_peers {
261            let encoded = peer_id_to_encoded(&peer_id)?;
262            peer_quotes.push((encoded, quote.clone()));
263            quotes_for_payment.push((quote, price));
264        }
265
266        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
267            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
268
269        Ok(Some(PreparedChunk {
270            content,
271            address,
272            quoted_peers,
273            payment,
274            peer_quotes,
275        }))
276    }
277
278    /// Pay for multiple chunks in a single EVM transaction.
279    ///
280    /// Flattens all quote payments from the prepared chunks into one
281    /// `wallet.pay_for_quotes()` call, then maps transaction hashes
282    /// back to per-chunk [`PaymentProof`] bytes.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the wallet is not configured or the on-chain
287    /// payment fails.
288    /// Returns `(paid_chunks, storage_cost_atto, gas_cost_wei)`.
289    pub async fn batch_pay(
290        &self,
291        prepared: Vec<PreparedChunk>,
292    ) -> Result<(Vec<PaidChunk>, String, u128)> {
293        if prepared.is_empty() {
294            return Ok((Vec::new(), "0".to_string(), 0));
295        }
296
297        let wallet = self.require_wallet()?;
298
299        // Compute total storage cost from the prepared chunks before paying.
300        let intent = PaymentIntent::from_prepared_chunks(&prepared);
301        let storage_cost_atto = intent.total_amount.to_string();
302
303        // Flatten all quote payments from all chunks into a single batch.
304        let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
305        let mut all_payments = Vec::with_capacity(total_quotes);
306        for chunk in &prepared {
307            for info in &chunk.payment.quotes {
308                all_payments.push((info.quote_hash, info.rewards_address, info.amount));
309            }
310        }
311
312        debug!(
313            "Batch payment for {} chunks ({} quote entries)",
314            prepared.len(),
315            all_payments.len()
316        );
317
318        let (tx_hash_map, gas_info) =
319            wallet
320                .pay_for_quotes(all_payments)
321                .await
322                .map_err(|PayForQuotesError(err, _)| {
323                    Error::Payment(format!("Batch payment failed: {err}"))
324                })?;
325
326        info!(
327            "Batch payment succeeded: {} transactions",
328            tx_hash_map.len()
329        );
330
331        let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
332        let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
333        Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
334    }
335
336    /// Upload chunks in waves with pipelined EVM payments.
337    ///
338    /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave:
339    /// 1. **Prepare**: collect quotes for all chunks concurrently
340    /// 2. **Pay**: single EVM transaction for the whole wave
341    /// 3. **Store**: concurrent chunk replication to close group
342    ///
343    /// Stores from wave N overlap with quote collection for wave N+1
344    /// via `tokio::join!`.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if any payment or store operation fails.
349    /// Returns `(addresses, total_storage_cost_atto, total_gas_cost_wei)`.
350    pub async fn batch_upload_chunks(
351        &self,
352        chunks: Vec<Bytes>,
353    ) -> Result<(Vec<XorName>, String, u128)> {
354        let (addresses, storage, gas, _stats) = self
355            .batch_upload_chunks_with_events(chunks, None, 0, 0)
356            .await?;
357        Ok((addresses, storage, gas))
358    }
359
360    /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
361    /// events as each chunk is stored, enabling per-chunk progress bars.
362    ///
363    /// `stored_offset` is the number of chunks already stored in previous waves
364    /// (so events report cumulative progress). `file_total` is the total chunk
365    /// count across ALL waves (for the `total` field in events).
366    pub async fn batch_upload_chunks_with_events(
367        &self,
368        chunks: Vec<Bytes>,
369        progress: Option<&mpsc::Sender<UploadEvent>>,
370        stored_offset: usize,
371        file_total: usize,
372    ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
373        if chunks.is_empty() {
374            return Ok((
375                Vec::new(),
376                "0".to_string(),
377                0,
378                WaveAggregateStats::default(),
379            ));
380        }
381
382        let total_chunks = chunks.len();
383        let quote_cap = self.controller().quote.current();
384        let store_cap = self.controller().store.current();
385        debug!(
386            "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
387             (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
388        );
389
390        let mut all_addresses = Vec::with_capacity(total_chunks);
391        let mut seen_addresses: HashSet<XorName> = HashSet::new();
392
393        // Accumulate costs across waves.
394        let mut total_storage = Amount::ZERO;
395        let mut total_gas: u128 = 0;
396        let mut agg_stats = WaveAggregateStats::default();
397
398        // Deduplicate chunks by content address.
399        let mut unique_chunks = Vec::with_capacity(total_chunks);
400        for chunk in chunks {
401            let address = compute_address(&chunk);
402            if seen_addresses.insert(address) {
403                unique_chunks.push(chunk);
404            } else {
405                debug!("Skipping duplicate chunk {}", hex::encode(address));
406                all_addresses.push(address);
407                if let Some(tx) = progress {
408                    let _ = tx.try_send(UploadEvent::ChunkStored {
409                        stored: stored_offset + all_addresses.len(),
410                        total: file_total,
411                    });
412                }
413            }
414        }
415
416        // Split into waves.
417        let waves: Vec<Vec<Bytes>> = unique_chunks
418            .chunks(PAYMENT_WAVE_SIZE)
419            .map(<[Bytes]>::to_vec)
420            .collect();
421        let wave_count = waves.len();
422
423        debug!(
424            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
425            seen_addresses.len()
426        );
427
428        let mut pending_store: Option<Vec<PaidChunk>> = None;
429        let mut total_quoted: usize = 0;
430
431        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
432            let wave_num = wave_idx + 1;
433            let wave_size = wave_chunks.len();
434
435            // Pipeline: store previous wave while preparing this one.
436            let (prepare_result, store_result) = match pending_store.take() {
437                Some(paid_chunks) => {
438                    let store_offset = stored_offset + all_addresses.len();
439                    let quoted_offset = stored_offset + total_quoted;
440                    let (prep, stored) = tokio::join!(
441                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
442                        self.store_paid_chunks_with_events(
443                            paid_chunks,
444                            progress,
445                            store_offset,
446                            file_total
447                        )
448                    );
449                    (prep, Some(stored))
450                }
451                None => {
452                    let quoted_offset = stored_offset + total_quoted;
453                    let result = self
454                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
455                        .await;
456                    (result, None)
457                }
458            };
459            total_quoted += wave_size;
460
461            // Track partial progress from previous wave.
462            if let Some(wave_result) = store_result {
463                all_addresses.extend(&wave_result.stored);
464                agg_stats.absorb(&wave_result);
465                if !wave_result.failed.is_empty() {
466                    let failed_count = wave_result.failed.len();
467                    warn!("{failed_count} chunks failed to store after retries");
468                    return Err(Error::PartialUpload {
469                        stored: all_addresses.clone(),
470                        stored_count: stored_offset + all_addresses.len(),
471                        failed: wave_result.failed,
472                        failed_count,
473                        total_chunks: file_total,
474                        reason: "wave store failed after retries".into(),
475                    });
476                }
477            }
478
479            let (prepared_chunks, already_stored) = prepare_result?;
480            all_addresses.extend(&already_stored);
481            if let Some(tx) = progress {
482                for _ in &already_stored {
483                    let _ = tx.try_send(UploadEvent::ChunkStored {
484                        stored: stored_offset + all_addresses.len(),
485                        total: file_total,
486                    });
487                }
488            }
489
490            if prepared_chunks.is_empty() {
491                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
492                continue;
493            }
494
495            info!(
496                "Wave {wave_num}/{wave_count}: paying for {} chunks",
497                prepared_chunks.len()
498            );
499            let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?;
500            if let Ok(cost) = wave_storage.parse::<Amount>() {
501                total_storage += cost;
502            }
503            total_gas = total_gas.saturating_add(wave_gas);
504            pending_store = Some(paid_chunks);
505        }
506
507        // Store the last wave.
508        if let Some(paid_chunks) = pending_store {
509            let store_offset = stored_offset + all_addresses.len();
510            let wave_result = self
511                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
512                .await;
513            all_addresses.extend(&wave_result.stored);
514            agg_stats.absorb(&wave_result);
515            if !wave_result.failed.is_empty() {
516                let failed_count = wave_result.failed.len();
517                warn!("{failed_count} chunks failed to store after retries (final wave)");
518                return Err(Error::PartialUpload {
519                    stored: all_addresses.clone(),
520                    stored_count: stored_offset + all_addresses.len(),
521                    failed: wave_result.failed,
522                    failed_count,
523                    total_chunks: file_total,
524                    reason: "final wave store failed after retries".into(),
525                });
526            }
527        }
528
529        debug!("Batch upload complete: {} addresses", all_addresses.len());
530        Ok((
531            all_addresses,
532            total_storage.to_string(),
533            total_gas,
534            agg_stats,
535        ))
536    }
537
538    /// Prepare a wave of chunks by collecting quotes concurrently.
539    ///
540    /// Fires [`UploadEvent::ChunkQuoted`] as each chunk's quote completes.
541    /// Returns `(prepared_chunks, already_stored_addresses)`.
542    async fn prepare_wave(
543        &self,
544        chunks: Vec<Bytes>,
545        progress: Option<&mpsc::Sender<UploadEvent>>,
546        quoted_offset: usize,
547        file_total: usize,
548    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
549        let chunk_count = chunks.len();
550        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
551            .into_iter()
552            .map(|c| {
553                let addr = compute_address(&c);
554                (c, addr)
555            })
556            .collect();
557
558        let quote_limiter = self.controller().quote.clone();
559        // Batch-aware fan-out: clamp to chunk_count so we never
560        // pay for fan-out slots we cannot fill on a partial wave.
561        // See PERF-RESULTS.md — measured ~30% slowdown when
562        // cap > batch size on quoting workloads (live mainnet).
563        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
564        let mut quote_stream = stream::iter(chunks_with_addr)
565            .map(|(content, address)| {
566                let limiter = quote_limiter.clone();
567                async move {
568                    let result = observe_op(
569                        &limiter,
570                        || async move { self.prepare_chunk_payment(content).await },
571                        classify_error,
572                    )
573                    .await;
574                    (address, result)
575                }
576            })
577            .buffer_unordered(quote_concurrency);
578
579        let mut prepared = Vec::with_capacity(chunk_count);
580        let mut already_stored = Vec::new();
581        let mut quoted_count = 0usize;
582
583        while let Some((address, result)) = quote_stream.next().await {
584            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
585            match result? {
586                Some(chunk) => prepared.push(chunk),
587                None => already_stored.push(address),
588            }
589            quoted_count += 1;
590            let progress_num = quoted_offset + quoted_count;
591            if file_total > 0 {
592                if chunk_already_stored {
593                    info!("Verified {progress_num}/{file_total} (already stored)");
594                } else {
595                    info!("Quoted {progress_num}/{file_total}");
596                }
597            }
598            if let Some(tx) = progress {
599                let _ = tx.try_send(UploadEvent::ChunkQuoted {
600                    quoted: progress_num,
601                    total: file_total,
602                });
603            }
604        }
605
606        Ok((prepared, already_stored))
607    }
608
609    /// Store a batch of paid chunks concurrently to their close groups.
610    ///
611    /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
612    /// Returns a [`WaveResult`] with both successes and failures so callers can
613    /// track partial progress instead of losing information about stored chunks.
614    ///
615    /// When `progress` is `Some`, sends [`UploadEvent::ChunkStored`] as each
616    /// chunk is successfully stored. `stored_before` is the count of chunks
617    /// already stored in previous waves so the event reports an accurate
618    /// cumulative total; `total_chunks` is the total across all waves. Pass
619    /// `None`/0/0 when progress reporting is not needed.
620    pub(crate) async fn store_paid_chunks_with_events(
621        &self,
622        paid_chunks: Vec<PaidChunk>,
623        progress: Option<&mpsc::Sender<UploadEvent>>,
624        stored_before: usize,
625        total_chunks: usize,
626    ) -> WaveResult {
627        const MAX_RETRIES: u32 = 3;
628        const BASE_DELAY_MS: u64 = 500;
629
630        let mut stored = Vec::new();
631        let mut to_retry = paid_chunks;
632
633        // Per-chunk first-seen timestamps, keyed by chunk address.
634        // Inserted on first sight; never overwritten so wall-clock spans
635        // first attempt → eventual success across all retry rounds.
636        let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
637        for chunk in &to_retry {
638            first_seen.entry(chunk.address).or_insert_with(Instant::now);
639        }
640
641        let mut chunk_attempts_total: usize = 0;
642        let mut store_durations_ms: Vec<u64> = Vec::new();
643        let mut retries_per_chunk: Vec<u32> = Vec::new();
644
645        for attempt in 0..=MAX_RETRIES {
646            if attempt > 0 {
647                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
648                tokio::time::sleep(delay).await;
649                info!(
650                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
651                    to_retry.len()
652                );
653            }
654
655            // Each chunk in this round counts as one store-RPC attempt.
656            chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
657
658            let store_limiter = self.controller().store.clone();
659            let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
660            let mut upload_stream = stream::iter(to_retry)
661                .map(|chunk| {
662                    let chunk_clone = chunk.clone();
663                    let limiter = store_limiter.clone();
664                    async move {
665                        let result = observe_op(
666                            &limiter,
667                            || async move {
668                                self.chunk_put_to_close_group(
669                                    chunk.content,
670                                    chunk.proof_bytes,
671                                    &chunk.quoted_peers,
672                                )
673                                .await
674                            },
675                            classify_error,
676                        )
677                        .await;
678                        (chunk_clone, result)
679                    }
680                })
681                .buffer_unordered(store_concurrency);
682
683            let mut failed_this_round = Vec::new();
684            while let Some((chunk, result)) = upload_stream.next().await {
685                match result {
686                    Ok(name) => {
687                        let duration_ms = first_seen
688                            .get(&chunk.address)
689                            .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
690                            .unwrap_or(0);
691                        store_durations_ms.push(duration_ms);
692                        retries_per_chunk.push(attempt);
693                        stored.push(name);
694                        let stored_num = stored_before + stored.len();
695                        if total_chunks > 0 {
696                            info!("Stored {stored_num}/{total_chunks}");
697                        }
698                        if let Some(tx) = progress {
699                            let _ = tx.try_send(UploadEvent::ChunkStored {
700                                stored: stored_num,
701                                total: total_chunks,
702                            });
703                        }
704                    }
705                    Err(e) => failed_this_round.push((chunk, e.to_string())),
706                }
707            }
708
709            if failed_this_round.is_empty() {
710                let result = WaveResult {
711                    stored,
712                    failed: Vec::new(),
713                    chunk_attempts_total,
714                    store_durations_ms,
715                    retries_per_chunk,
716                };
717                log_wave_summary(&result);
718                return result;
719            }
720
721            if attempt == MAX_RETRIES {
722                let failed = failed_this_round
723                    .into_iter()
724                    .map(|(c, e)| (c.address, e))
725                    .collect();
726                let result = WaveResult {
727                    stored,
728                    failed,
729                    chunk_attempts_total,
730                    store_durations_ms,
731                    retries_per_chunk,
732                };
733                log_wave_summary(&result);
734                return result;
735            }
736
737            warn!(
738                "{} chunks failed on attempt {}, will retry",
739                failed_this_round.len(),
740                attempt + 1
741            );
742            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
743        }
744
745        // Unreachable due to loop structure, but satisfy the compiler.
746        let result = WaveResult {
747            stored,
748            failed: Vec::new(),
749            chunk_attempts_total,
750            store_durations_ms,
751            retries_per_chunk,
752        };
753        log_wave_summary(&result);
754        result
755    }
756}
757
758/// Emit one structured info line summarising a wave's store-side stats.
759///
760/// Surfaces p50/p95/max chunk wall-clock and per-round retry counts so
761/// log-based analysis tooling (Elasticsearch / Kibana) can identify
762/// client-side quorum or retry cost without needing the `--json` output.
763fn log_wave_summary(result: &WaveResult) {
764    let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
765    let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
766    let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
767    let chunk_attempts_total = result.chunk_attempts_total;
768    info!(
769        chunks_stored = result.stored.len(),
770        chunks_failed = result.failed.len(),
771        chunk_attempts_total,
772        retries_round_1,
773        retries_round_2,
774        retries_round_3,
775        store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
776        store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
777        store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
778        "chunk_store_wave_complete"
779    );
780}
781
782/// Compile-time assertions that batch method futures are Send.
783#[cfg(test)]
784mod send_assertions {
785    use super::*;
786
787    fn _assert_send<T: Send>(_: &T) {}
788
789    #[allow(dead_code)]
790    async fn _batch_upload_is_send(client: &Client) {
791        let fut = client.batch_upload_chunks(Vec::new());
792        _assert_send(&fut);
793    }
794}
795
796#[cfg(test)]
797#[allow(clippy::unwrap_used)]
798mod tests {
799    use super::*;
800    use ant_protocol::payment::QuotePaymentInfo;
801    use ant_protocol::CLOSE_GROUP_SIZE;
802
803    /// Median index in the quotes array.
804    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;
805
806    /// Helper: build a `PreparedChunk` with `median_amount` at the median
807    /// quote index and zero for all other quotes. Adapts automatically to
808    /// `CLOSE_GROUP_SIZE` changes.
809    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
810        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
811            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
812            QuotePaymentInfo {
813                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
814                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
815                amount: Amount::from(amount),
816                price: Amount::from(amount),
817            }
818        });
819
820        PreparedChunk {
821            content: Bytes::from(vec![0xAA; 32]),
822            address: [0u8; 32],
823            quoted_peers: Vec::new(),
824            payment: SingleNodePayment { quotes },
825            peer_quotes: Vec::new(),
826        }
827    }
828
829    #[test]
830    fn payment_intent_from_single_chunk() {
831        let chunk = make_prepared_chunk(300);
832        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
833
834        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
835        assert_eq!(intent.total_amount, Amount::from(300));
836
837        let (hash, addr, amt) = &intent.payments[0];
838        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
839        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
840        assert_eq!(*amt, Amount::from(300));
841    }
842
843    #[test]
844    fn payment_intent_from_multiple_chunks() {
845        let c1 = make_prepared_chunk(100);
846        let c2 = make_prepared_chunk(250);
847        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);
848
849        assert_eq!(intent.payments.len(), 2);
850        assert_eq!(intent.total_amount, Amount::from(350));
851    }
852
853    #[test]
854    fn payment_intent_skips_all_zero_chunks() {
855        let chunk = make_prepared_chunk(0);
856        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
857
858        assert!(intent.payments.is_empty());
859        assert_eq!(intent.total_amount, Amount::ZERO);
860    }
861
862    #[test]
863    fn payment_intent_empty_input() {
864        let intent = PaymentIntent::from_prepared_chunks(&[]);
865        assert!(intent.payments.is_empty());
866        assert_eq!(intent.total_amount, Amount::ZERO);
867    }
868
869    #[test]
870    fn finalize_batch_payment_builds_proofs() {
871        let chunk = make_prepared_chunk(500);
872        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;
873
874        let mut tx_map = HashMap::new();
875        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));
876
877        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();
878
879        assert_eq!(paid.len(), 1);
880        assert!(!paid[0].proof_bytes.is_empty());
881        assert_eq!(paid[0].address, [0u8; 32]);
882    }
883
884    #[test]
885    fn finalize_batch_payment_empty_input() {
886        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
887        assert!(paid.is_empty());
888    }
889
890    #[test]
891    fn finalize_batch_payment_missing_tx_hash_errors() {
892        // Missing tx hash for a non-zero-amount quote should error,
893        // since the chunk would be rejected by the network without a valid proof.
894        let chunk = make_prepared_chunk(500);
895
896        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
897        assert!(result.is_err());
898        let err = result.unwrap_err().to_string();
899        assert!(err.contains("Missing tx hash"), "got: {err}");
900    }
901
902    #[test]
903    fn finalize_batch_payment_multiple_chunks() {
904        let c1 = make_prepared_chunk(100);
905        let c2 = make_prepared_chunk(200);
906        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
907        let mut tx_map = HashMap::new();
908        // Both chunks have the same quote_hash (same index/byte pattern)
909        // so one tx_hash covers both
910        tx_map.insert(q1, TxHash::from([0xCC; 32]));
911
912        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
913        assert_eq!(paid.len(), 2);
914    }
915}