Skip to main content

ant_core/data/client/
batch.rs

1//! Batch chunk upload with wave-based pipelined EVM payments.
2//!
3//! Groups chunks into waves of 64 and pays for each
4//! wave in a single EVM transaction. Stores from wave N are pipelined
5//! with quote collection for wave N+1 via `tokio::join!`.
6
7use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, PartialUploadSpend, Result};
13use ant_protocol::evm::{
14    Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15    RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{
18    deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment,
19};
20use ant_protocol::transport::{MultiAddr, PeerId};
21use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use std::collections::{HashMap, HashSet};
25use std::time::{Duration, Instant};
26use tokio::sync::mpsc;
27use tracing::{debug, info, warn};
28
/// How many chunks are grouped into one payment wave.
const PAYMENT_WAVE_SIZE: usize = 64;

/// Soft upper bound on the combined body bytes of chunks that are being stored
/// at the same time within a single wave.
///
/// Limits store concurrency for large chunks so that the per-peer body buffers
/// of the send path cannot pin multiple GB at once (see V2-461). With ~4 MB
/// chunks this works out to roughly 16 concurrent stores; small chunks run into
/// the chunk-count / adaptive limits first and are not affected by this budget.
const STORE_INFLIGHT_BYTE_BUDGET: usize = {
    const MIB: usize = 1024 * 1024;
    64 * MIB
};
38
39/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
40#[derive(Debug)]
41pub struct PreparedChunk {
42    /// The chunk content bytes.
43    pub content: Bytes,
44    /// Content address (BLAKE3 hash).
45    pub address: XorName,
46    /// Ordered PUT targets from quote planning.
47    ///
48    /// Kept under the legacy `quoted_peers` name for API compatibility; the
49    /// list can include non-quoted fallback peers beyond the quoted close
50    /// group.
51    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
52    /// Payment structure (quotes sorted, median selected, not yet paid on-chain).
53    pub payment: SingleNodePayment,
54    /// Peer quotes for building `ProofOfPayment`.
55    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
56}
57
58/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`].
59#[derive(Debug, Clone)]
60pub struct PaidChunk {
61    /// The chunk content bytes.
62    pub content: Bytes,
63    /// Content address (BLAKE3 hash).
64    pub address: XorName,
65    /// Ordered PUT targets from quote planning.
66    ///
67    /// Kept under the legacy `quoted_peers` name for API compatibility; the
68    /// list can include non-quoted fallback peers beyond the quoted close
69    /// group.
70    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
71    /// Serialized [`PaymentProof`] bytes.
72    pub proof_bytes: Vec<u8>,
73}
74
75/// Result of storing a wave of paid chunks, with retry tracking.
76#[derive(Debug)]
77pub struct WaveResult {
78    /// Successfully stored chunk addresses.
79    pub stored: Vec<XorName>,
80    /// Chunks that failed to store after all retries.
81    pub failed: Vec<(XorName, String)>,
82    /// Sum of store-RPC attempts across all chunks in this wave (>= stored.len() + failed.len()).
83    pub chunk_attempts_total: usize,
84    /// Per-chunk wall-clock (ms) from first attempt to successful store. Only populated for stored chunks.
85    pub store_durations_ms: Vec<u64>,
86    /// Histogram of which retry-round each stored chunk succeeded on (index 0 = first attempt).
87    pub retries_per_chunk: Vec<u32>,
88}
89
90/// Aggregated retry / wall-clock stats across one or more [`WaveResult`]s.
91///
92/// Used by [`Client::batch_upload_chunks_with_events`] (which may store
93/// multiple waves per call) and surfaced upward into `FileUploadResult` so
94/// downstream tooling can record per-upload retry pressure and per-chunk
95/// store wall-clock without needing log parsing.
96#[derive(Debug, Default, Clone)]
97pub struct WaveAggregateStats {
98    /// Sum of store-RPC attempts across all waves (>= chunks_stored).
99    pub chunk_attempts_total: usize,
100    /// Per-chunk wall-clock (ms) from first attempt to successful store,
101    /// concatenated across waves.
102    pub store_durations_ms: Vec<u64>,
103    /// Count of stored chunks that succeeded on each retry round
104    /// (index 0 = first attempt, 1 = first retry, etc.). Indices match
105    /// the retry rounds emitted by `Client::store_paid_chunks_with_events`
106    /// which caps at `MAX_RETRIES = 3`, so an array of 4 suffices.
107    pub retries_histogram: [usize; 4],
108}
109
110impl WaveAggregateStats {
111    /// Fold one [`WaveResult`]'s stats into the running aggregate.
112    pub fn absorb(&mut self, wave: &WaveResult) {
113        self.chunk_attempts_total = self
114            .chunk_attempts_total
115            .saturating_add(wave.chunk_attempts_total);
116        self.store_durations_ms.extend(&wave.store_durations_ms);
117        for &r in &wave.retries_per_chunk {
118            let idx = (r as usize).min(self.retries_histogram.len() - 1);
119            self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
120        }
121    }
122}
123
/// Nearest-rank percentile of an unsorted slice of `u64` values.
///
/// `p` is a fraction and is clamped to `[0.0, 1.0]`. An empty slice yields 0.
/// The input is copied and sorted; the result is the element at index
/// `ceil(p * n) - 1`, clamped to `[0, n - 1]`. Precision is not a goal here:
/// the values feed coarse log/metric summaries.
fn percentile(values: &[u64], p: f64) -> u64 {
    let n = values.len();
    if n == 0 {
        return 0;
    }

    let fraction = p.clamp(0.0, 1.0);
    // The float -> usize cast saturates and the product is non-negative after
    // clamping, so the lossy-cast lints are not a concern for this summary.
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let one_based_rank = (fraction * n as f64).ceil() as usize;
    let index = one_based_rank.saturating_sub(1).min(n - 1);

    let mut ordered = values.to_vec();
    ordered.sort_unstable();
    ordered[index]
}
148
149/// Payment data for external signing.
150///
151/// Contains the information needed to construct and submit the on-chain
152/// payment transaction without requiring a local wallet or private key.
153#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
154pub struct PaymentIntent {
155    /// Individual payment entries: (quote_hash, rewards_address, amount).
156    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
157    /// Total amount across all payments.
158    pub total_amount: Amount,
159}
160
161impl PaymentIntent {
162    /// Build from a set of prepared chunks.
163    ///
164    /// Collects all non-zero payment entries and computes the total.
165    pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
166        let mut payments = Vec::new();
167        let mut total = Amount::ZERO;
168        for chunk in prepared {
169            for info in &chunk.payment.quotes {
170                if !info.amount.is_zero() {
171                    payments.push((info.quote_hash, info.rewards_address, info.amount));
172                    total += info.amount;
173                }
174            }
175        }
176        Self {
177            payments,
178            total_amount: total,
179        }
180    }
181}
182
183/// Build [`PaidChunk`]s from prepared chunks and externally-provided transaction hashes.
184///
185/// Shared by [`Client::batch_pay`] (wallet flow) and [`finalize_batch_payment`] (external signer).
186///
187/// Returns an error if any non-zero-amount quote hash is missing from `tx_hash_map`,
188/// since chunks uploaded without valid proofs would be rejected by the network.
189fn build_paid_chunks(
190    prepared: Vec<PreparedChunk>,
191    tx_hash_map: &HashMap<QuoteHash, TxHash>,
192) -> Result<Vec<PaidChunk>> {
193    let mut paid_chunks = Vec::with_capacity(prepared.len());
194    for chunk in prepared {
195        let mut tx_hashes = Vec::new();
196        for info in &chunk.payment.quotes {
197            if !info.amount.is_zero() {
198                let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
199                    Error::Payment(format!(
200                        "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
201                        hex::encode(info.quote_hash)
202                    ))
203                })?;
204                tx_hashes.push(tx_hash);
205            }
206        }
207
208        let proof = PaymentProof {
209            proof_of_payment: ProofOfPayment {
210                peer_quotes: chunk.peer_quotes,
211            },
212            tx_hashes,
213        };
214
215        let proof_bytes = serialize_single_node_proof(&proof)
216            .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
217
218        paid_chunks.push(PaidChunk {
219            content: chunk.content,
220            address: chunk.address,
221            quoted_peers: chunk.quoted_peers,
222            proof_bytes,
223        });
224    }
225    Ok(paid_chunks)
226}
227
228/// Finalize a batch payment using externally-provided transaction hashes.
229///
230/// Takes prepared chunks and a map of `quote_hash -> tx_hash` from the
231/// external signer. Builds per-chunk `PaymentProof` bytes without needing a wallet.
232pub fn finalize_batch_payment(
233    prepared: Vec<PreparedChunk>,
234    tx_hash_map: &HashMap<QuoteHash, TxHash>,
235) -> Result<Vec<PaidChunk>> {
236    build_paid_chunks(prepared, tx_hash_map)
237}
238
239impl Client {
240    /// Prepare a single chunk for batch payment.
241    ///
242    /// Collects quotes and uses node-reported prices without making any
243    /// on-chain transaction. Returns `Ok(None)` if the chunk is already
244    /// stored on the network.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if quote collection or payment construction fails.
249    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
250        let address = compute_address(&content);
251        let data_size = u64::try_from(content.len())
252            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
253
254        let quote_plan = match self
255            .get_store_quote_plan(&address, data_size, DATA_TYPE_CHUNK)
256            .await
257        {
258            Ok(plan) => plan,
259            Err(Error::AlreadyStored) => {
260                debug!("Chunk {} already stored, skipping", hex::encode(address));
261                return Ok(None);
262            }
263            Err(e) => return Err(e),
264        };
265        let quotes_with_peers = quote_plan.quotes;
266
267        // Capture the ordered PUT target set for replication. This can be
268        // wider than the peers that supplied the paid quotes.
269        let quoted_peers = quote_plan.put_peers;
270
271        // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment.
272        // Use node-reported prices directly — no contract price fetch needed.
273        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
274        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
275
276        for (peer_id, _addrs, quote, price) in quotes_with_peers {
277            let encoded = peer_id_to_encoded(&peer_id)?;
278            peer_quotes.push((encoded, quote.clone()));
279            quotes_for_payment.push((quote, price));
280        }
281
282        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
283            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
284
285        Ok(Some(PreparedChunk {
286            content,
287            address,
288            quoted_peers,
289            payment,
290            peer_quotes,
291        }))
292    }
293
294    /// Pay for multiple chunks in a single EVM transaction.
295    ///
296    /// Flattens all quote payments from the prepared chunks into one
297    /// `wallet.pay_for_quotes()` call, then maps transaction hashes
298    /// back to per-chunk [`PaymentProof`] bytes.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the wallet is not configured or the on-chain
303    /// payment fails.
304    /// Returns `(paid_chunks, storage_cost_atto, gas_cost_wei)`.
305    pub async fn batch_pay(
306        &self,
307        prepared: Vec<PreparedChunk>,
308    ) -> Result<(Vec<PaidChunk>, String, u128)> {
309        if prepared.is_empty() {
310            return Ok((Vec::new(), "0".to_string(), 0));
311        }
312
313        let wallet = self.require_wallet()?;
314
315        // Compute total storage cost from the prepared chunks before paying.
316        let intent = PaymentIntent::from_prepared_chunks(&prepared);
317        let storage_cost_atto = intent.total_amount.to_string();
318
319        // Flatten all quote payments from all chunks into a single batch.
320        let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
321        let mut all_payments = Vec::with_capacity(total_quotes);
322        for chunk in &prepared {
323            for info in &chunk.payment.quotes {
324                all_payments.push((info.quote_hash, info.rewards_address, info.amount));
325            }
326        }
327
328        debug!(
329            "Batch payment for {} chunks ({} quote entries)",
330            prepared.len(),
331            all_payments.len()
332        );
333
334        let (tx_hash_map, gas_info) =
335            wallet
336                .pay_for_quotes(all_payments)
337                .await
338                .map_err(|PayForQuotesError(err, _)| {
339                    Error::Payment(format!("Batch payment failed: {err}"))
340                })?;
341
342        info!(
343            "Batch payment succeeded: {} transactions",
344            tx_hash_map.len()
345        );
346
347        let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
348        let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
349        Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
350    }
351
352    /// Upload chunks in waves with pipelined EVM payments.
353    ///
354    /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave:
355    /// 1. **Prepare**: collect quotes for all chunks concurrently
356    /// 2. **Pay**: single EVM transaction for the whole wave
357    /// 3. **Store**: concurrent chunk replication to close group
358    ///
359    /// Stores from wave N overlap with quote collection for wave N+1
360    /// via `tokio::join!`.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if any payment or store operation fails.
365    /// Returns `(addresses, total_storage_cost_atto, total_gas_cost_wei)`.
366    pub async fn batch_upload_chunks(
367        &self,
368        chunks: Vec<Bytes>,
369    ) -> Result<(Vec<XorName>, String, u128)> {
370        let (addresses, storage, gas, _stats) = self
371            .batch_upload_chunks_with_events(chunks, None, 0, 0, None)
372            .await?;
373        Ok((addresses, storage, gas))
374    }
375
376    /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
377    /// events as each chunk is stored, enabling per-chunk progress bars.
378    ///
379    /// `stored_offset` is the number of chunks already stored in previous waves
380    /// (so events report cumulative progress). `file_total` is the total chunk
381    /// count across ALL waves (for the `total` field in events).
382    ///
383    /// When `resume_key` is `Some`, per-wave payment proofs are persisted
384    /// to `<data_dir>/payments/single/<ts>_<hash(resume_key)>` via
385    /// `crate::data::client::cached_single` so that a partial-upload
386    /// failure can be resumed on the next attempt without paying twice.
387    /// The caller is responsible for deleting the cache entry on full
388    /// success (typically `upload_with_options` in `file.rs`).
389    pub async fn batch_upload_chunks_with_events(
390        &self,
391        chunks: Vec<Bytes>,
392        progress: Option<&mpsc::Sender<UploadEvent>>,
393        stored_offset: usize,
394        file_total: usize,
395        resume_key: Option<&str>,
396    ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
397        if chunks.is_empty() {
398            return Ok((
399                Vec::new(),
400                "0".to_string(),
401                0,
402                WaveAggregateStats::default(),
403            ));
404        }
405
406        let total_chunks = chunks.len();
407        let quote_cap = self.controller().quote.current();
408        let store_cap = self.controller().store.current();
409        debug!(
410            "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
411             (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
412        );
413
414        // Load any previously-cached single-node receipt for this
415        // upload. Each chunk whose address is in the cache will skip
416        // the quote + pay phases and have its `PaidChunk` constructed
417        // directly from the cached proof + fresh quoted peers. The
418        // caller is responsible for deleting the cache on full
419        // success; we only read here, never write the load result back.
420        //
421        // Before trusting any cached proof, decode it locally and drop
422        // any whose quote.timestamp is past the storer's per-quote age
423        // budget (`QUOTE_MAX_AGE_SECS`, mirrored here as
424        // `CACHED_PROOF_EXPIRY_SECS`). The previous design trusted a
425        // substring match on remote error text, which a Byzantine
426        // storer could spoof to force double-payment. Local pre-flight
427        // is decision-pure: we never hand a doomed proof to a storer,
428        // and the cache is updated under our own lock with no remote
429        // text involved.
430        // Load only the cached PROOFS (for reuse). The cost this function
431        // returns is a per-call DELTA — what was freshly paid in THIS call —
432        // not the cache's cumulative. The single-node wave driver
433        // (`upload_spill_addresses_single`) calls this once per wave and SUMS
434        // the per-call costs, so seeding the return with the cumulative cache
435        // (which grows as each wave appends to it) double-counts:
436        // A + (A+B) + (A+B+C) instead of A+B+C.
437        let cached_proofs: HashMap<XorName, Vec<u8>> = match resume_key {
438            Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
439                Some((_, receipt)) => prune_locally_expired_proofs(key, receipt.proofs),
440                None => HashMap::new(),
441            },
442            None => HashMap::new(),
443        };
444
445        let mut all_addresses = Vec::with_capacity(total_chunks);
446        let mut seen_addresses: HashSet<XorName> = HashSet::new();
447
448        // Accumulate only THIS call's freshly-paid cost (per-call delta; see
449        // the proof-load comment above for why this must not include the cache).
450        let mut total_storage = Amount::ZERO;
451        let mut total_gas: u128 = 0;
452        let mut agg_stats = WaveAggregateStats::default();
453
454        // Deduplicate chunks by content address.
455        let mut unique_chunks = Vec::with_capacity(total_chunks);
456        for chunk in chunks {
457            let address = compute_address(&chunk);
458            if seen_addresses.insert(address) {
459                unique_chunks.push(chunk);
460            } else {
461                debug!("Skipping duplicate chunk {}", hex::encode(address));
462                all_addresses.push(address);
463                if let Some(tx) = progress {
464                    let _ = tx.try_send(UploadEvent::ChunkStored {
465                        stored: stored_offset + all_addresses.len(),
466                        total: file_total,
467                    });
468                }
469            }
470        }
471
472        // Split into waves.
473        let waves: Vec<Vec<Bytes>> = unique_chunks
474            .chunks(PAYMENT_WAVE_SIZE)
475            .map(<[Bytes]>::to_vec)
476            .collect();
477        let wave_count = waves.len();
478
479        debug!(
480            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
481            seen_addresses.len()
482        );
483
484        let mut pending_store: Option<Vec<PaidChunk>> = None;
485        let mut total_quoted: usize = 0;
486
487        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
488            let wave_num = wave_idx + 1;
489            let wave_size = wave_chunks.len();
490
491            // Pipeline: store previous wave while preparing this one.
492            let (prepare_result, store_result) = match pending_store.take() {
493                Some(paid_chunks) => {
494                    let store_offset = stored_offset + all_addresses.len();
495                    let quoted_offset = stored_offset + total_quoted;
496                    let (prep, stored) = tokio::join!(
497                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
498                        self.store_paid_chunks_with_events(
499                            paid_chunks,
500                            progress,
501                            store_offset,
502                            file_total
503                        )
504                    );
505                    (prep, Some(stored))
506                }
507                None => {
508                    let quoted_offset = stored_offset + total_quoted;
509                    let result = self
510                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
511                        .await;
512                    (result, None)
513                }
514            };
515            total_quoted += wave_size;
516
517            // Track partial progress from previous wave.
518            if let Some(wave_result) = store_result {
519                all_addresses.extend(&wave_result.stored);
520                agg_stats.absorb(&wave_result);
521                if !wave_result.failed.is_empty() {
522                    let failed_count = wave_result.failed.len();
523                    warn!("{failed_count} chunks failed to store after retries");
524                    return Err(Error::PartialUpload {
525                        stored: all_addresses.clone(),
526                        stored_count: stored_offset + all_addresses.len(),
527                        failed: wave_result.failed,
528                        failed_count,
529                        total_chunks: file_total,
530                        spend: Box::new(PartialUploadSpend {
531                            storage_cost_atto: total_storage.to_string(),
532                            gas_cost_wei: total_gas,
533                        }),
534                        reason: "wave store failed after retries".into(),
535                    });
536                }
537            }
538
539            let (prepared_chunks, already_stored) = prepare_result?;
540            all_addresses.extend(&already_stored);
541            if let Some(tx) = progress {
542                for _ in &already_stored {
543                    let _ = tx.try_send(UploadEvent::ChunkStored {
544                        stored: stored_offset + all_addresses.len(),
545                        total: file_total,
546                    });
547                }
548            }
549
550            if prepared_chunks.is_empty() {
551                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
552                continue;
553            }
554
555            // Split prepared chunks into "already paid in a previous
556            // attempt" (cached) and "needs payment" (fresh). Cached
557            // chunks build a `PaidChunk` from the cached proof + the
558            // freshly-quoted peers, bypassing the EVM transaction.
559            let mut needs_pay: Vec<PreparedChunk> = Vec::with_capacity(prepared_chunks.len());
560            let mut cached_paid: Vec<PaidChunk> = Vec::new();
561            for prep in prepared_chunks {
562                if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() {
563                    cached_paid.push(PaidChunk {
564                        content: prep.content,
565                        address: prep.address,
566                        quoted_peers: prep.quoted_peers,
567                        proof_bytes,
568                    });
569                } else {
570                    needs_pay.push(prep);
571                }
572            }
573            if !cached_paid.is_empty() {
574                info!(
575                    "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs",
576                    cached_paid.len()
577                );
578            }
579
580            let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() {
581                (Vec::new(), "0".to_string(), 0u128)
582            } else {
583                info!(
584                    "Wave {wave_num}/{wave_count}: paying for {} chunks",
585                    needs_pay.len()
586                );
587                self.batch_pay(needs_pay).await?
588            };
589            if let Ok(cost) = wave_storage.parse::<Amount>() {
590                total_storage += cost;
591            }
592            total_gas = total_gas.saturating_add(wave_gas);
593
594            // Persist the freshly-paid wave's proofs so a later
595            // failure can resume without re-paying.
596            if let Some(key) = resume_key {
597                if !paid_chunks.is_empty() {
598                    let new_proofs: HashMap<[u8; 32], Vec<u8>> = paid_chunks
599                        .iter()
600                        .map(|pc| (pc.address, pc.proof_bytes.clone()))
601                        .collect();
602                    crate::data::client::cached_single::try_append_wave(
603                        key,
604                        new_proofs,
605                        &wave_storage,
606                        wave_gas,
607                    );
608                }
609            }
610
611            paid_chunks.extend(cached_paid);
612            pending_store = Some(paid_chunks);
613        }
614
615        // Store the last wave.
616        if let Some(paid_chunks) = pending_store {
617            let store_offset = stored_offset + all_addresses.len();
618            let wave_result = self
619                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
620                .await;
621            all_addresses.extend(&wave_result.stored);
622            agg_stats.absorb(&wave_result);
623            if !wave_result.failed.is_empty() {
624                let failed_count = wave_result.failed.len();
625                warn!("{failed_count} chunks failed to store after retries (final wave)");
626                return Err(Error::PartialUpload {
627                    stored: all_addresses.clone(),
628                    stored_count: stored_offset + all_addresses.len(),
629                    failed: wave_result.failed,
630                    failed_count,
631                    total_chunks: file_total,
632                    spend: Box::new(PartialUploadSpend {
633                        storage_cost_atto: total_storage.to_string(),
634                        gas_cost_wei: total_gas,
635                    }),
636                    reason: "final wave store failed after retries".into(),
637                });
638            }
639        }
640
641        debug!("Batch upload complete: {} addresses", all_addresses.len());
642        Ok((
643            all_addresses,
644            total_storage.to_string(),
645            total_gas,
646            agg_stats,
647        ))
648    }
649
650    /// Prepare a wave of chunks by collecting quotes concurrently.
651    ///
652    /// Fires [`UploadEvent::ChunkQuoted`] as each chunk's quote completes.
653    /// Returns `(prepared_chunks, already_stored_addresses)`.
654    async fn prepare_wave(
655        &self,
656        chunks: Vec<Bytes>,
657        progress: Option<&mpsc::Sender<UploadEvent>>,
658        quoted_offset: usize,
659        file_total: usize,
660    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
661        let chunk_count = chunks.len();
662        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
663            .into_iter()
664            .map(|c| {
665                let addr = compute_address(&c);
666                (c, addr)
667            })
668            .collect();
669
670        let quote_limiter = self.controller().quote.clone();
671        // Batch-aware fan-out: clamp to chunk_count so we never
672        // pay for fan-out slots we cannot fill on a partial wave.
673        // See PERF-RESULTS.md — measured ~30% slowdown when
674        // cap > batch size on quoting workloads (live mainnet).
675        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
676        let mut quote_stream = stream::iter(chunks_with_addr)
677            .map(|(content, address)| {
678                let limiter = quote_limiter.clone();
679                async move {
680                    let result = observe_op(
681                        &limiter,
682                        || async move { self.prepare_chunk_payment(content).await },
683                        classify_error,
684                    )
685                    .await;
686                    (address, result)
687                }
688            })
689            .buffer_unordered(quote_concurrency);
690
691        let mut prepared = Vec::with_capacity(chunk_count);
692        let mut already_stored = Vec::new();
693        let mut quoted_count = 0usize;
694
695        while let Some((address, result)) = quote_stream.next().await {
696            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
697            match result? {
698                Some(chunk) => prepared.push(chunk),
699                None => already_stored.push(address),
700            }
701            quoted_count += 1;
702            let progress_num = quoted_offset + quoted_count;
703            if file_total > 0 {
704                if chunk_already_stored {
705                    info!("Verified {progress_num}/{file_total} (already stored)");
706                } else {
707                    info!("Quoted {progress_num}/{file_total}");
708                }
709            }
710            if let Some(tx) = progress {
711                let _ = tx.try_send(UploadEvent::ChunkQuoted {
712                    quoted: progress_num,
713                    total: file_total,
714                });
715            }
716        }
717
718        Ok((prepared, already_stored))
719    }
720
721    /// Store a batch of paid chunks concurrently to their close groups.
722    ///
723    /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
724    /// Returns a [`WaveResult`] with both successes and failures so callers can
725    /// track partial progress instead of losing information about stored chunks.
726    ///
727    /// When `progress` is `Some`, sends [`UploadEvent::ChunkStored`] as each
728    /// chunk is successfully stored. `stored_before` is the count of chunks
729    /// already stored in previous waves so the event reports an accurate
730    /// cumulative total; `total_chunks` is the total across all waves. Pass
731    /// `None`/0/0 when progress reporting is not needed.
732    pub(crate) async fn store_paid_chunks_with_events(
733        &self,
734        paid_chunks: Vec<PaidChunk>,
735        progress: Option<&mpsc::Sender<UploadEvent>>,
736        stored_before: usize,
737        total_chunks: usize,
738    ) -> WaveResult {
739        const MAX_RETRIES: u32 = 3;
740        const BASE_DELAY_MS: u64 = 500;
741
742        let mut stored = Vec::new();
743        let mut to_retry = paid_chunks;
744
745        // Per-chunk first-seen timestamps, keyed by chunk address.
746        // Inserted on first sight; never overwritten so wall-clock spans
747        // first attempt → eventual success across all retry rounds.
748        let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
749        for chunk in &to_retry {
750            first_seen.entry(chunk.address).or_insert_with(Instant::now);
751        }
752
753        // Bound concurrency by IN-FLIGHT BYTES, not just chunk count. Each
754        // concurrently-stored chunk is held in memory while it is sent to its
755        // close group, and the send path re-serializes the body once per peer,
756        // so a wave of large (~4 MB) chunks at full store concurrency can pin
757        // multiple GB and OOM a small host. Cap how many chunks store at once
758        // so their combined body size stays under the budget; small chunks are
759        // unaffected (the byte bound exceeds the chunk-count bound). The budget
760        // is deliberately conservative for the current per-peer send
761        // amplification and can be raised once that is reduced upstream.
762        let max_chunk_bytes = to_retry.iter().map(|c| c.content.len()).max().unwrap_or(0);
763        // `checked_div` yields `None` only when `max_chunk_bytes == 0` (an
764        // empty/zero-length wave), in which case there is no byte limit.
765        let byte_bound = STORE_INFLIGHT_BYTE_BUDGET
766            .checked_div(max_chunk_bytes)
767            .map_or(usize::MAX, |n| n.max(1));
768
769        let mut chunk_attempts_total: usize = 0;
770        let mut store_durations_ms: Vec<u64> = Vec::new();
771        let mut retries_per_chunk: Vec<u32> = Vec::new();
772
773        for attempt in 0..=MAX_RETRIES {
774            if attempt > 0 {
775                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
776                tokio::time::sleep(delay).await;
777                info!(
778                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
779                    to_retry.len()
780                );
781            }
782
783            // Each chunk in this round counts as one store-RPC attempt.
784            chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
785
786            let store_limiter = self.controller().store.clone();
787            // Rolling scheduler: keep up to `cap()` stores in flight and re-read
788            // the cap as each slot frees, so mid-flight limiter growth reaches
789            // the rest of this wave instead of being frozen at a per-wave
790            // snapshot (V2-554). The in-flight BYTE budget (`byte_bound`) stays
791            // enforced so a wave of large chunks can't OOM a small host;
792            // iterator exhaustion bounds launches to the wave, so no explicit
793            // clamp to `to_retry.len()` is needed.
794            let make_store = |chunk: PaidChunk| {
795                let chunk_clone = chunk.clone();
796                let limiter = store_limiter.clone();
797                async move {
798                    let result = observe_op(
799                        &limiter,
800                        || async move {
801                            self.chunk_put_to_close_group(
802                                chunk.content,
803                                chunk.proof_bytes,
804                                &chunk.quoted_peers,
805                            )
806                            .await
807                        },
808                        classify_error,
809                    )
810                    .await;
811                    (chunk_clone, result)
812                }
813            };
814            let mut chunk_iter = to_retry.into_iter();
815            let mut in_flight = FuturesUnordered::new();
816
817            let mut failed_this_round = Vec::new();
818            loop {
819                let slots = store_limiter.current().min(byte_bound).max(1);
820                while in_flight.len() < slots {
821                    match chunk_iter.next() {
822                        Some(chunk) => in_flight.push(make_store(chunk)),
823                        None => break,
824                    }
825                }
826                let Some((chunk, result)) = in_flight.next().await else {
827                    break;
828                };
829                match result {
830                    Ok(name) => {
831                        let duration_ms = first_seen
832                            .get(&chunk.address)
833                            .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
834                            .unwrap_or(0);
835                        store_durations_ms.push(duration_ms);
836                        retries_per_chunk.push(attempt);
837                        stored.push(name);
838                        let stored_num = stored_before + stored.len();
839                        if total_chunks > 0 {
840                            info!("Stored {stored_num}/{total_chunks}");
841                        }
842                        if let Some(tx) = progress {
843                            let _ = tx.try_send(UploadEvent::ChunkStored {
844                                stored: stored_num,
845                                total: total_chunks,
846                            });
847                        }
848                    }
849                    Err(e) => failed_this_round.push((chunk, e.to_string())),
850                }
851            }
852
853            if failed_this_round.is_empty() {
854                let result = WaveResult {
855                    stored,
856                    failed: Vec::new(),
857                    chunk_attempts_total,
858                    store_durations_ms,
859                    retries_per_chunk,
860                };
861                log_wave_summary(&result);
862                return result;
863            }
864
865            if attempt == MAX_RETRIES {
866                let failed = failed_this_round
867                    .into_iter()
868                    .map(|(c, e)| (c.address, e))
869                    .collect();
870                let result = WaveResult {
871                    stored,
872                    failed,
873                    chunk_attempts_total,
874                    store_durations_ms,
875                    retries_per_chunk,
876                };
877                log_wave_summary(&result);
878                return result;
879            }
880
881            warn!(
882                "{} chunks failed on attempt {}, will retry",
883                failed_this_round.len(),
884                attempt + 1
885            );
886            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
887        }
888
889        // Unreachable due to loop structure, but satisfy the compiler.
890        let result = WaveResult {
891            stored,
892            failed: Vec::new(),
893            chunk_attempts_total,
894            store_durations_ms,
895            retries_per_chunk,
896        };
897        log_wave_summary(&result);
898        result
899    }
900}
901
902/// Emit one structured info line summarising a wave's store-side stats.
903///
904/// Surfaces p50/p95/max chunk wall-clock and per-round retry counts so
905/// log-based analysis tooling (Elasticsearch / Kibana) can identify
906/// client-side quorum or retry cost without needing the `--json` output.
907fn log_wave_summary(result: &WaveResult) {
908    let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
909    let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
910    let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
911    let chunk_attempts_total = result.chunk_attempts_total;
912    info!(
913        chunks_stored = result.stored.len(),
914        chunks_failed = result.failed.len(),
915        chunk_attempts_total,
916        retries_round_1,
917        retries_round_2,
918        retries_round_3,
919        store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
920        store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
921        store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
922        "chunk_store_wave_complete"
923    );
924}
925
/// Head-room, in seconds, kept between a cached quote's age and the storer's
/// `QUOTE_MAX_AGE_SECS` (24 h) limit before the client stops trusting the
/// cached proof.
///
/// A proof whose oldest `quote.timestamp` is within this margin of the hard
/// limit is handled as if it had already expired. The margin absorbs three
/// things at once: clock skew between client and storer, the delay between
/// this local check and the storer's `validate_quote_timestamps` call, and
/// the time needed to upload the chunk body. Five minutes covers all three
/// generously, and both failure modes are cheap: keeping a proof too long
/// costs one extra retry round trip, dropping one too early costs one re-pay
/// of a single chunk.
const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 5 * 60;
938
/// Maximum quote age, in seconds, that a storer accepts (24 h).
///
/// Client-side copy of `QUOTE_MAX_AGE_SECS` from
/// `ant-node/src/payment/verifier.rs`. Should the node-side value drift, the
/// client either holds proofs a little past the storer's limit (the next
/// retry is forced to re-pay, no money lost) or discards them a little early
/// (one extra re-pay, no money lost). In neither case is a payment
/// double-spent or stranded.
const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 3_600;
946
/// Largest amount, in seconds, by which a cached quote's `timestamp` may lie
/// ahead of the local clock before the proof is treated as too skewed to
/// trust and pruned.
///
/// Matches `QUOTE_FUTURE_SKEW_TOLERANCE_SECS = 300` in
/// `ant-node/src/payment/verifier.rs`. A client whose clock runs slow
/// relative to the storer that issued the quote sees a perfectly valid proof
/// as future-dated; rejecting every forward drift would re-pay those chunks
/// on each retry. Using the storer's own 5-minute window keeps client and
/// node in agreement about which proofs are fresh.
const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 5 * 60;
958
959/// Drop cached `proof_bytes` whose quote timestamps are too close to
960/// the storer's expiry window to safely reuse.
961///
962/// Why this exists
963/// ---------------
964/// The cache stores `(chunk_address, proof_bytes)` so a retried upload
965/// can skip re-paying. The proof bytes embed `quote.timestamp`s. Each
966/// storer evaluates each `quote.timestamp` independently against its
967/// 24 h `QUOTE_MAX_AGE_SECS` budget, so close to the 24 h boundary
968/// (or on a multi-day-old cache that survived past the receipt's outer
969/// expiry for some reason) the storer rejects what the client still
970/// believes is fresh.
971///
972/// The previous design trusted a substring match on the storer's
973/// returned error text to detect this and invalidate the cache after
974/// the fact. That allowed a Byzantine storer to spoof the marker and
975/// force the client to re-pay fresh proofs (double-payment). This
976/// implementation is decision-pure: we decode the proof locally and
977/// only re-use it if every embedded quote is comfortably within the
978/// budget. No remote text involved.
979///
980/// Side-effect: dropped entries are removed from the on-disk cache so
981/// they don't reappear on the next load.
982fn prune_locally_expired_proofs(
983    resume_key: &str,
984    proofs: HashMap<[u8; 32], Vec<u8>>,
985) -> HashMap<XorName, Vec<u8>> {
986    let now = std::time::SystemTime::now();
987    let max_safe_age = Duration::from_secs(
988        CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
989    );
990    let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS);
991    let mut kept: HashMap<XorName, Vec<u8>> = HashMap::with_capacity(proofs.len());
992    // Pair each expired address with the EXACT bytes we observed at
993    // load time. The cache-side drop only removes the entry if those
994    // bytes still match, so a concurrent re-pay that refreshed the
995    // proof under its own lock is not clobbered (CAS semantics, fixes
996    // the TOCTOU between unlocked-load and locked-drop).
997    let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
998    for (addr, bytes) in proofs {
999        match deserialize_proof(&bytes) {
1000            Ok((proof, _tx_hashes)) => {
1001                if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) {
1002                    kept.insert(addr, bytes);
1003                } else {
1004                    expired.push((addr, bytes));
1005                }
1006            }
1007            Err(_) => {
1008                // Unreadable cached entry: drop it so it doesn't sit
1009                // here forever. The chunk will re-quote+re-pay.
1010                expired.push((addr, bytes));
1011            }
1012        }
1013    }
1014    if !expired.is_empty() {
1015        info!(
1016            "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \
1017             before resume",
1018            expired.len()
1019        );
1020        crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired);
1021    }
1022    kept
1023}
1024
1025/// True iff every quote in the proof has a timestamp not older than
1026/// `now - max_safe_age` AND not further in the future than
1027/// `max_future_skew`. The forward-skew check mirrors the storer's
1028/// `QUOTE_FUTURE_SKEW_TOLERANCE_SECS` (300s) so a slow-running client
1029/// clock doesn't cause us to wrongly prune perfectly fresh proofs
1030/// that the storer would still accept.
1031fn proof_is_safely_fresh(
1032    proof: &ProofOfPayment,
1033    now: std::time::SystemTime,
1034    max_safe_age: Duration,
1035    max_future_skew: Duration,
1036) -> bool {
1037    for (_peer, quote) in &proof.peer_quotes {
1038        match now.duration_since(quote.timestamp) {
1039            Ok(age) => {
1040                if age > max_safe_age {
1041                    return false;
1042                }
1043            }
1044            Err(future) => {
1045                if future.duration() > max_future_skew {
1046                    return false;
1047                }
1048            }
1049        }
1050    }
1051    true
1052}
1053
/// Compile-time checks that the futures returned by batch methods are `Send`.
///
/// Nothing in this module is ever executed; it only has to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; fails to compile otherwise.
    fn _assert_send(_value: &impl Send) {}

    /// Type-checks only if the `batch_upload_chunks` future is `Send`.
    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        _assert_send(&client.batch_upload_chunks(Vec::new()));
    }
}
1067
/// Unit tests for payment-intent construction, proof finalization and the
/// local stale-proof freshness check.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;
    use std::time::SystemTime;

    /// Median index in the quotes array.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    /// Helper: build a `PreparedChunk` with `median_amount` at the median
    /// quote index and zero for all other quotes. Adapts automatically to
    /// `CLOSE_GROUP_SIZE` changes.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(amount),
                price: Amount::from(amount),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    #[test]
    fn payment_intent_from_single_chunk() {
        let chunk = make_prepared_chunk(300);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
        assert_eq!(intent.total_amount, Amount::from(300));

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
        assert_eq!(*amt, Amount::from(300));
    }

    #[test]
    fn payment_intent_from_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(250);
        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);

        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        let chunk = make_prepared_chunk(0);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);
        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;

        let mut tx_map = HashMap::new();
        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert!(!paid[0].proof_bytes.is_empty());
        assert_eq!(paid[0].address, [0u8; 32]);
    }

    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        // Missing tx hash for a non-zero-amount quote should error,
        // since the chunk would be rejected by the network without a valid proof.
        let chunk = make_prepared_chunk(500);

        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let mut tx_map = HashMap::new();
        // Both chunks have the same quote_hash (same index/byte pattern)
        // so one tx_hash covers both
        tx_map.insert(q1, TxHash::from([0xCC; 32]));

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }

    // ---- prune_locally_expired_proofs ----
    //
    // Build synthetic ProofOfPayment instances with controlled
    // timestamps to verify the local pre-flight stale-proof check.
    // This is the "no remote text trust" replacement for the prior
    // substring-matching invalidation path. A bug here is a direct
    // wallet leak (drop-too-eager = re-pay; keep-too-long = doomed
    // PUT round trip but no payment loss).
    //
    // Every test samples the wall clock at most once and derives all
    // timestamps from that single `now`, so the age under test is exact
    // and cannot be disturbed by a clock adjustment between two reads.

    /// Build a proof holding one quote per entry of `timestamps`.
    fn make_proof_with_timestamps(timestamps: &[SystemTime]) -> ProofOfPayment {
        let peer_quotes = timestamps
            .iter()
            .enumerate()
            .map(|(i, ts)| {
                let quote = PaymentQuote {
                    content: xor_name::XorName([0u8; 32]),
                    timestamp: *ts,
                    price: Amount::from(1u64),
                    rewards_address: RewardsAddress::new([1u8; 20]),
                    pub_key: vec![],
                    signature: vec![],
                };
                (EncodedPeerId::from([i as u8; 32]), quote)
            })
            .collect();
        ProofOfPayment { peer_quotes }
    }

    /// Forward-skew tolerance used by the production pruning path.
    fn default_max_future_skew() -> Duration {
        Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS)
    }

    /// Age limit used by the production pruning path: the storer's 24 h
    /// budget minus the local safety margin.
    fn safe_reuse_window() -> Duration {
        Duration::from_secs(
            CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
        )
    }

    #[test]
    fn proof_is_safely_fresh_accepts_recent_quote() {
        let now = SystemTime::now();
        let proof = make_proof_with_timestamps(&[now]);
        assert!(proof_is_safely_fresh(
            &proof,
            now,
            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_rejects_quote_past_safe_window() {
        // 23h57m old: past the 24h - 5min safe-reuse threshold but
        // still within the storer's hard 24h limit. The whole point
        // of the safety margin is to drop these locally before
        // burning a doomed PUT round trip.
        let now = SystemTime::now();
        let too_old = now - Duration::from_secs(23 * 60 * 60 + 57 * 60);
        let proof = make_proof_with_timestamps(&[too_old]);
        assert!(
            !proof_is_safely_fresh(
                &proof,
                now,
                safe_reuse_window(),
                default_max_future_skew(),
            ),
            "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)"
        );
    }

    #[test]
    fn proof_is_safely_fresh_age_limit_is_inclusive() {
        // A quote exactly at the safe-reuse limit is still reusable;
        // one second older is not. Pins the `<=` boundary.
        let now = SystemTime::now();
        let window = safe_reuse_window();

        let at_limit = make_proof_with_timestamps(&[now - window]);
        assert!(
            proof_is_safely_fresh(&at_limit, now, window, default_max_future_skew()),
            "quote aged exactly max_safe_age must be accepted"
        );

        let past_limit = make_proof_with_timestamps(&[now - window - Duration::from_secs(1)]);
        assert!(
            !proof_is_safely_fresh(&past_limit, now, window, default_max_future_skew()),
            "quote aged max_safe_age + 1s must be rejected"
        );
    }

    #[test]
    fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() {
        // The storer rejects on a per-quote basis: a proof with even
        // one stale quote will fail on every retry. We must drop it.
        let now = SystemTime::now();
        let fresh = now;
        let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]);
        assert!(!proof_is_safely_fresh(
            &proof,
            now,
            safe_reuse_window(),
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() {
        // Client clock 60s slow. Quote claims 60s in the future of
        // our local view. Node tolerates 300s forward skew, so the
        // storer would accept this quote — we must too, or we'd
        // wrongly prune fresh proofs and force re-payment.
        let now = SystemTime::now();
        let slight_future = now + Duration::from_secs(60);
        let proof = make_proof_with_timestamps(&[slight_future]);
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        assert!(
            proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()),
            "60s-future quote must be accepted (within node's 300s skew tolerance)"
        );
    }

    #[test]
    fn proof_is_safely_fresh_rejects_far_future_dated_quote() {
        // 1 hour in the future of our local clock. Exceeds the
        // node's 300s forward-skew tolerance and the storer would
        // reject it — we drop it locally to avoid a round trip.
        let now = SystemTime::now();
        let far_future = now + Duration::from_secs(3600);
        let proof = make_proof_with_timestamps(&[far_future]);
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        assert!(!proof_is_safely_fresh(
            &proof,
            now,
            max_safe,
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_future_skew_limit_is_inclusive() {
        // A quote exactly at the forward-skew tolerance is accepted;
        // one second further ahead is not. Pins the `<=` boundary.
        let now = SystemTime::now();
        let skew = default_max_future_skew();
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);

        let at_limit = make_proof_with_timestamps(&[now + skew]);
        assert!(
            proof_is_safely_fresh(&at_limit, now, max_safe, skew),
            "quote dated exactly max_future_skew ahead must be accepted"
        );

        let past_limit = make_proof_with_timestamps(&[now + skew + Duration::from_secs(1)]);
        assert!(
            !proof_is_safely_fresh(&past_limit, now, max_safe, skew),
            "quote dated max_future_skew + 1s ahead must be rejected"
        );
    }

    #[test]
    fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() {
        // No quotes = no storer-side timestamp check to fail. The
        // proof is structurally invalid for other reasons, but
        // this function's contract is "no stale timestamp present",
        // which is trivially true for an empty list.
        let proof = make_proof_with_timestamps(&[]);
        assert!(proof_is_safely_fresh(
            &proof,
            SystemTime::now(),
            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
            default_max_future_skew(),
        ));
    }
}