Skip to main content

ant_core/data/client/
batch.rs

1//! Batch chunk upload with wave-based pipelined EVM payments.
2//!
3//! Groups chunks into waves of 64 and pays for each
4//! wave in a single EVM transaction. Stores from wave N are pipelined
5//! with quote collection for wave N+1 via `tokio::join!`.
6
7use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, PartialUploadSpend, Result};
13use ant_protocol::evm::{
14    Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15    RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{
18    deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment,
19};
20use ant_protocol::transport::{MultiAddr, PeerId};
21use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
22use bytes::Bytes;
23use futures::stream::{self, StreamExt};
24use std::collections::{HashMap, HashSet};
25use std::time::{Duration, Instant};
26use tokio::sync::mpsc;
27use tracing::{debug, info, warn};
28
/// Number of chunks per payment wave.
///
/// `Client::batch_upload_chunks_with_events` splits the deduplicated chunk
/// list into slices of at most this many chunks; each slice is quoted, paid
/// and stored as one wave.
const PAYMENT_WAVE_SIZE: usize = 64;
31
/// Soft upper bound (64 MiB) on the combined body size of the chunks that a
/// single wave stores concurrently.
///
/// Limits store concurrency for large chunks so the send path's per-peer body
/// buffers can't pin multiple GB at once (see V2-461). With ~4 MB chunks this
/// allows roughly 16 concurrent stores; small chunks run into the chunk-count
/// / adaptive limits first and are not affected by this budget.
const STORE_INFLIGHT_BYTE_BUDGET: usize = {
    const MIB: usize = 1024 * 1024;
    64 * MIB
};
38
/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
///
/// Consumed by [`Client::batch_pay`] (wallet flow) or
/// [`finalize_batch_payment`] (external signer), both of which turn it into a
/// [`PaidChunk`]. [`PaymentIntent::from_prepared_chunks`] reads the `payment`
/// field to list what has to be paid.
#[derive(Debug)]
pub struct PreparedChunk {
    /// The chunk content bytes.
    pub content: Bytes,
    /// Content address (BLAKE3 hash).
    pub address: XorName,
    /// Closest peers from quote collection — PUT targets for close-group replication.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Payment structure (quotes sorted, median selected, not yet paid on-chain).
    pub payment: SingleNodePayment,
    /// Peer quotes for building `ProofOfPayment`; moved into the proof
    /// unchanged when the chunk is converted to a [`PaidChunk`].
    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
}
53
/// Chunk paid but not yet stored.
///
/// Produced by [`Client::batch_pay`] and [`finalize_batch_payment`]; the
/// resume path in `Client::batch_upload_chunks_with_events` also builds one
/// directly from a cached proof plus freshly quoted peers.
#[derive(Debug, Clone)]
pub struct PaidChunk {
    /// The chunk content bytes.
    pub content: Bytes,
    /// Content address (BLAKE3 hash).
    pub address: XorName,
    /// Closest peers from quote collection — PUT targets for close-group replication.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Serialized [`PaymentProof`] bytes.
    pub proof_bytes: Vec<u8>,
}
66
/// Result of storing a wave of paid chunks, with retry tracking.
///
/// Returned by `Client::store_paid_chunks_with_events`. Successes and
/// failures are reported side by side so callers can track partial progress.
#[derive(Debug)]
pub struct WaveResult {
    /// Successfully stored chunk addresses.
    pub stored: Vec<XorName>,
    /// Chunks that failed to store after all retries, as `(address, error text)`.
    pub failed: Vec<(XorName, String)>,
    /// Sum of store-RPC attempts across all chunks in this wave (>= stored.len() + failed.len()).
    pub chunk_attempts_total: usize,
    /// Per-chunk wall-clock (ms) from first attempt to successful store. Only populated for stored chunks.
    pub store_durations_ms: Vec<u64>,
    /// Retry round on which each stored chunk succeeded (0 = first attempt).
    /// This is a list of per-chunk values, not a histogram:
    /// [`WaveAggregateStats::absorb`] reads each entry as a round number and
    /// buckets it into its own histogram.
    /// NOTE(review): presumably one entry per stored chunk, parallel to
    /// `store_durations_ms` — confirm against the code that fills it.
    pub retries_per_chunk: Vec<u32>,
}
81
82/// Aggregated retry / wall-clock stats across one or more [`WaveResult`]s.
83///
84/// Used by [`Client::batch_upload_chunks_with_events`] (which may store
85/// multiple waves per call) and surfaced upward into `FileUploadResult` so
86/// downstream tooling can record per-upload retry pressure and per-chunk
87/// store wall-clock without needing log parsing.
88#[derive(Debug, Default, Clone)]
89pub struct WaveAggregateStats {
90    /// Sum of store-RPC attempts across all waves (>= chunks_stored).
91    pub chunk_attempts_total: usize,
92    /// Per-chunk wall-clock (ms) from first attempt to successful store,
93    /// concatenated across waves.
94    pub store_durations_ms: Vec<u64>,
95    /// Count of stored chunks that succeeded on each retry round
96    /// (index 0 = first attempt, 1 = first retry, etc.). Indices match
97    /// the retry rounds emitted by `Client::store_paid_chunks_with_events`
98    /// which caps at `MAX_RETRIES = 3`, so an array of 4 suffices.
99    pub retries_histogram: [usize; 4],
100}
101
102impl WaveAggregateStats {
103    /// Fold one [`WaveResult`]'s stats into the running aggregate.
104    pub fn absorb(&mut self, wave: &WaveResult) {
105        self.chunk_attempts_total = self
106            .chunk_attempts_total
107            .saturating_add(wave.chunk_attempts_total);
108        self.store_durations_ms.extend(&wave.store_durations_ms);
109        for &r in &wave.retries_per_chunk {
110            let idx = (r as usize).min(self.retries_histogram.len() - 1);
111            self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
112        }
113    }
114}
115
/// Nearest-rank percentile of an unsorted slice of `u64` samples.
///
/// `p` is a fraction in `[0.0, 1.0]`; values outside that range are clamped.
/// An empty slice yields 0. The result is always one of the input values —
/// no interpolation is done, which is accurate enough for the coarse
/// log/metric summaries this is meant for.
fn percentile(values: &[u64], p: f64) -> u64 {
    let n = values.len();
    if n == 0 {
        return 0;
    }
    let last = n - 1;
    let fraction = p.clamp(0.0, 1.0);
    // Nearest-rank index: ceil(p * n) - 1, kept inside [0, n - 1].
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let rank = last.min(((fraction * n as f64).ceil() as usize).saturating_sub(1));
    // Only the element that would sit at `rank` in sorted order is needed,
    // so partition a scratch copy around it instead of sorting it fully.
    let mut scratch = values.to_vec();
    let (_, nth, _) = scratch.select_nth_unstable(rank);
    *nth
}
140
/// Payment data for external signing.
///
/// Contains the information needed to construct and submit the on-chain
/// payment transaction without requiring a local wallet or private key.
/// Built with [`PaymentIntent::from_prepared_chunks`]; derives serde's
/// `Serialize`/`Deserialize`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PaymentIntent {
    /// Individual payment entries: (quote_hash, rewards_address, amount).
    /// `from_prepared_chunks` only emits entries whose amount is non-zero.
    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
    /// Total amount across all payments.
    pub total_amount: Amount,
}
152
153impl PaymentIntent {
154    /// Build from a set of prepared chunks.
155    ///
156    /// Collects all non-zero payment entries and computes the total.
157    pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
158        let mut payments = Vec::new();
159        let mut total = Amount::ZERO;
160        for chunk in prepared {
161            for info in &chunk.payment.quotes {
162                if !info.amount.is_zero() {
163                    payments.push((info.quote_hash, info.rewards_address, info.amount));
164                    total += info.amount;
165                }
166            }
167        }
168        Self {
169            payments,
170            total_amount: total,
171        }
172    }
173}
174
175/// Build [`PaidChunk`]s from prepared chunks and externally-provided transaction hashes.
176///
177/// Shared by [`Client::batch_pay`] (wallet flow) and [`finalize_batch_payment`] (external signer).
178///
179/// Returns an error if any non-zero-amount quote hash is missing from `tx_hash_map`,
180/// since chunks uploaded without valid proofs would be rejected by the network.
181fn build_paid_chunks(
182    prepared: Vec<PreparedChunk>,
183    tx_hash_map: &HashMap<QuoteHash, TxHash>,
184) -> Result<Vec<PaidChunk>> {
185    let mut paid_chunks = Vec::with_capacity(prepared.len());
186    for chunk in prepared {
187        let mut tx_hashes = Vec::new();
188        for info in &chunk.payment.quotes {
189            if !info.amount.is_zero() {
190                let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
191                    Error::Payment(format!(
192                        "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
193                        hex::encode(info.quote_hash)
194                    ))
195                })?;
196                tx_hashes.push(tx_hash);
197            }
198        }
199
200        let proof = PaymentProof {
201            proof_of_payment: ProofOfPayment {
202                peer_quotes: chunk.peer_quotes,
203            },
204            tx_hashes,
205        };
206
207        let proof_bytes = serialize_single_node_proof(&proof)
208            .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
209
210        paid_chunks.push(PaidChunk {
211            content: chunk.content,
212            address: chunk.address,
213            quoted_peers: chunk.quoted_peers,
214            proof_bytes,
215        });
216    }
217    Ok(paid_chunks)
218}
219
/// Finalize a batch payment using externally-provided transaction hashes.
///
/// Takes prepared chunks and a map of `quote_hash -> tx_hash` from the
/// external signer. Builds per-chunk `PaymentProof` bytes without needing a wallet.
///
/// This is the public entry point for the external-signer flow; it delegates
/// directly to `build_paid_chunks`.
///
/// # Errors
///
/// Propagates the errors of `build_paid_chunks`: a payment error when a
/// non-zero-amount quote has no entry in `tx_hash_map`, or a serialization
/// error when a proof cannot be encoded.
pub fn finalize_batch_payment(
    prepared: Vec<PreparedChunk>,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<Vec<PaidChunk>> {
    build_paid_chunks(prepared, tx_hash_map)
}
230
231impl Client {
232    /// Prepare a single chunk for batch payment.
233    ///
234    /// Collects quotes and uses node-reported prices without making any
235    /// on-chain transaction. Returns `Ok(None)` if the chunk is already
236    /// stored on the network.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if quote collection or payment construction fails.
241    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
242        let address = compute_address(&content);
243        let data_size = u64::try_from(content.len())
244            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
245
246        let quote_plan = match self
247            .get_store_quote_plan(&address, data_size, DATA_TYPE_CHUNK)
248            .await
249        {
250            Ok(plan) => plan,
251            Err(Error::AlreadyStored) => {
252                debug!("Chunk {} already stored, skipping", hex::encode(address));
253                return Ok(None);
254            }
255            Err(e) => return Err(e),
256        };
257        let quotes_with_peers = quote_plan.quotes;
258
259        // Capture all quoted peers for close-group replication.
260        let quoted_peers = quote_plan.put_peers;
261
262        // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment.
263        // Use node-reported prices directly — no contract price fetch needed.
264        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
265        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
266
267        for (peer_id, _addrs, quote, price) in quotes_with_peers {
268            let encoded = peer_id_to_encoded(&peer_id)?;
269            peer_quotes.push((encoded, quote.clone()));
270            quotes_for_payment.push((quote, price));
271        }
272
273        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
274            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
275
276        Ok(Some(PreparedChunk {
277            content,
278            address,
279            quoted_peers,
280            payment,
281            peer_quotes,
282        }))
283    }
284
285    /// Pay for multiple chunks in a single EVM transaction.
286    ///
287    /// Flattens all quote payments from the prepared chunks into one
288    /// `wallet.pay_for_quotes()` call, then maps transaction hashes
289    /// back to per-chunk [`PaymentProof`] bytes.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if the wallet is not configured or the on-chain
294    /// payment fails.
295    /// Returns `(paid_chunks, storage_cost_atto, gas_cost_wei)`.
296    pub async fn batch_pay(
297        &self,
298        prepared: Vec<PreparedChunk>,
299    ) -> Result<(Vec<PaidChunk>, String, u128)> {
300        if prepared.is_empty() {
301            return Ok((Vec::new(), "0".to_string(), 0));
302        }
303
304        let wallet = self.require_wallet()?;
305
306        // Compute total storage cost from the prepared chunks before paying.
307        let intent = PaymentIntent::from_prepared_chunks(&prepared);
308        let storage_cost_atto = intent.total_amount.to_string();
309
310        // Flatten all quote payments from all chunks into a single batch.
311        let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
312        let mut all_payments = Vec::with_capacity(total_quotes);
313        for chunk in &prepared {
314            for info in &chunk.payment.quotes {
315                all_payments.push((info.quote_hash, info.rewards_address, info.amount));
316            }
317        }
318
319        debug!(
320            "Batch payment for {} chunks ({} quote entries)",
321            prepared.len(),
322            all_payments.len()
323        );
324
325        let (tx_hash_map, gas_info) =
326            wallet
327                .pay_for_quotes(all_payments)
328                .await
329                .map_err(|PayForQuotesError(err, _)| {
330                    Error::Payment(format!("Batch payment failed: {err}"))
331                })?;
332
333        info!(
334            "Batch payment succeeded: {} transactions",
335            tx_hash_map.len()
336        );
337
338        let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
339        let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
340        Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
341    }
342
343    /// Upload chunks in waves with pipelined EVM payments.
344    ///
345    /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave:
346    /// 1. **Prepare**: collect quotes for all chunks concurrently
347    /// 2. **Pay**: single EVM transaction for the whole wave
348    /// 3. **Store**: concurrent chunk replication to close group
349    ///
350    /// Stores from wave N overlap with quote collection for wave N+1
351    /// via `tokio::join!`.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if any payment or store operation fails.
356    /// Returns `(addresses, total_storage_cost_atto, total_gas_cost_wei)`.
357    pub async fn batch_upload_chunks(
358        &self,
359        chunks: Vec<Bytes>,
360    ) -> Result<(Vec<XorName>, String, u128)> {
361        let (addresses, storage, gas, _stats) = self
362            .batch_upload_chunks_with_events(chunks, None, 0, 0, None)
363            .await?;
364        Ok((addresses, storage, gas))
365    }
366
    /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
    /// events as each chunk is stored, enabling per-chunk progress bars.
    ///
    /// `stored_offset` is the number of chunks already stored in previous waves
    /// (so events report cumulative progress). `file_total` is the total chunk
    /// count across ALL waves (for the `total` field in events).
    ///
    /// When `resume_key` is `Some`, per-wave payment proofs are persisted
    /// to `<data_dir>/payments/single/<ts>_<hash(resume_key)>` via
    /// `crate::data::client::cached_single` so that a partial-upload
    /// failure can be resumed on the next attempt without paying twice.
    /// The caller is responsible for deleting the cache entry on full
    /// success (typically `upload_with_options` in `file.rs`).
    ///
    /// Returns `(addresses, storage_cost_atto, gas_cost_wei, stats)`. The two
    /// costs cover only what was freshly paid during THIS call; spend recorded
    /// in the resume cache by earlier attempts is not included. `addresses`
    /// holds duplicates of already-seen chunks, chunks the network reported as
    /// already stored, and chunks stored by this call. An empty `chunks`
    /// returns empty/zero values immediately.
    ///
    /// # Errors
    ///
    /// Returns `Error::PartialUpload` (carrying the addresses stored so far
    /// and this call's spend) when a wave still has failed chunks after its
    /// store retries. Errors from quote collection (`prepare_wave`) and from
    /// payment (`batch_pay`) are propagated unchanged.
    pub async fn batch_upload_chunks_with_events(
        &self,
        chunks: Vec<Bytes>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_offset: usize,
        file_total: usize,
        resume_key: Option<&str>,
    ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
        if chunks.is_empty() {
            return Ok((
                Vec::new(),
                "0".to_string(),
                0,
                WaveAggregateStats::default(),
            ));
        }

        let total_chunks = chunks.len();
        let quote_cap = self.controller().quote.current();
        let store_cap = self.controller().store.current();
        debug!(
            "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
             (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
        );

        // Load any previously-cached single-node receipt for this upload.
        // Each chunk whose address is in the cache skips the pay phase and
        // has its `PaidChunk` built from the cached proof + freshly quoted
        // peers. The loaded map is only read below; proofs paid during this
        // call are appended to the cache separately (`try_append_wave`), and
        // the caller deletes the cache on full success.
        //
        // Before trusting any cached proof, decode it locally and drop
        // any whose quote.timestamp is past the storer's per-quote age
        // budget (`QUOTE_MAX_AGE_SECS`, mirrored here as
        // `CACHED_PROOF_EXPIRY_SECS`). The previous design trusted a
        // substring match on remote error text, which a Byzantine
        // storer could spoof to force double-payment. Local pre-flight
        // is decision-pure: we never hand a doomed proof to a storer,
        // and the cache is updated under our own lock with no remote
        // text involved.
        //
        // Load only the cached PROOFS (for reuse), not the cached cost. The
        // cost this function returns is a per-call DELTA — what was freshly
        // paid in THIS call — not the cache's cumulative. The single-node
        // wave driver (`upload_spill_addresses_single`) calls this once per
        // wave and SUMS the per-call costs, so seeding the return with the
        // cumulative cache (which grows as each wave appends to it) would
        // double-count: A + (A+B) + (A+B+C) instead of A+B+C.
        let cached_proofs: HashMap<XorName, Vec<u8>> = match resume_key {
            Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
                Some((_, receipt)) => prune_locally_expired_proofs(key, receipt.proofs),
                None => HashMap::new(),
            },
            None => HashMap::new(),
        };

        let mut all_addresses = Vec::with_capacity(total_chunks);
        let mut seen_addresses: HashSet<XorName> = HashSet::new();

        // Accumulate only THIS call's freshly-paid cost (per-call delta; the
        // cached cumulative cost is deliberately left out, see above).
        let mut total_storage = Amount::ZERO;
        let mut total_gas: u128 = 0;
        let mut agg_stats = WaveAggregateStats::default();

        // Deduplicate chunks by content address. A repeated chunk is
        // recorded as done right away (address + `ChunkStored` event) and is
        // never quoted, paid or stored a second time.
        let mut unique_chunks = Vec::with_capacity(total_chunks);
        for chunk in chunks {
            let address = compute_address(&chunk);
            if seen_addresses.insert(address) {
                unique_chunks.push(chunk);
            } else {
                debug!("Skipping duplicate chunk {}", hex::encode(address));
                all_addresses.push(address);
                if let Some(tx) = progress {
                    // Best-effort: a full or closed channel is ignored.
                    let _ = tx.try_send(UploadEvent::ChunkStored {
                        stored: stored_offset + all_addresses.len(),
                        total: file_total,
                    });
                }
            }
        }

        // Split into waves.
        let waves: Vec<Vec<Bytes>> = unique_chunks
            .chunks(PAYMENT_WAVE_SIZE)
            .map(<[Bytes]>::to_vec)
            .collect();
        let wave_count = waves.len();

        debug!(
            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
            seen_addresses.len()
        );

        // Paid chunks of the previous wave, waiting to be stored while the
        // next wave is being quoted.
        let mut pending_store: Option<Vec<PaidChunk>> = None;
        // Unique chunks handed to `prepare_wave` so far (offset for
        // `ChunkQuoted` progress numbers).
        let mut total_quoted: usize = 0;

        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave_size = wave_chunks.len();

            // Pipeline: store previous wave while preparing this one.
            let (prepare_result, store_result) = match pending_store.take() {
                Some(paid_chunks) => {
                    let store_offset = stored_offset + all_addresses.len();
                    let quoted_offset = stored_offset + total_quoted;
                    let (prep, stored) = tokio::join!(
                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
                        self.store_paid_chunks_with_events(
                            paid_chunks,
                            progress,
                            store_offset,
                            file_total
                        )
                    );
                    (prep, Some(stored))
                }
                None => {
                    // First wave (or previous wave had nothing to store):
                    // nothing to overlap with, just prepare.
                    let quoted_offset = stored_offset + total_quoted;
                    let result = self
                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
                        .await;
                    (result, None)
                }
            };
            total_quoted += wave_size;

            // Track partial progress from previous wave. This is handled
            // before `prepare_result` is inspected, so a store failure is
            // reported as `PartialUpload` even if preparing also failed.
            if let Some(wave_result) = store_result {
                all_addresses.extend(&wave_result.stored);
                agg_stats.absorb(&wave_result);
                if !wave_result.failed.is_empty() {
                    let failed_count = wave_result.failed.len();
                    warn!("{failed_count} chunks failed to store after retries");
                    return Err(Error::PartialUpload {
                        stored: all_addresses.clone(),
                        stored_count: stored_offset + all_addresses.len(),
                        failed: wave_result.failed,
                        failed_count,
                        total_chunks: file_total,
                        spend: Box::new(PartialUploadSpend {
                            storage_cost_atto: total_storage.to_string(),
                            gas_cost_wei: total_gas,
                        }),
                        reason: "wave store failed after retries".into(),
                    });
                }
            }

            // NOTE(review): a prepare error returns here as a plain error,
            // not `PartialUpload`, so addresses stored by earlier waves and
            // this call's spend are not reported on this path.
            let (prepared_chunks, already_stored) = prepare_result?;
            all_addresses.extend(&already_stored);
            if let Some(tx) = progress {
                // NOTE(review): `all_addresses` was already extended with the
                // whole batch above, so every event in this loop carries the
                // same `stored` value rather than an incrementing one.
                for _ in &already_stored {
                    let _ = tx.try_send(UploadEvent::ChunkStored {
                        stored: stored_offset + all_addresses.len(),
                        total: file_total,
                    });
                }
            }

            if prepared_chunks.is_empty() {
                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
                continue;
            }

            // Split prepared chunks into "already paid in a previous
            // attempt" (cached) and "needs payment" (fresh). Cached
            // chunks build a `PaidChunk` from the cached proof + the
            // freshly-quoted peers, bypassing the EVM transaction.
            let mut needs_pay: Vec<PreparedChunk> = Vec::with_capacity(prepared_chunks.len());
            let mut cached_paid: Vec<PaidChunk> = Vec::new();
            for prep in prepared_chunks {
                if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() {
                    cached_paid.push(PaidChunk {
                        content: prep.content,
                        address: prep.address,
                        quoted_peers: prep.quoted_peers,
                        proof_bytes,
                    });
                } else {
                    needs_pay.push(prep);
                }
            }
            if !cached_paid.is_empty() {
                info!(
                    "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs",
                    cached_paid.len()
                );
            }

            // NOTE(review): a payment error also returns as a plain error
            // (see the note on `prepare_result?` above).
            let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() {
                (Vec::new(), "0".to_string(), 0u128)
            } else {
                info!(
                    "Wave {wave_num}/{wave_count}: paying for {} chunks",
                    needs_pay.len()
                );
                self.batch_pay(needs_pay).await?
            };
            // `batch_pay` reports the storage cost as a decimal string; a
            // string that fails to parse is skipped rather than treated as
            // an error.
            if let Ok(cost) = wave_storage.parse::<Amount>() {
                total_storage += cost;
            }
            total_gas = total_gas.saturating_add(wave_gas);

            // Persist the freshly-paid wave's proofs so a later
            // failure can resume without re-paying. Only proofs paid in
            // this wave are appended; reused cached proofs are merged into
            // `paid_chunks` after this step.
            if let Some(key) = resume_key {
                if !paid_chunks.is_empty() {
                    let new_proofs: HashMap<[u8; 32], Vec<u8>> = paid_chunks
                        .iter()
                        .map(|pc| (pc.address, pc.proof_bytes.clone()))
                        .collect();
                    crate::data::client::cached_single::try_append_wave(
                        key,
                        new_proofs,
                        &wave_storage,
                        wave_gas,
                    );
                }
            }

            // Fresh and cached proofs are stored together, overlapped with
            // the next wave's quoting (or by the final store below).
            paid_chunks.extend(cached_paid);
            pending_store = Some(paid_chunks);
        }

        // Store the last wave.
        if let Some(paid_chunks) = pending_store {
            let store_offset = stored_offset + all_addresses.len();
            let wave_result = self
                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
                .await;
            all_addresses.extend(&wave_result.stored);
            agg_stats.absorb(&wave_result);
            if !wave_result.failed.is_empty() {
                let failed_count = wave_result.failed.len();
                warn!("{failed_count} chunks failed to store after retries (final wave)");
                return Err(Error::PartialUpload {
                    stored: all_addresses.clone(),
                    stored_count: stored_offset + all_addresses.len(),
                    failed: wave_result.failed,
                    failed_count,
                    total_chunks: file_total,
                    spend: Box::new(PartialUploadSpend {
                        storage_cost_atto: total_storage.to_string(),
                        gas_cost_wei: total_gas,
                    }),
                    reason: "final wave store failed after retries".into(),
                });
            }
        }

        debug!("Batch upload complete: {} addresses", all_addresses.len());
        Ok((
            all_addresses,
            total_storage.to_string(),
            total_gas,
            agg_stats,
        ))
    }
