Skip to main content

ant_core/data/client/
batch.rs

1//! Batch chunk upload with wave-based pipelined EVM payments.
2//!
3//! Groups chunks into waves of 64 and pays for each
4//! wave in a single EVM transaction. Stores from wave N are pipelined
5//! with quote collection for wave N+1 via `tokio::join!`.
6
7use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, Result};
13use ant_protocol::evm::{
14    Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15    RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{
18    deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment,
19};
20use ant_protocol::transport::{MultiAddr, PeerId};
21use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
22use bytes::Bytes;
23use futures::stream::{self, StreamExt};
24use std::collections::{HashMap, HashSet};
25use std::time::{Duration, Instant};
26use tokio::sync::mpsc;
27use tracing::{debug, info, warn};
28
29/// Number of chunks per payment wave.
30const PAYMENT_WAVE_SIZE: usize = 64;
31
32/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
33#[derive(Debug)]
34pub struct PreparedChunk {
35    /// The chunk content bytes.
36    pub content: Bytes,
37    /// Content address (BLAKE3 hash).
38    pub address: XorName,
39    /// Closest peers from quote collection — PUT targets for close-group replication.
40    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
41    /// Payment structure (quotes sorted, median selected, not yet paid on-chain).
42    pub payment: SingleNodePayment,
43    /// Peer quotes for building `ProofOfPayment`.
44    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
45}
46
47/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`].
48#[derive(Debug, Clone)]
49pub struct PaidChunk {
50    /// The chunk content bytes.
51    pub content: Bytes,
52    /// Content address (BLAKE3 hash).
53    pub address: XorName,
54    /// Closest peers from quote collection — PUT targets for close-group replication.
55    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
56    /// Serialized [`PaymentProof`] bytes.
57    pub proof_bytes: Vec<u8>,
58}
59
60/// Result of storing a wave of paid chunks, with retry tracking.
61#[derive(Debug)]
62pub struct WaveResult {
63    /// Successfully stored chunk addresses.
64    pub stored: Vec<XorName>,
65    /// Chunks that failed to store after all retries.
66    pub failed: Vec<(XorName, String)>,
67    /// Sum of store-RPC attempts across all chunks in this wave (>= stored.len() + failed.len()).
68    pub chunk_attempts_total: usize,
69    /// Per-chunk wall-clock (ms) from first attempt to successful store. Only populated for stored chunks.
70    pub store_durations_ms: Vec<u64>,
71    /// Histogram of which retry-round each stored chunk succeeded on (index 0 = first attempt).
72    pub retries_per_chunk: Vec<u32>,
73}
74
75/// Aggregated retry / wall-clock stats across one or more [`WaveResult`]s.
76///
77/// Used by [`Client::batch_upload_chunks_with_events`] (which may store
78/// multiple waves per call) and surfaced upward into `FileUploadResult` so
79/// downstream tooling can record per-upload retry pressure and per-chunk
80/// store wall-clock without needing log parsing.
81#[derive(Debug, Default, Clone)]
82pub struct WaveAggregateStats {
83    /// Sum of store-RPC attempts across all waves (>= chunks_stored).
84    pub chunk_attempts_total: usize,
85    /// Per-chunk wall-clock (ms) from first attempt to successful store,
86    /// concatenated across waves.
87    pub store_durations_ms: Vec<u64>,
88    /// Count of stored chunks that succeeded on each retry round
89    /// (index 0 = first attempt, 1 = first retry, etc.). Indices match
90    /// the retry rounds emitted by `Client::store_paid_chunks_with_events`
91    /// which caps at `MAX_RETRIES = 3`, so an array of 4 suffices.
92    pub retries_histogram: [usize; 4],
93}
94
95impl WaveAggregateStats {
96    /// Fold one [`WaveResult`]'s stats into the running aggregate.
97    pub fn absorb(&mut self, wave: &WaveResult) {
98        self.chunk_attempts_total = self
99            .chunk_attempts_total
100            .saturating_add(wave.chunk_attempts_total);
101        self.store_durations_ms.extend(&wave.store_durations_ms);
102        for &r in &wave.retries_per_chunk {
103            let idx = (r as usize).min(self.retries_histogram.len() - 1);
104            self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
105        }
106    }
107}
108
109/// Compute a percentile from an unsorted slice of `u64` values.
110///
111/// `p` is in `[0.0, 1.0]`. Returns 0 for an empty slice. Uses nearest-rank;
112/// callers don't need numerical precision here — these are coarse log/metric
113/// summaries.
114fn percentile(values: &[u64], p: f64) -> u64 {
115    if values.is_empty() {
116        return 0;
117    }
118    let mut sorted = values.to_vec();
119    sorted.sort_unstable();
120    let p = p.clamp(0.0, 1.0);
121    // Nearest-rank: ceil(p * n) - 1, clamped to [0, n-1].
122    let n = sorted.len();
123    #[allow(
124        clippy::cast_possible_truncation,
125        clippy::cast_sign_loss,
126        clippy::cast_precision_loss
127    )]
128    let rank = ((p * n as f64).ceil() as usize)
129        .saturating_sub(1)
130        .min(n - 1);
131    sorted[rank]
132}
133
134/// Payment data for external signing.
135///
136/// Contains the information needed to construct and submit the on-chain
137/// payment transaction without requiring a local wallet or private key.
138#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
139pub struct PaymentIntent {
140    /// Individual payment entries: (quote_hash, rewards_address, amount).
141    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
142    /// Total amount across all payments.
143    pub total_amount: Amount,
144}
145
146impl PaymentIntent {
147    /// Build from a set of prepared chunks.
148    ///
149    /// Collects all non-zero payment entries and computes the total.
150    pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
151        let mut payments = Vec::new();
152        let mut total = Amount::ZERO;
153        for chunk in prepared {
154            for info in &chunk.payment.quotes {
155                if !info.amount.is_zero() {
156                    payments.push((info.quote_hash, info.rewards_address, info.amount));
157                    total += info.amount;
158                }
159            }
160        }
161        Self {
162            payments,
163            total_amount: total,
164        }
165    }
166}
167
168/// Build [`PaidChunk`]s from prepared chunks and externally-provided transaction hashes.
169///
170/// Shared by [`Client::batch_pay`] (wallet flow) and [`finalize_batch_payment`] (external signer).
171///
172/// Returns an error if any non-zero-amount quote hash is missing from `tx_hash_map`,
173/// since chunks uploaded without valid proofs would be rejected by the network.
174fn build_paid_chunks(
175    prepared: Vec<PreparedChunk>,
176    tx_hash_map: &HashMap<QuoteHash, TxHash>,
177) -> Result<Vec<PaidChunk>> {
178    let mut paid_chunks = Vec::with_capacity(prepared.len());
179    for chunk in prepared {
180        let mut tx_hashes = Vec::new();
181        for info in &chunk.payment.quotes {
182            if !info.amount.is_zero() {
183                let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
184                    Error::Payment(format!(
185                        "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
186                        hex::encode(info.quote_hash)
187                    ))
188                })?;
189                tx_hashes.push(tx_hash);
190            }
191        }
192
193        let proof = PaymentProof {
194            proof_of_payment: ProofOfPayment {
195                peer_quotes: chunk.peer_quotes,
196            },
197            tx_hashes,
198        };
199
200        let proof_bytes = serialize_single_node_proof(&proof)
201            .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
202
203        paid_chunks.push(PaidChunk {
204            content: chunk.content,
205            address: chunk.address,
206            quoted_peers: chunk.quoted_peers,
207            proof_bytes,
208        });
209    }
210    Ok(paid_chunks)
211}
212
213/// Finalize a batch payment using externally-provided transaction hashes.
214///
215/// Takes prepared chunks and a map of `quote_hash -> tx_hash` from the
216/// external signer. Builds per-chunk `PaymentProof` bytes without needing a wallet.
217pub fn finalize_batch_payment(
218    prepared: Vec<PreparedChunk>,
219    tx_hash_map: &HashMap<QuoteHash, TxHash>,
220) -> Result<Vec<PaidChunk>> {
221    build_paid_chunks(prepared, tx_hash_map)
222}
223
224impl Client {
225    /// Prepare a single chunk for batch payment.
226    ///
227    /// Collects quotes and uses node-reported prices without making any
228    /// on-chain transaction. Returns `Ok(None)` if the chunk is already
229    /// stored on the network.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if quote collection or payment construction fails.
234    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
235        let address = compute_address(&content);
236        let data_size = u64::try_from(content.len())
237            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
238
239        let quotes_with_peers = match self
240            .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
241            .await
242        {
243            Ok(quotes) => quotes,
244            Err(Error::AlreadyStored) => {
245                debug!("Chunk {} already stored, skipping", hex::encode(address));
246                return Ok(None);
247            }
248            Err(e) => return Err(e),
249        };
250
251        // Capture all quoted peers for close-group replication.
252        let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
253            .iter()
254            .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
255            .collect();
256
257        // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment.
258        // Use node-reported prices directly — no contract price fetch needed.
259        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
260        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
261
262        for (peer_id, _addrs, quote, price) in quotes_with_peers {
263            let encoded = peer_id_to_encoded(&peer_id)?;
264            peer_quotes.push((encoded, quote.clone()));
265            quotes_for_payment.push((quote, price));
266        }
267
268        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
269            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
270
271        Ok(Some(PreparedChunk {
272            content,
273            address,
274            quoted_peers,
275            payment,
276            peer_quotes,
277        }))
278    }
279
280    /// Pay for multiple chunks in a single EVM transaction.
281    ///
282    /// Flattens all quote payments from the prepared chunks into one
283    /// `wallet.pay_for_quotes()` call, then maps transaction hashes
284    /// back to per-chunk [`PaymentProof`] bytes.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if the wallet is not configured or the on-chain
289    /// payment fails.
290    /// Returns `(paid_chunks, storage_cost_atto, gas_cost_wei)`.
291    pub async fn batch_pay(
292        &self,
293        prepared: Vec<PreparedChunk>,
294    ) -> Result<(Vec<PaidChunk>, String, u128)> {
295        if prepared.is_empty() {
296            return Ok((Vec::new(), "0".to_string(), 0));
297        }
298
299        let wallet = self.require_wallet()?;
300
301        // Compute total storage cost from the prepared chunks before paying.
302        let intent = PaymentIntent::from_prepared_chunks(&prepared);
303        let storage_cost_atto = intent.total_amount.to_string();
304
305        // Flatten all quote payments from all chunks into a single batch.
306        let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
307        let mut all_payments = Vec::with_capacity(total_quotes);
308        for chunk in &prepared {
309            for info in &chunk.payment.quotes {
310                all_payments.push((info.quote_hash, info.rewards_address, info.amount));
311            }
312        }
313
314        debug!(
315            "Batch payment for {} chunks ({} quote entries)",
316            prepared.len(),
317            all_payments.len()
318        );
319
320        let (tx_hash_map, gas_info) =
321            wallet
322                .pay_for_quotes(all_payments)
323                .await
324                .map_err(|PayForQuotesError(err, _)| {
325                    Error::Payment(format!("Batch payment failed: {err}"))
326                })?;
327
328        info!(
329            "Batch payment succeeded: {} transactions",
330            tx_hash_map.len()
331        );
332
333        let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
334        let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
335        Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
336    }
337
338    /// Upload chunks in waves with pipelined EVM payments.
339    ///
340    /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave:
341    /// 1. **Prepare**: collect quotes for all chunks concurrently
342    /// 2. **Pay**: single EVM transaction for the whole wave
343    /// 3. **Store**: concurrent chunk replication to close group
344    ///
345    /// Stores from wave N overlap with quote collection for wave N+1
346    /// via `tokio::join!`.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if any payment or store operation fails.
351    /// Returns `(addresses, total_storage_cost_atto, total_gas_cost_wei)`.
352    pub async fn batch_upload_chunks(
353        &self,
354        chunks: Vec<Bytes>,
355    ) -> Result<(Vec<XorName>, String, u128)> {
356        let (addresses, storage, gas, _stats) = self
357            .batch_upload_chunks_with_events(chunks, None, 0, 0, None)
358            .await?;
359        Ok((addresses, storage, gas))
360    }
361
362    /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
363    /// events as each chunk is stored, enabling per-chunk progress bars.
364    ///
365    /// `stored_offset` is the number of chunks already stored in previous waves
366    /// (so events report cumulative progress). `file_total` is the total chunk
367    /// count across ALL waves (for the `total` field in events).
368    ///
369    /// When `resume_key` is `Some`, per-wave payment proofs are persisted
370    /// to `<data_dir>/payments/single/<ts>_<hash(resume_key)>` via
371    /// `crate::data::client::cached_single` so that a partial-upload
372    /// failure can be resumed on the next attempt without paying twice.
373    /// The caller is responsible for deleting the cache entry on full
374    /// success (typically `upload_with_options` in `file.rs`).
375    pub async fn batch_upload_chunks_with_events(
376        &self,
377        chunks: Vec<Bytes>,
378        progress: Option<&mpsc::Sender<UploadEvent>>,
379        stored_offset: usize,
380        file_total: usize,
381        resume_key: Option<&str>,
382    ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
383        if chunks.is_empty() {
384            return Ok((
385                Vec::new(),
386                "0".to_string(),
387                0,
388                WaveAggregateStats::default(),
389            ));
390        }
391
392        let total_chunks = chunks.len();
393        let quote_cap = self.controller().quote.current();
394        let store_cap = self.controller().store.current();
395        debug!(
396            "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
397             (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
398        );
399
400        // Load any previously-cached single-node receipt for this
401        // upload. Each chunk whose address is in the cache will skip
402        // the quote + pay phases and have its `PaidChunk` constructed
403        // directly from the cached proof + fresh quoted peers. The
404        // caller is responsible for deleting the cache on full
405        // success; we only read here, never write the load result back.
406        //
407        // Before trusting any cached proof, decode it locally and drop
408        // any whose quote.timestamp is past the storer's per-quote age
409        // budget (`QUOTE_MAX_AGE_SECS`, mirrored here as
410        // `CACHED_PROOF_EXPIRY_SECS`). The previous design trusted a
411        // substring match on remote error text, which a Byzantine
412        // storer could spoof to force double-payment. Local pre-flight
413        // is decision-pure: we never hand a doomed proof to a storer,
414        // and the cache is updated under our own lock with no remote
415        // text involved.
416        // `cached_cost` carries the cumulative cost from waves paid in
417        // a previous run so the returned tally reflects total spend on
418        // this file, not just freshly-paid chunks. Without this the
419        // user's "this upload cost X" message under-reports by the
420        // resumed waves' cost.
421        let (cached_proofs, cached_storage, cached_gas): (HashMap<XorName, Vec<u8>>, Amount, u128) =
422            match resume_key {
423                Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
424                    Some((_, receipt)) => {
425                        let prior_storage = receipt
426                            .storage_cost_atto
427                            .parse::<Amount>()
428                            .unwrap_or(Amount::ZERO);
429                        let prior_gas = receipt.gas_cost_wei;
430                        let kept = prune_locally_expired_proofs(key, receipt.proofs);
431                        (kept, prior_storage, prior_gas)
432                    }
433                    None => (HashMap::new(), Amount::ZERO, 0u128),
434                },
435                None => (HashMap::new(), Amount::ZERO, 0u128),
436            };
437
438        let mut all_addresses = Vec::with_capacity(total_chunks);
439        let mut seen_addresses: HashSet<XorName> = HashSet::new();
440
441        // Accumulate costs across waves, seeded with cumulative from
442        // any cached receipt loaded above.
443        let mut total_storage = cached_storage;
444        let mut total_gas: u128 = cached_gas;
445        let mut agg_stats = WaveAggregateStats::default();
446
447        // Deduplicate chunks by content address.
448        let mut unique_chunks = Vec::with_capacity(total_chunks);
449        for chunk in chunks {
450            let address = compute_address(&chunk);
451            if seen_addresses.insert(address) {
452                unique_chunks.push(chunk);
453            } else {
454                debug!("Skipping duplicate chunk {}", hex::encode(address));
455                all_addresses.push(address);
456                if let Some(tx) = progress {
457                    let _ = tx.try_send(UploadEvent::ChunkStored {
458                        stored: stored_offset + all_addresses.len(),
459                        total: file_total,
460                    });
461                }
462            }
463        }
464
465        // Split into waves.
466        let waves: Vec<Vec<Bytes>> = unique_chunks
467            .chunks(PAYMENT_WAVE_SIZE)
468            .map(<[Bytes]>::to_vec)
469            .collect();
470        let wave_count = waves.len();
471
472        debug!(
473            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
474            seen_addresses.len()
475        );
476
477        let mut pending_store: Option<Vec<PaidChunk>> = None;
478        let mut total_quoted: usize = 0;
479
480        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
481            let wave_num = wave_idx + 1;
482            let wave_size = wave_chunks.len();
483
484            // Pipeline: store previous wave while preparing this one.
485            let (prepare_result, store_result) = match pending_store.take() {
486                Some(paid_chunks) => {
487                    let store_offset = stored_offset + all_addresses.len();
488                    let quoted_offset = stored_offset + total_quoted;
489                    let (prep, stored) = tokio::join!(
490                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
491                        self.store_paid_chunks_with_events(
492                            paid_chunks,
493                            progress,
494                            store_offset,
495                            file_total
496                        )
497                    );
498                    (prep, Some(stored))
499                }
500                None => {
501                    let quoted_offset = stored_offset + total_quoted;
502                    let result = self
503                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
504                        .await;
505                    (result, None)
506                }
507            };
508            total_quoted += wave_size;
509
510            // Track partial progress from previous wave.
511            if let Some(wave_result) = store_result {
512                all_addresses.extend(&wave_result.stored);
513                agg_stats.absorb(&wave_result);
514                if !wave_result.failed.is_empty() {
515                    let failed_count = wave_result.failed.len();
516                    warn!("{failed_count} chunks failed to store after retries");
517                    return Err(Error::PartialUpload {
518                        stored: all_addresses.clone(),
519                        stored_count: stored_offset + all_addresses.len(),
520                        failed: wave_result.failed,
521                        failed_count,
522                        total_chunks: file_total,
523                        reason: "wave store failed after retries".into(),
524                    });
525                }
526            }
527
528            let (prepared_chunks, already_stored) = prepare_result?;
529            all_addresses.extend(&already_stored);
530            if let Some(tx) = progress {
531                for _ in &already_stored {
532                    let _ = tx.try_send(UploadEvent::ChunkStored {
533                        stored: stored_offset + all_addresses.len(),
534                        total: file_total,
535                    });
536                }
537            }
538
539            if prepared_chunks.is_empty() {
540                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
541                continue;
542            }
543
544            // Split prepared chunks into "already paid in a previous
545            // attempt" (cached) and "needs payment" (fresh). Cached
546            // chunks build a `PaidChunk` from the cached proof + the
547            // freshly-quoted peers, bypassing the EVM transaction.
548            let mut needs_pay: Vec<PreparedChunk> = Vec::with_capacity(prepared_chunks.len());
549            let mut cached_paid: Vec<PaidChunk> = Vec::new();
550            for prep in prepared_chunks {
551                if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() {
552                    cached_paid.push(PaidChunk {
553                        content: prep.content,
554                        address: prep.address,
555                        quoted_peers: prep.quoted_peers,
556                        proof_bytes,
557                    });
558                } else {
559                    needs_pay.push(prep);
560                }
561            }
562            if !cached_paid.is_empty() {
563                info!(
564                    "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs",
565                    cached_paid.len()
566                );
567            }
568
569            let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() {
570                (Vec::new(), "0".to_string(), 0u128)
571            } else {
572                info!(
573                    "Wave {wave_num}/{wave_count}: paying for {} chunks",
574                    needs_pay.len()
575                );
576                self.batch_pay(needs_pay).await?
577            };
578            if let Ok(cost) = wave_storage.parse::<Amount>() {
579                total_storage += cost;
580            }
581            total_gas = total_gas.saturating_add(wave_gas);
582
583            // Persist the freshly-paid wave's proofs so a later
584            // failure can resume without re-paying.
585            if let Some(key) = resume_key {
586                if !paid_chunks.is_empty() {
587                    let new_proofs: HashMap<[u8; 32], Vec<u8>> = paid_chunks
588                        .iter()
589                        .map(|pc| (pc.address, pc.proof_bytes.clone()))
590                        .collect();
591                    crate::data::client::cached_single::try_append_wave(
592                        key,
593                        new_proofs,
594                        &wave_storage,
595                        wave_gas,
596                    );
597                }
598            }
599
600            paid_chunks.extend(cached_paid);
601            pending_store = Some(paid_chunks);
602        }
603
604        // Store the last wave.
605        if let Some(paid_chunks) = pending_store {
606            let store_offset = stored_offset + all_addresses.len();
607            let wave_result = self
608                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
609                .await;
610            all_addresses.extend(&wave_result.stored);
611            agg_stats.absorb(&wave_result);
612            if !wave_result.failed.is_empty() {
613                let failed_count = wave_result.failed.len();
614                warn!("{failed_count} chunks failed to store after retries (final wave)");
615                return Err(Error::PartialUpload {
616                    stored: all_addresses.clone(),
617                    stored_count: stored_offset + all_addresses.len(),
618                    failed: wave_result.failed,
619                    failed_count,
620                    total_chunks: file_total,
621                    reason: "final wave store failed after retries".into(),
622                });
623            }
624        }
625
626        debug!("Batch upload complete: {} addresses", all_addresses.len());
627        Ok((
628            all_addresses,
629            total_storage.to_string(),
630            total_gas,
631            agg_stats,
632        ))
633    }
634
635    /// Prepare a wave of chunks by collecting quotes concurrently.
636    ///
637    /// Fires [`UploadEvent::ChunkQuoted`] as each chunk's quote completes.
638    /// Returns `(prepared_chunks, already_stored_addresses)`.
639    async fn prepare_wave(
640        &self,
641        chunks: Vec<Bytes>,
642        progress: Option<&mpsc::Sender<UploadEvent>>,
643        quoted_offset: usize,
644        file_total: usize,
645    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
646        let chunk_count = chunks.len();
647        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
648            .into_iter()
649            .map(|c| {
650                let addr = compute_address(&c);
651                (c, addr)
652            })
653            .collect();
654
655        let quote_limiter = self.controller().quote.clone();
656        // Batch-aware fan-out: clamp to chunk_count so we never
657        // pay for fan-out slots we cannot fill on a partial wave.
658        // See PERF-RESULTS.md — measured ~30% slowdown when
659        // cap > batch size on quoting workloads (live mainnet).
660        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
661        let mut quote_stream = stream::iter(chunks_with_addr)
662            .map(|(content, address)| {
663                let limiter = quote_limiter.clone();
664                async move {
665                    let result = observe_op(
666                        &limiter,
667                        || async move { self.prepare_chunk_payment(content).await },
668                        classify_error,
669                    )
670                    .await;
671                    (address, result)
672                }
673            })
674            .buffer_unordered(quote_concurrency);
675
676        let mut prepared = Vec::with_capacity(chunk_count);
677        let mut already_stored = Vec::new();
678        let mut quoted_count = 0usize;
679
680        while let Some((address, result)) = quote_stream.next().await {
681            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
682            match result? {
683                Some(chunk) => prepared.push(chunk),
684                None => already_stored.push(address),
685            }
686            quoted_count += 1;
687            let progress_num = quoted_offset + quoted_count;
688            if file_total > 0 {
689                if chunk_already_stored {
690                    info!("Verified {progress_num}/{file_total} (already stored)");
691                } else {
692                    info!("Quoted {progress_num}/{file_total}");
693                }
694            }
695            if let Some(tx) = progress {
696                let _ = tx.try_send(UploadEvent::ChunkQuoted {
697                    quoted: progress_num,
698                    total: file_total,
699                });
700            }
701        }
702
703        Ok((prepared, already_stored))
704    }
705
706    /// Store a batch of paid chunks concurrently to their close groups.
707    ///
708    /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
709    /// Returns a [`WaveResult`] with both successes and failures so callers can
710    /// track partial progress instead of losing information about stored chunks.
711    ///
712    /// When `progress` is `Some`, sends [`UploadEvent::ChunkStored`] as each
713    /// chunk is successfully stored. `stored_before` is the count of chunks
714    /// already stored in previous waves so the event reports an accurate
715    /// cumulative total; `total_chunks` is the total across all waves. Pass
716    /// `None`/0/0 when progress reporting is not needed.
717    pub(crate) async fn store_paid_chunks_with_events(
718        &self,
719        paid_chunks: Vec<PaidChunk>,
720        progress: Option<&mpsc::Sender<UploadEvent>>,
721        stored_before: usize,
722        total_chunks: usize,
723    ) -> WaveResult {
724        const MAX_RETRIES: u32 = 3;
725        const BASE_DELAY_MS: u64 = 500;
726
727        let mut stored = Vec::new();
728        let mut to_retry = paid_chunks;
729
730        // Per-chunk first-seen timestamps, keyed by chunk address.
731        // Inserted on first sight; never overwritten so wall-clock spans
732        // first attempt → eventual success across all retry rounds.
733        let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
734        for chunk in &to_retry {
735            first_seen.entry(chunk.address).or_insert_with(Instant::now);
736        }
737
738        let mut chunk_attempts_total: usize = 0;
739        let mut store_durations_ms: Vec<u64> = Vec::new();
740        let mut retries_per_chunk: Vec<u32> = Vec::new();
741
742        for attempt in 0..=MAX_RETRIES {
743            if attempt > 0 {
744                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
745                tokio::time::sleep(delay).await;
746                info!(
747                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
748                    to_retry.len()
749                );
750            }
751
752            // Each chunk in this round counts as one store-RPC attempt.
753            chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
754
755            let store_limiter = self.controller().store.clone();
756            let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
757            let mut upload_stream = stream::iter(to_retry)
758                .map(|chunk| {
759                    let chunk_clone = chunk.clone();
760                    let limiter = store_limiter.clone();
761                    async move {
762                        let result = observe_op(
763                            &limiter,
764                            || async move {
765                                self.chunk_put_to_close_group(
766                                    chunk.content,
767                                    chunk.proof_bytes,
768                                    &chunk.quoted_peers,
769                                )
770                                .await
771                            },
772                            classify_error,
773                        )
774                        .await;
775                        (chunk_clone, result)
776                    }
777                })
778                .buffer_unordered(store_concurrency);
779
780            let mut failed_this_round = Vec::new();
781            while let Some((chunk, result)) = upload_stream.next().await {
782                match result {
783                    Ok(name) => {
784                        let duration_ms = first_seen
785                            .get(&chunk.address)
786                            .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
787                            .unwrap_or(0);
788                        store_durations_ms.push(duration_ms);
789                        retries_per_chunk.push(attempt);
790                        stored.push(name);
791                        let stored_num = stored_before + stored.len();
792                        if total_chunks > 0 {
793                            info!("Stored {stored_num}/{total_chunks}");
794                        }
795                        if let Some(tx) = progress {
796                            let _ = tx.try_send(UploadEvent::ChunkStored {
797                                stored: stored_num,
798                                total: total_chunks,
799                            });
800                        }
801                    }
802                    Err(e) => failed_this_round.push((chunk, e.to_string())),
803                }
804            }
805
806            if failed_this_round.is_empty() {
807                let result = WaveResult {
808                    stored,
809                    failed: Vec::new(),
810                    chunk_attempts_total,
811                    store_durations_ms,
812                    retries_per_chunk,
813                };
814                log_wave_summary(&result);
815                return result;
816            }
817
818            if attempt == MAX_RETRIES {
819                let failed = failed_this_round
820                    .into_iter()
821                    .map(|(c, e)| (c.address, e))
822                    .collect();
823                let result = WaveResult {
824                    stored,
825                    failed,
826                    chunk_attempts_total,
827                    store_durations_ms,
828                    retries_per_chunk,
829                };
830                log_wave_summary(&result);
831                return result;
832            }
833
834            warn!(
835                "{} chunks failed on attempt {}, will retry",
836                failed_this_round.len(),
837                attempt + 1
838            );
839            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
840        }
841
842        // Unreachable due to loop structure, but satisfy the compiler.
843        let result = WaveResult {
844            stored,
845            failed: Vec::new(),
846            chunk_attempts_total,
847            store_durations_ms,
848            retries_per_chunk,
849        };
850        log_wave_summary(&result);
851        result
852    }
853}
854
855/// Emit one structured info line summarising a wave's store-side stats.
856///
857/// Surfaces p50/p95/max chunk wall-clock and per-round retry counts so
858/// log-based analysis tooling (Elasticsearch / Kibana) can identify
859/// client-side quorum or retry cost without needing the `--json` output.
860fn log_wave_summary(result: &WaveResult) {
861    let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
862    let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
863    let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
864    let chunk_attempts_total = result.chunk_attempts_total;
865    info!(
866        chunks_stored = result.stored.len(),
867        chunks_failed = result.failed.len(),
868        chunk_attempts_total,
869        retries_round_1,
870        retries_round_2,
871        retries_round_3,
872        store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
873        store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
874        store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
875        "chunk_store_wave_complete"
876    );
877}
878
879/// Safety margin subtracted from the storer's `QUOTE_MAX_AGE_SECS` (24 h)
880/// when deciding to trust a cached proof.
881///
882/// A proof whose oldest `quote.timestamp` is closer than this to the
883/// storer's hard limit is treated as already-expired locally. The
884/// margin covers (a) clock skew between client and storer, (b) the
885/// in-flight time between the local check and the storer's
886/// `validate_quote_timestamps` call, and (c) the time spent uploading
887/// the chunk body. 5 minutes is generous for all three combined and
888/// cheap: a wrongly-kept proof costs an extra retry round trip, a
889/// wrongly-dropped proof costs one re-pay (cheap chunk).
890const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 300;
891
892/// Storer-side budget for a quote's age. Mirrors `QUOTE_MAX_AGE_SECS`
893/// in `ant-node/src/payment/verifier.rs`. If this value drifts on the
894/// node side, the worst case is the client either keeps proofs slightly
895/// past the storer limit (forced re-pay on next retry, no money lost)
896/// or drops them slightly early (one extra re-pay, no money lost).
897/// Either way, no payment is double-spent or stranded.
898const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60;
899
900/// How far a cached quote's `timestamp` may be in the future before we
901/// classify it as too-skewed-to-trust and prune.
902///
903/// Mirrors `QUOTE_FUTURE_SKEW_TOLERANCE_SECS = 300` in
904/// `ant-node/src/payment/verifier.rs`. If the client's clock runs
905/// slow relative to the storer that issued the quote, a perfectly
906/// valid proof can appear future-dated to the client — rejecting any
907/// forward drift would re-pay those chunks on every retry. Allow the
908/// same 5-minute window the storer does so the client and node agree
909/// on which proofs are fresh.
910const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 300;
911
912/// Drop cached `proof_bytes` whose quote timestamps are too close to
913/// the storer's expiry window to safely reuse.
914///
915/// Why this exists
916/// ---------------
917/// The cache stores `(chunk_address, proof_bytes)` so a retried upload
918/// can skip re-paying. The proof bytes embed `quote.timestamp`s. Each
919/// storer evaluates each `quote.timestamp` independently against its
920/// 24 h `QUOTE_MAX_AGE_SECS` budget, so close to the 24 h boundary
921/// (or on a multi-day-old cache that survived past the receipt's outer
922/// expiry for some reason) the storer rejects what the client still
923/// believes is fresh.
924///
925/// The previous design trusted a substring match on the storer's
926/// returned error text to detect this and invalidate the cache after
927/// the fact. That allowed a Byzantine storer to spoof the marker and
928/// force the client to re-pay fresh proofs (double-payment). This
929/// implementation is decision-pure: we decode the proof locally and
930/// only re-use it if every embedded quote is comfortably within the
931/// budget. No remote text involved.
932///
933/// Side-effect: dropped entries are removed from the on-disk cache so
934/// they don't reappear on the next load.
935fn prune_locally_expired_proofs(
936    resume_key: &str,
937    proofs: HashMap<[u8; 32], Vec<u8>>,
938) -> HashMap<XorName, Vec<u8>> {
939    let now = std::time::SystemTime::now();
940    let max_safe_age = Duration::from_secs(
941        CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
942    );
943    let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS);
944    let mut kept: HashMap<XorName, Vec<u8>> = HashMap::with_capacity(proofs.len());
945    // Pair each expired address with the EXACT bytes we observed at
946    // load time. The cache-side drop only removes the entry if those
947    // bytes still match, so a concurrent re-pay that refreshed the
948    // proof under its own lock is not clobbered (CAS semantics, fixes
949    // the TOCTOU between unlocked-load and locked-drop).
950    let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
951    for (addr, bytes) in proofs {
952        match deserialize_proof(&bytes) {
953            Ok((proof, _tx_hashes)) => {
954                if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) {
955                    kept.insert(addr, bytes);
956                } else {
957                    expired.push((addr, bytes));
958                }
959            }
960            Err(_) => {
961                // Unreadable cached entry: drop it so it doesn't sit
962                // here forever. The chunk will re-quote+re-pay.
963                expired.push((addr, bytes));
964            }
965        }
966    }
967    if !expired.is_empty() {
968        info!(
969            "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \
970             before resume",
971            expired.len()
972        );
973        crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired);
974    }
975    kept
976}
977
978/// True iff every quote in the proof has a timestamp not older than
979/// `now - max_safe_age` AND not further in the future than
980/// `max_future_skew`. The forward-skew check mirrors the storer's
981/// `QUOTE_FUTURE_SKEW_TOLERANCE_SECS` (300s) so a slow-running client
982/// clock doesn't cause us to wrongly prune perfectly fresh proofs
983/// that the storer would still accept.
984fn proof_is_safely_fresh(
985    proof: &ProofOfPayment,
986    now: std::time::SystemTime,
987    max_safe_age: Duration,
988    max_future_skew: Duration,
989) -> bool {
990    for (_peer, quote) in &proof.peer_quotes {
991        match now.duration_since(quote.timestamp) {
992            Ok(age) => {
993                if age > max_safe_age {
994                    return false;
995                }
996            }
997            Err(future) => {
998                if future.duration() > max_future_skew {
999                    return false;
1000                }
1001            }
1002        }
1003    }
1004    true
1005}
1006
1007/// Compile-time assertions that batch method futures are Send.
1008#[cfg(test)]
1009mod send_assertions {
1010    use super::*;
1011
1012    fn _assert_send<T: Send>(_: &T) {}
1013
1014    #[allow(dead_code)]
1015    async fn _batch_upload_is_send(client: &Client) {
1016        let fut = client.batch_upload_chunks(Vec::new());
1017        _assert_send(&fut);
1018    }
1019}
1020
1021#[cfg(test)]
1022#[allow(clippy::unwrap_used)]
1023mod tests {
1024    use super::*;
1025    use ant_protocol::payment::QuotePaymentInfo;
1026    use ant_protocol::CLOSE_GROUP_SIZE;
1027
1028    /// Median index in the quotes array.
1029    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;
1030
1031    /// Helper: build a `PreparedChunk` with `median_amount` at the median
1032    /// quote index and zero for all other quotes. Adapts automatically to
1033    /// `CLOSE_GROUP_SIZE` changes.
1034    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
1035        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
1036            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
1037            QuotePaymentInfo {
1038                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
1039                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
1040                amount: Amount::from(amount),
1041                price: Amount::from(amount),
1042            }
1043        });
1044
1045        PreparedChunk {
1046            content: Bytes::from(vec![0xAA; 32]),
1047            address: [0u8; 32],
1048            quoted_peers: Vec::new(),
1049            payment: SingleNodePayment { quotes },
1050            peer_quotes: Vec::new(),
1051        }
1052    }
1053
1054    #[test]
1055    fn payment_intent_from_single_chunk() {
1056        let chunk = make_prepared_chunk(300);
1057        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
1058
1059        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
1060        assert_eq!(intent.total_amount, Amount::from(300));
1061
1062        let (hash, addr, amt) = &intent.payments[0];
1063        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
1064        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
1065        assert_eq!(*amt, Amount::from(300));
1066    }
1067
1068    #[test]
1069    fn payment_intent_from_multiple_chunks() {
1070        let c1 = make_prepared_chunk(100);
1071        let c2 = make_prepared_chunk(250);
1072        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);
1073
1074        assert_eq!(intent.payments.len(), 2);
1075        assert_eq!(intent.total_amount, Amount::from(350));
1076    }
1077
1078    #[test]
1079    fn payment_intent_skips_all_zero_chunks() {
1080        let chunk = make_prepared_chunk(0);
1081        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
1082
1083        assert!(intent.payments.is_empty());
1084        assert_eq!(intent.total_amount, Amount::ZERO);
1085    }
1086
1087    #[test]
1088    fn payment_intent_empty_input() {
1089        let intent = PaymentIntent::from_prepared_chunks(&[]);
1090        assert!(intent.payments.is_empty());
1091        assert_eq!(intent.total_amount, Amount::ZERO);
1092    }
1093
1094    #[test]
1095    fn finalize_batch_payment_builds_proofs() {
1096        let chunk = make_prepared_chunk(500);
1097        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;
1098
1099        let mut tx_map = HashMap::new();
1100        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));
1101
1102        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();
1103
1104        assert_eq!(paid.len(), 1);
1105        assert!(!paid[0].proof_bytes.is_empty());
1106        assert_eq!(paid[0].address, [0u8; 32]);
1107    }
1108
1109    #[test]
1110    fn finalize_batch_payment_empty_input() {
1111        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
1112        assert!(paid.is_empty());
1113    }
1114
1115    #[test]
1116    fn finalize_batch_payment_missing_tx_hash_errors() {
1117        // Missing tx hash for a non-zero-amount quote should error,
1118        // since the chunk would be rejected by the network without a valid proof.
1119        let chunk = make_prepared_chunk(500);
1120
1121        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
1122        assert!(result.is_err());
1123        let err = result.unwrap_err().to_string();
1124        assert!(err.contains("Missing tx hash"), "got: {err}");
1125    }
1126
1127    #[test]
1128    fn finalize_batch_payment_multiple_chunks() {
1129        let c1 = make_prepared_chunk(100);
1130        let c2 = make_prepared_chunk(200);
1131        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
1132        let mut tx_map = HashMap::new();
1133        // Both chunks have the same quote_hash (same index/byte pattern)
1134        // so one tx_hash covers both
1135        tx_map.insert(q1, TxHash::from([0xCC; 32]));
1136
1137        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
1138        assert_eq!(paid.len(), 2);
1139    }
1140
1141    // ---- prune_locally_expired_proofs ----
1142    //
1143    // Build synthetic ProofOfPayment instances with controlled
1144    // timestamps to verify the local pre-flight stale-proof check.
1145    // This is the "no remote text trust" replacement for the prior
1146    // substring-matching invalidation path. A bug here is a direct
1147    // wallet leak (drop-too-eager = re-pay; keep-too-long = doomed
1148    // PUT round trip but no payment loss).
1149
1150    fn make_proof_with_timestamps(timestamps: &[std::time::SystemTime]) -> ProofOfPayment {
1151        let peer_quotes = timestamps
1152            .iter()
1153            .enumerate()
1154            .map(|(i, ts)| {
1155                let quote = PaymentQuote {
1156                    content: xor_name::XorName([0u8; 32]),
1157                    timestamp: *ts,
1158                    price: Amount::from(1u64),
1159                    rewards_address: RewardsAddress::new([1u8; 20]),
1160                    pub_key: vec![],
1161                    signature: vec![],
1162                };
1163                (EncodedPeerId::from([i as u8; 32]), quote)
1164            })
1165            .collect();
1166        ProofOfPayment { peer_quotes }
1167    }
1168
1169    fn default_max_future_skew() -> Duration {
1170        Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS)
1171    }
1172
1173    #[test]
1174    fn proof_is_safely_fresh_accepts_recent_quote() {
1175        let proof = make_proof_with_timestamps(&[std::time::SystemTime::now()]);
1176        assert!(proof_is_safely_fresh(
1177            &proof,
1178            std::time::SystemTime::now(),
1179            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
1180            default_max_future_skew(),
1181        ));
1182    }
1183
1184    #[test]
1185    fn proof_is_safely_fresh_rejects_quote_past_safe_window() {
1186        // 23h57m old: past the 24h - 5min safe-reuse threshold but
1187        // still within the storer's hard 24h limit. The whole point
1188        // of the safety margin is to drop these locally before
1189        // burning a doomed PUT round trip.
1190        let too_old = std::time::SystemTime::now() - Duration::from_secs(23 * 60 * 60 + 57 * 60);
1191        let proof = make_proof_with_timestamps(&[too_old]);
1192        let max_safe = Duration::from_secs(
1193            CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
1194        );
1195        assert!(
1196            !proof_is_safely_fresh(
1197                &proof,
1198                std::time::SystemTime::now(),
1199                max_safe,
1200                default_max_future_skew(),
1201            ),
1202            "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)"
1203        );
1204    }
1205
1206    #[test]
1207    fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() {
1208        // The storer rejects on a per-quote basis: a proof with even
1209        // one stale quote will fail on every retry. We must drop it.
1210        let now = std::time::SystemTime::now();
1211        let fresh = now;
1212        let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1213        let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]);
1214        let max_safe = Duration::from_secs(
1215            CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
1216        );
1217        assert!(!proof_is_safely_fresh(
1218            &proof,
1219            now,
1220            max_safe,
1221            default_max_future_skew(),
1222        ));
1223    }
1224
1225    #[test]
1226    fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() {
1227        // Client clock 60s slow. Quote claims 60s in the future of
1228        // our local view. Node tolerates 300s forward skew, so the
1229        // storer would accept this quote — we must too, or we'd
1230        // wrongly prune fresh proofs and force re-payment.
1231        let now = std::time::SystemTime::now();
1232        let slight_future = now + Duration::from_secs(60);
1233        let proof = make_proof_with_timestamps(&[slight_future]);
1234        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1235        assert!(
1236            proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()),
1237            "60s-future quote must be accepted (within node's 300s skew tolerance)"
1238        );
1239    }
1240
1241    #[test]
1242    fn proof_is_safely_fresh_rejects_far_future_dated_quote() {
1243        // 1 hour in the future of our local clock. Exceeds the
1244        // node's 300s forward-skew tolerance and the storer would
1245        // reject it — we drop it locally to avoid a round trip.
1246        let now = std::time::SystemTime::now();
1247        let far_future = now + Duration::from_secs(3600);
1248        let proof = make_proof_with_timestamps(&[far_future]);
1249        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1250        assert!(!proof_is_safely_fresh(
1251            &proof,
1252            now,
1253            max_safe,
1254            default_max_future_skew(),
1255        ));
1256    }
1257
1258    #[test]
1259    fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() {
1260        // No quotes = no storer-side timestamp check to fail. The
1261        // proof is structurally invalid for other reasons, but
1262        // this function's contract is "no stale timestamp present",
1263        // which is trivially true for an empty list.
1264        let proof = make_proof_with_timestamps(&[]);
1265        assert!(proof_is_safely_fresh(
1266            &proof,
1267            std::time::SystemTime::now(),
1268            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
1269            default_max_future_skew(),
1270        ));
1271    }
1272}