640
641    /// Prepare a wave of chunks by collecting quotes concurrently.
642    ///
643    /// Fires [`UploadEvent::ChunkQuoted`] as each chunk's quote completes.
644    /// Returns `(prepared_chunks, already_stored_addresses)`.
645    async fn prepare_wave(
646        &self,
647        chunks: Vec<Bytes>,
648        progress: Option<&mpsc::Sender<UploadEvent>>,
649        quoted_offset: usize,
650        file_total: usize,
651    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
652        let chunk_count = chunks.len();
653        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
654            .into_iter()
655            .map(|c| {
656                let addr = compute_address(&c);
657                (c, addr)
658            })
659            .collect();
660
661        let quote_limiter = self.controller().quote.clone();
662        // Batch-aware fan-out: clamp to chunk_count so we never
663        // pay for fan-out slots we cannot fill on a partial wave.
664        // See PERF-RESULTS.md — measured ~30% slowdown when
665        // cap > batch size on quoting workloads (live mainnet).
666        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
667        let mut quote_stream = stream::iter(chunks_with_addr)
668            .map(|(content, address)| {
669                let limiter = quote_limiter.clone();
670                async move {
671                    let result = observe_op(
672                        &limiter,
673                        || async move { self.prepare_chunk_payment(content).await },
674                        classify_error,
675                    )
676                    .await;
677                    (address, result)
678                }
679            })
680            .buffer_unordered(quote_concurrency);
681
682        let mut prepared = Vec::with_capacity(chunk_count);
683        let mut already_stored = Vec::new();
684        let mut quoted_count = 0usize;
685
686        while let Some((address, result)) = quote_stream.next().await {
687            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
688            match result? {
689                Some(chunk) => prepared.push(chunk),
690                None => already_stored.push(address),
691            }
692            quoted_count += 1;
693            let progress_num = quoted_offset + quoted_count;
694            if file_total > 0 {
695                if chunk_already_stored {
696                    info!("Verified {progress_num}/{file_total} (already stored)");
697                } else {
698                    info!("Quoted {progress_num}/{file_total}");
699                }
700            }
701            if let Some(tx) = progress {
702                let _ = tx.try_send(UploadEvent::ChunkQuoted {
703                    quoted: progress_num,
704                    total: file_total,
705                });
706            }
707        }
708
709        Ok((prepared, already_stored))
710    }
711
712    /// Store a batch of paid chunks concurrently to their close groups.
713    ///
714    /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
715    /// Returns a [`WaveResult`] with both successes and failures so callers can
716    /// track partial progress instead of losing information about stored chunks.
717    ///
718    /// When `progress` is `Some`, sends [`UploadEvent::ChunkStored`] as each
719    /// chunk is successfully stored. `stored_before` is the count of chunks
720    /// already stored in previous waves so the event reports an accurate
721    /// cumulative total; `total_chunks` is the total across all waves. Pass
722    /// `None`/0/0 when progress reporting is not needed.
723    pub(crate) async fn store_paid_chunks_with_events(
724        &self,
725        paid_chunks: Vec<PaidChunk>,
726        progress: Option<&mpsc::Sender<UploadEvent>>,
727        stored_before: usize,
728        total_chunks: usize,
729    ) -> WaveResult {
730        const MAX_RETRIES: u32 = 3;
731        const BASE_DELAY_MS: u64 = 500;
732
733        let mut stored = Vec::new();
734        let mut to_retry = paid_chunks;
735
736        // Per-chunk first-seen timestamps, keyed by chunk address.
737        // Inserted on first sight; never overwritten so wall-clock spans
738        // first attempt → eventual success across all retry rounds.
739        let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
740        for chunk in &to_retry {
741            first_seen.entry(chunk.address).or_insert_with(Instant::now);
742        }
743
744        // Bound concurrency by IN-FLIGHT BYTES, not just chunk count. Each
745        // concurrently-stored chunk is held in memory while it is sent to its
746        // close group, and the send path re-serializes the body once per peer,
747        // so a wave of large (~4 MB) chunks at full store concurrency can pin
748        // multiple GB and OOM a small host. Cap how many chunks store at once
749        // so their combined body size stays under the budget; small chunks are
750        // unaffected (the byte bound exceeds the chunk-count bound). The budget
751        // is deliberately conservative for the current per-peer send
752        // amplification and can be raised once that is reduced upstream.
753        let max_chunk_bytes = to_retry.iter().map(|c| c.content.len()).max().unwrap_or(0);
754        // `checked_div` yields `None` only when `max_chunk_bytes == 0` (an
755        // empty/zero-length wave), in which case there is no byte limit.
756        let byte_bound = STORE_INFLIGHT_BYTE_BUDGET
757            .checked_div(max_chunk_bytes)
758            .map_or(usize::MAX, |n| n.max(1));
759
760        let mut chunk_attempts_total: usize = 0;
761        let mut store_durations_ms: Vec<u64> = Vec::new();
762        let mut retries_per_chunk: Vec<u32> = Vec::new();
763
764        for attempt in 0..=MAX_RETRIES {
765            if attempt > 0 {
766                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
767                tokio::time::sleep(delay).await;
768                info!(
769                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
770                    to_retry.len()
771                );
772            }
773
774            // Each chunk in this round counts as one store-RPC attempt.
775            chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
776
777            let store_limiter = self.controller().store.clone();
778            let store_concurrency = store_limiter
779                .current()
780                .min(to_retry.len().max(1))
781                .min(byte_bound);
782            let mut upload_stream = stream::iter(to_retry)
783                .map(|chunk| {
784                    let chunk_clone = chunk.clone();
785                    let limiter = store_limiter.clone();
786                    async move {
787                        let result = observe_op(
788                            &limiter,
789                            || async move {
790                                self.chunk_put_to_close_group(
791                                    chunk.content,
792                                    chunk.proof_bytes,
793                                    &chunk.quoted_peers,
794                                )
795                                .await
796                            },
797                            classify_error,
798                        )
799                        .await;
800                        (chunk_clone, result)
801                    }
802                })
803                .buffer_unordered(store_concurrency);
804
805            let mut failed_this_round = Vec::new();
806            while let Some((chunk, result)) = upload_stream.next().await {
807                match result {
808                    Ok(name) => {
809                        let duration_ms = first_seen
810                            .get(&chunk.address)
811                            .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
812                            .unwrap_or(0);
813                        store_durations_ms.push(duration_ms);
814                        retries_per_chunk.push(attempt);
815                        stored.push(name);
816                        let stored_num = stored_before + stored.len();
817                        if total_chunks > 0 {
818                            info!("Stored {stored_num}/{total_chunks}");
819                        }
820                        if let Some(tx) = progress {
821                            let _ = tx.try_send(UploadEvent::ChunkStored {
822                                stored: stored_num,
823                                total: total_chunks,
824                            });
825                        }
826                    }
827                    Err(e) => failed_this_round.push((chunk, e.to_string())),
828                }
829            }
830
831            if failed_this_round.is_empty() {
832                let result = WaveResult {
833                    stored,
834                    failed: Vec::new(),
835                    chunk_attempts_total,
836                    store_durations_ms,
837                    retries_per_chunk,
838                };
839                log_wave_summary(&result);
840                return result;
841            }
842
843            if attempt == MAX_RETRIES {
844                let failed = failed_this_round
845                    .into_iter()
846                    .map(|(c, e)| (c.address, e))
847                    .collect();
848                let result = WaveResult {
849                    stored,
850                    failed,
851                    chunk_attempts_total,
852                    store_durations_ms,
853                    retries_per_chunk,
854                };
855                log_wave_summary(&result);
856                return result;
857            }
858
859            warn!(
860                "{} chunks failed on attempt {}, will retry",
861                failed_this_round.len(),
862                attempt + 1
863            );
864            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
865        }
866
867        // Unreachable due to loop structure, but satisfy the compiler.
868        let result = WaveResult {
869            stored,
870            failed: Vec::new(),
871            chunk_attempts_total,
872            store_durations_ms,
873            retries_per_chunk,
874        };
875        log_wave_summary(&result);
876        result
877    }
878}
879
880/// Emit one structured info line summarising a wave's store-side stats.
881///
882/// Surfaces p50/p95/max chunk wall-clock and per-round retry counts so
883/// log-based analysis tooling (Elasticsearch / Kibana) can identify
884/// client-side quorum or retry cost without needing the `--json` output.
885fn log_wave_summary(result: &WaveResult) {
886    let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
887    let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
888    let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
889    let chunk_attempts_total = result.chunk_attempts_total;
890    info!(
891        chunks_stored = result.stored.len(),
892        chunks_failed = result.failed.len(),
893        chunk_attempts_total,
894        retries_round_1,
895        retries_round_2,
896        retries_round_3,
897        store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
898        store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
899        store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
900        "chunk_store_wave_complete"
901    );
902}
903
/// Head-room, in seconds, kept between a cached quote's age and the storer's
/// `QUOTE_MAX_AGE_SECS` (24 h) limit before a cached proof is trusted.
///
/// Any proof whose oldest `quote.timestamp` falls inside this margin of the
/// storer's hard limit is considered expired locally. The margin absorbs
/// (a) client/storer clock skew, (b) the delay between this local check and
/// the storer's `validate_quote_timestamps` call, and (c) the chunk-body
/// upload time. Five minutes comfortably covers all three, and both kinds
/// of misjudgement are cheap: keeping a proof too long costs one extra
/// retry round trip, dropping one too early costs a single chunk re-pay.
const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 5 * 60;

/// Maximum quote age, in seconds, that a storer accepts. Mirrors
/// `QUOTE_MAX_AGE_SECS` in `ant-node/src/payment/verifier.rs`.
///
/// Should the node-side value drift, the client either keeps proofs a
/// little past the storer's limit (forcing a re-pay on the next retry) or
/// drops them a little early (one extra re-pay). In neither case is a
/// payment double-spent or stranded.
const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60;

/// How many seconds into the future a cached quote's `timestamp` may lie
/// before it is judged too skewed to trust and pruned.
///
/// Mirrors `QUOTE_FUTURE_SKEW_TOLERANCE_SECS = 300` in
/// `ant-node/src/payment/verifier.rs`. When the client's clock runs slow
/// relative to the storer that issued the quote, a perfectly valid proof
/// looks future-dated from the client's side; rejecting every forward
/// drift would re-pay those chunks on each retry. Using the same 5-minute
/// window as the storer keeps client and node in agreement about which
/// proofs are fresh.
const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 5 * 60;
936
937/// Drop cached `proof_bytes` whose quote timestamps are too close to
938/// the storer's expiry window to safely reuse.
939///
940/// Why this exists
941/// ---------------
942/// The cache stores `(chunk_address, proof_bytes)` so a retried upload
943/// can skip re-paying. The proof bytes embed `quote.timestamp`s. Each
944/// storer evaluates each `quote.timestamp` independently against its
945/// 24 h `QUOTE_MAX_AGE_SECS` budget, so close to the 24 h boundary
946/// (or on a multi-day-old cache that survived past the receipt's outer
947/// expiry for some reason) the storer rejects what the client still
948/// believes is fresh.
949///
950/// The previous design trusted a substring match on the storer's
951/// returned error text to detect this and invalidate the cache after
952/// the fact. That allowed a Byzantine storer to spoof the marker and
953/// force the client to re-pay fresh proofs (double-payment). This
954/// implementation is decision-pure: we decode the proof locally and
955/// only re-use it if every embedded quote is comfortably within the
956/// budget. No remote text involved.
957///
958/// Side-effect: dropped entries are removed from the on-disk cache so
959/// they don't reappear on the next load.
960fn prune_locally_expired_proofs(
961    resume_key: &str,
962    proofs: HashMap<[u8; 32], Vec<u8>>,
963) -> HashMap<XorName, Vec<u8>> {
964    let now = std::time::SystemTime::now();
965    let max_safe_age = Duration::from_secs(
966        CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
967    );
968    let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS);
969    let mut kept: HashMap<XorName, Vec<u8>> = HashMap::with_capacity(proofs.len());
970    // Pair each expired address with the EXACT bytes we observed at
971    // load time. The cache-side drop only removes the entry if those
972    // bytes still match, so a concurrent re-pay that refreshed the
973    // proof under its own lock is not clobbered (CAS semantics, fixes
974    // the TOCTOU between unlocked-load and locked-drop).
975    let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
976    for (addr, bytes) in proofs {
977        match deserialize_proof(&bytes) {
978            Ok((proof, _tx_hashes)) => {
979                if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) {
980                    kept.insert(addr, bytes);
981                } else {
982                    expired.push((addr, bytes));
983                }
984            }
985            Err(_) => {
986                // Unreadable cached entry: drop it so it doesn't sit
987                // here forever. The chunk will re-quote+re-pay.
988                expired.push((addr, bytes));
989            }
990        }
991    }
992    if !expired.is_empty() {
993        info!(
994            "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \
995             before resume",
996            expired.len()
997        );
998        crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired);
999    }
1000    kept
1001}
1002
1003/// True iff every quote in the proof has a timestamp not older than
1004/// `now - max_safe_age` AND not further in the future than
1005/// `max_future_skew`. The forward-skew check mirrors the storer's
1006/// `QUOTE_FUTURE_SKEW_TOLERANCE_SECS` (300s) so a slow-running client
1007/// clock doesn't cause us to wrongly prune perfectly fresh proofs
1008/// that the storer would still accept.
1009fn proof_is_safely_fresh(
1010    proof: &ProofOfPayment,
1011    now: std::time::SystemTime,
1012    max_safe_age: Duration,
1013    max_future_skew: Duration,
1014) -> bool {
1015    for (_peer, quote) in &proof.peer_quotes {
1016        match now.duration_since(quote.timestamp) {
1017            Ok(age) => {
1018                if age > max_safe_age {
1019                    return false;
1020                }
1021            }
1022            Err(future) => {
1023                if future.duration() > max_future_skew {
1024                    return false;
1025                }
1026            }
1027        }
1028    }
1029    true
1030}
1031
/// Compile-time checks that the futures returned by batch methods are
/// `Send`. Nothing in here is ever executed; it only has to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; fails to compile otherwise.
    fn _assert_send(_: &impl Send) {}

    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        // The future is only borrowed for the bound check, never awaited.
        _assert_send(&client.batch_upload_chunks(Vec::new()));
    }
}
1045
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;
    use std::time::SystemTime;

    /// Median index in the quotes array.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    /// Helper: build a `PreparedChunk` with `median_amount` at the median
    /// quote index and zero for all other quotes. Adapts automatically to
    /// `CLOSE_GROUP_SIZE` changes.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(amount),
                price: Amount::from(amount),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    #[test]
    fn payment_intent_from_single_chunk() {
        let chunk = make_prepared_chunk(300);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
        assert_eq!(intent.total_amount, Amount::from(300));

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
        assert_eq!(*amt, Amount::from(300));
    }

    #[test]
    fn payment_intent_from_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(250);
        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);

        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        let chunk = make_prepared_chunk(0);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);
        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;

        let mut tx_map = HashMap::new();
        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert!(!paid[0].proof_bytes.is_empty());
        assert_eq!(paid[0].address, [0u8; 32]);
    }

    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        // Missing tx hash for a non-zero-amount quote should error,
        // since the chunk would be rejected by the network without a valid proof.
        let chunk = make_prepared_chunk(500);

        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let mut tx_map = HashMap::new();
        // Both chunks have the same quote_hash (same index/byte pattern)
        // so one tx_hash covers both
        tx_map.insert(q1, TxHash::from([0xCC; 32]));

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }

    // ---- proof_is_safely_fresh (freshness check behind prune_locally_expired_proofs) ----
    //
    // Build synthetic ProofOfPayment instances with controlled
    // timestamps to verify the local pre-flight stale-proof check.
    // This is the "no remote text trust" replacement for the prior
    // substring-matching invalidation path. Mistakes here cost money or
    // time: dropping too eagerly forces a re-pay, keeping too long
    // burns a doomed PUT round trip (but loses no payment).

    fn make_proof_with_timestamps(timestamps: &[SystemTime]) -> ProofOfPayment {
        let peer_quotes = timestamps
            .iter()
            .enumerate()
            .map(|(i, ts)| {
                let quote = PaymentQuote {
                    content: xor_name::XorName([0u8; 32]),
                    timestamp: *ts,
                    price: Amount::from(1u64),
                    rewards_address: RewardsAddress::new([1u8; 20]),
                    pub_key: vec![],
                    signature: vec![],
                };
                (EncodedPeerId::from([i as u8; 32]), quote)
            })
            .collect();
        ProofOfPayment { peer_quotes }
    }

    fn default_max_future_skew() -> Duration {
        Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS)
    }

    /// The production safe-reuse window: storer max age minus the safety margin.
    fn safe_reuse_window() -> Duration {
        Duration::from_secs(
            CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
        )
    }

    /// Fixed reference instant so boundary tests never depend on the wall clock.
    fn fixed_now() -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000)
    }

    #[test]
    fn proof_is_safely_fresh_accepts_recent_quote() {
        // Capture the clock once so the quote and the check share one instant.
        let now = SystemTime::now();
        let proof = make_proof_with_timestamps(&[now]);
        assert!(proof_is_safely_fresh(
            &proof,
            now,
            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_rejects_quote_past_safe_window() {
        // 23h57m old: past the 24h - 5min safe-reuse threshold but
        // still within the storer's hard 24h limit. The whole point
        // of the safety margin is to drop these locally before
        // burning a doomed PUT round trip.
        let now = SystemTime::now();
        let too_old = now - Duration::from_secs(23 * 60 * 60 + 57 * 60);
        let proof = make_proof_with_timestamps(&[too_old]);
        assert!(
            !proof_is_safely_fresh(
                &proof,
                now,
                safe_reuse_window(),
                default_max_future_skew(),
            ),
            "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)"
        );
    }

    #[test]
    fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() {
        // The storer rejects on a per-quote basis: a proof with even
        // one stale quote will fail on every retry. We must drop it.
        let now = SystemTime::now();
        let fresh = now;
        let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]);
        assert!(!proof_is_safely_fresh(
            &proof,
            now,
            safe_reuse_window(),
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() {
        // Client clock 60s slow. Quote claims 60s in the future of
        // our local view. Node tolerates 300s forward skew, so the
        // storer would accept this quote — we must too, or we'd
        // wrongly prune fresh proofs and force re-payment.
        let now = SystemTime::now();
        let slight_future = now + Duration::from_secs(60);
        let proof = make_proof_with_timestamps(&[slight_future]);
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        assert!(
            proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()),
            "60s-future quote must be accepted (within node's 300s skew tolerance)"
        );
    }

    #[test]
    fn proof_is_safely_fresh_rejects_far_future_dated_quote() {
        // 1 hour in the future of our local clock. Exceeds the
        // node's 300s forward-skew tolerance and the storer would
        // reject it — we drop it locally to avoid a round trip.
        let now = SystemTime::now();
        let far_future = now + Duration::from_secs(3600);
        let proof = make_proof_with_timestamps(&[far_future]);
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        assert!(!proof_is_safely_fresh(
            &proof,
            now,
            max_safe,
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() {
        // No quotes = no storer-side timestamp check to fail. The
        // proof is structurally invalid for other reasons, but
        // this function's contract is "no stale timestamp present",
        // which is trivially true for an empty list.
        let proof = make_proof_with_timestamps(&[]);
        assert!(proof_is_safely_fresh(
            &proof,
            SystemTime::now(),
            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
            default_max_future_skew(),
        ));
    }

    #[test]
    fn proof_is_safely_fresh_age_limit_is_inclusive() {
        // A quote exactly `max_safe_age` old is still reusable; one second
        // older is not. Pins the `>` comparison against off-by-one drift.
        let now = fixed_now();
        let max_safe = safe_reuse_window();
        let skew = default_max_future_skew();

        let at_limit = make_proof_with_timestamps(&[now - max_safe]);
        assert!(
            proof_is_safely_fresh(&at_limit, now, max_safe, skew),
            "quote exactly at the safe-age limit must be accepted"
        );

        let past_limit = make_proof_with_timestamps(&[now - max_safe - Duration::from_secs(1)]);
        assert!(
            !proof_is_safely_fresh(&past_limit, now, max_safe, skew),
            "quote one second past the safe-age limit must be rejected"
        );
    }

    #[test]
    fn proof_is_safely_fresh_future_skew_limit_is_inclusive() {
        // A quote exactly `max_future_skew` ahead is tolerated; one second
        // further ahead is not.
        let now = fixed_now();
        let max_safe = safe_reuse_window();
        let skew = default_max_future_skew();

        let at_limit = make_proof_with_timestamps(&[now + skew]);
        assert!(
            proof_is_safely_fresh(&at_limit, now, max_safe, skew),
            "quote exactly at the forward-skew limit must be accepted"
        );

        let past_limit = make_proof_with_timestamps(&[now + skew + Duration::from_secs(1)]);
        assert!(
            !proof_is_safely_fresh(&past_limit, now, max_safe, skew),
            "quote one second past the forward-skew limit must be rejected"
        );
    }
}