1use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, PartialUploadSpend, Result};
13use ant_protocol::evm::{
14 Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15 RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{
18 deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment,
19};
20use ant_protocol::transport::{MultiAddr, PeerId};
21use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
22use bytes::Bytes;
23use futures::stream::{self, FuturesUnordered, StreamExt};
24use std::collections::{HashMap, HashSet};
25use std::time::{Duration, Instant};
26use tokio::sync::mpsc;
27use tracing::{debug, info, warn};
28
29const PAYMENT_WAVE_SIZE: usize = 64;
31
32const STORE_INFLIGHT_BYTE_BUDGET: usize = 64 * 1024 * 1024;
38
39#[derive(Debug)]
41pub struct PreparedChunk {
42 pub content: Bytes,
44 pub address: XorName,
46 pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
52 pub payment: SingleNodePayment,
54 pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
56}
57
58#[derive(Debug, Clone)]
60pub struct PaidChunk {
61 pub content: Bytes,
63 pub address: XorName,
65 pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
71 pub proof_bytes: Vec<u8>,
73}
74
75#[derive(Debug)]
77pub struct WaveResult {
78 pub stored: Vec<XorName>,
80 pub failed: Vec<(XorName, String)>,
82 pub chunk_attempts_total: usize,
84 pub store_durations_ms: Vec<u64>,
86 pub retries_per_chunk: Vec<u32>,
88}
89
90#[derive(Debug, Default, Clone)]
97pub struct WaveAggregateStats {
98 pub chunk_attempts_total: usize,
100 pub store_durations_ms: Vec<u64>,
103 pub retries_histogram: [usize; 4],
108}
109
110impl WaveAggregateStats {
111 pub fn absorb(&mut self, wave: &WaveResult) {
113 self.chunk_attempts_total = self
114 .chunk_attempts_total
115 .saturating_add(wave.chunk_attempts_total);
116 self.store_durations_ms.extend(&wave.store_durations_ms);
117 for &r in &wave.retries_per_chunk {
118 let idx = (r as usize).min(self.retries_histogram.len() - 1);
119 self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
120 }
121 }
122}
123
124fn percentile(values: &[u64], p: f64) -> u64 {
130 if values.is_empty() {
131 return 0;
132 }
133 let mut sorted = values.to_vec();
134 sorted.sort_unstable();
135 let p = p.clamp(0.0, 1.0);
136 let n = sorted.len();
138 #[allow(
139 clippy::cast_possible_truncation,
140 clippy::cast_sign_loss,
141 clippy::cast_precision_loss
142 )]
143 let rank = ((p * n as f64).ceil() as usize)
144 .saturating_sub(1)
145 .min(n - 1);
146 sorted[rank]
147}
148
149#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
154pub struct PaymentIntent {
155 pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
157 pub total_amount: Amount,
159}
160
161impl PaymentIntent {
162 pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
166 let mut payments = Vec::new();
167 let mut total = Amount::ZERO;
168 for chunk in prepared {
169 for info in &chunk.payment.quotes {
170 if !info.amount.is_zero() {
171 payments.push((info.quote_hash, info.rewards_address, info.amount));
172 total += info.amount;
173 }
174 }
175 }
176 Self {
177 payments,
178 total_amount: total,
179 }
180 }
181}
182
183fn build_paid_chunks(
190 prepared: Vec<PreparedChunk>,
191 tx_hash_map: &HashMap<QuoteHash, TxHash>,
192) -> Result<Vec<PaidChunk>> {
193 let mut paid_chunks = Vec::with_capacity(prepared.len());
194 for chunk in prepared {
195 let mut tx_hashes = Vec::new();
196 for info in &chunk.payment.quotes {
197 if !info.amount.is_zero() {
198 let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
199 Error::Payment(format!(
200 "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
201 hex::encode(info.quote_hash)
202 ))
203 })?;
204 tx_hashes.push(tx_hash);
205 }
206 }
207
208 let proof = PaymentProof {
209 proof_of_payment: ProofOfPayment {
210 peer_quotes: chunk.peer_quotes,
211 },
212 tx_hashes,
213 };
214
215 let proof_bytes = serialize_single_node_proof(&proof)
216 .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
217
218 paid_chunks.push(PaidChunk {
219 content: chunk.content,
220 address: chunk.address,
221 quoted_peers: chunk.quoted_peers,
222 proof_bytes,
223 });
224 }
225 Ok(paid_chunks)
226}
227
228pub fn finalize_batch_payment(
233 prepared: Vec<PreparedChunk>,
234 tx_hash_map: &HashMap<QuoteHash, TxHash>,
235) -> Result<Vec<PaidChunk>> {
236 build_paid_chunks(prepared, tx_hash_map)
237}
238
239impl Client {
240 pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
250 let address = compute_address(&content);
251 let data_size = u64::try_from(content.len())
252 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
253
254 let quote_plan = match self
255 .get_store_quote_plan(&address, data_size, DATA_TYPE_CHUNK)
256 .await
257 {
258 Ok(plan) => plan,
259 Err(Error::AlreadyStored) => {
260 debug!("Chunk {} already stored, skipping", hex::encode(address));
261 return Ok(None);
262 }
263 Err(e) => return Err(e),
264 };
265 let quotes_with_peers = quote_plan.quotes;
266
267 let quoted_peers = quote_plan.put_peers;
270
271 let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
274 let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
275
276 for (peer_id, _addrs, quote, price) in quotes_with_peers {
277 let encoded = peer_id_to_encoded(&peer_id)?;
278 peer_quotes.push((encoded, quote.clone()));
279 quotes_for_payment.push((quote, price));
280 }
281
282 let payment = SingleNodePayment::from_quotes(quotes_for_payment)
283 .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
284
285 Ok(Some(PreparedChunk {
286 content,
287 address,
288 quoted_peers,
289 payment,
290 peer_quotes,
291 }))
292 }
293
294 pub async fn batch_pay(
306 &self,
307 prepared: Vec<PreparedChunk>,
308 ) -> Result<(Vec<PaidChunk>, String, u128)> {
309 if prepared.is_empty() {
310 return Ok((Vec::new(), "0".to_string(), 0));
311 }
312
313 let wallet = self.require_wallet()?;
314
315 let intent = PaymentIntent::from_prepared_chunks(&prepared);
317 let storage_cost_atto = intent.total_amount.to_string();
318
319 let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
321 let mut all_payments = Vec::with_capacity(total_quotes);
322 for chunk in &prepared {
323 for info in &chunk.payment.quotes {
324 all_payments.push((info.quote_hash, info.rewards_address, info.amount));
325 }
326 }
327
328 debug!(
329 "Batch payment for {} chunks ({} quote entries)",
330 prepared.len(),
331 all_payments.len()
332 );
333
334 let (tx_hash_map, gas_info) =
335 wallet
336 .pay_for_quotes(all_payments)
337 .await
338 .map_err(|PayForQuotesError(err, _)| {
339 Error::Payment(format!("Batch payment failed: {err}"))
340 })?;
341
342 info!(
343 "Batch payment succeeded: {} transactions",
344 tx_hash_map.len()
345 );
346
347 let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
348 let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
349 Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
350 }
351
352 pub async fn batch_upload_chunks(
367 &self,
368 chunks: Vec<Bytes>,
369 ) -> Result<(Vec<XorName>, String, u128)> {
370 let (addresses, storage, gas, _stats) = self
371 .batch_upload_chunks_with_events(chunks, None, 0, 0, None)
372 .await?;
373 Ok((addresses, storage, gas))
374 }
375
376 pub async fn batch_upload_chunks_with_events(
390 &self,
391 chunks: Vec<Bytes>,
392 progress: Option<&mpsc::Sender<UploadEvent>>,
393 stored_offset: usize,
394 file_total: usize,
395 resume_key: Option<&str>,
396 ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
397 if chunks.is_empty() {
398 return Ok((
399 Vec::new(),
400 "0".to_string(),
401 0,
402 WaveAggregateStats::default(),
403 ));
404 }
405
406 let total_chunks = chunks.len();
407 let quote_cap = self.controller().quote.current();
408 let store_cap = self.controller().store.current();
409 debug!(
410 "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
411 (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
412 );
413
414 let cached_proofs: HashMap<XorName, Vec<u8>> = match resume_key {
438 Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
439 Some((_, receipt)) => prune_locally_expired_proofs(key, receipt.proofs),
440 None => HashMap::new(),
441 },
442 None => HashMap::new(),
443 };
444
445 let mut all_addresses = Vec::with_capacity(total_chunks);
446 let mut seen_addresses: HashSet<XorName> = HashSet::new();
447
448 let mut total_storage = Amount::ZERO;
451 let mut total_gas: u128 = 0;
452 let mut agg_stats = WaveAggregateStats::default();
453
454 let mut unique_chunks = Vec::with_capacity(total_chunks);
456 for chunk in chunks {
457 let address = compute_address(&chunk);
458 if seen_addresses.insert(address) {
459 unique_chunks.push(chunk);
460 } else {
461 debug!("Skipping duplicate chunk {}", hex::encode(address));
462 all_addresses.push(address);
463 if let Some(tx) = progress {
464 let _ = tx.try_send(UploadEvent::ChunkStored {
465 stored: stored_offset + all_addresses.len(),
466 total: file_total,
467 });
468 }
469 }
470 }
471
472 let waves: Vec<Vec<Bytes>> = unique_chunks
474 .chunks(PAYMENT_WAVE_SIZE)
475 .map(<[Bytes]>::to_vec)
476 .collect();
477 let wave_count = waves.len();
478
479 debug!(
480 "{total_chunks} chunks -> {} unique -> {wave_count} waves",
481 seen_addresses.len()
482 );
483
484 let mut pending_store: Option<Vec<PaidChunk>> = None;
485 let mut total_quoted: usize = 0;
486
487 for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
488 let wave_num = wave_idx + 1;
489 let wave_size = wave_chunks.len();
490
491 let (prepare_result, store_result) = match pending_store.take() {
493 Some(paid_chunks) => {
494 let store_offset = stored_offset + all_addresses.len();
495 let quoted_offset = stored_offset + total_quoted;
496 let (prep, stored) = tokio::join!(
497 self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
498 self.store_paid_chunks_with_events(
499 paid_chunks,
500 progress,
501 store_offset,
502 file_total
503 )
504 );
505 (prep, Some(stored))
506 }
507 None => {
508 let quoted_offset = stored_offset + total_quoted;
509 let result = self
510 .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
511 .await;
512 (result, None)
513 }
514 };
515 total_quoted += wave_size;
516
517 if let Some(wave_result) = store_result {
519 all_addresses.extend(&wave_result.stored);
520 agg_stats.absorb(&wave_result);
521 if !wave_result.failed.is_empty() {
522 let failed_count = wave_result.failed.len();
523 warn!("{failed_count} chunks failed to store after retries");
524 return Err(Error::PartialUpload {
525 stored: all_addresses.clone(),
526 stored_count: stored_offset + all_addresses.len(),
527 failed: wave_result.failed,
528 failed_count,
529 total_chunks: file_total,
530 spend: Box::new(PartialUploadSpend {
531 storage_cost_atto: total_storage.to_string(),
532 gas_cost_wei: total_gas,
533 }),
534 reason: "wave store failed after retries".into(),
535 });
536 }
537 }
538
539 let (prepared_chunks, already_stored) = prepare_result?;
540 all_addresses.extend(&already_stored);
541 if let Some(tx) = progress {
542 for _ in &already_stored {
543 let _ = tx.try_send(UploadEvent::ChunkStored {
544 stored: stored_offset + all_addresses.len(),
545 total: file_total,
546 });
547 }
548 }
549
550 if prepared_chunks.is_empty() {
551 info!("Wave {wave_num}/{wave_count}: all chunks already stored");
552 continue;
553 }
554
555 let mut needs_pay: Vec<PreparedChunk> = Vec::with_capacity(prepared_chunks.len());
560 let mut cached_paid: Vec<PaidChunk> = Vec::new();
561 for prep in prepared_chunks {
562 if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() {
563 cached_paid.push(PaidChunk {
564 content: prep.content,
565 address: prep.address,
566 quoted_peers: prep.quoted_peers,
567 proof_bytes,
568 });
569 } else {
570 needs_pay.push(prep);
571 }
572 }
573 if !cached_paid.is_empty() {
574 info!(
575 "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs",
576 cached_paid.len()
577 );
578 }
579
580 let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() {
581 (Vec::new(), "0".to_string(), 0u128)
582 } else {
583 info!(
584 "Wave {wave_num}/{wave_count}: paying for {} chunks",
585 needs_pay.len()
586 );
587 self.batch_pay(needs_pay).await?
588 };
589 if let Ok(cost) = wave_storage.parse::<Amount>() {
590 total_storage += cost;
591 }
592 total_gas = total_gas.saturating_add(wave_gas);
593
594 if let Some(key) = resume_key {
597 if !paid_chunks.is_empty() {
598 let new_proofs: HashMap<[u8; 32], Vec<u8>> = paid_chunks
599 .iter()
600 .map(|pc| (pc.address, pc.proof_bytes.clone()))
601 .collect();
602 crate::data::client::cached_single::try_append_wave(
603 key,
604 new_proofs,
605 &wave_storage,
606 wave_gas,
607 );
608 }
609 }
610
611 paid_chunks.extend(cached_paid);
612 pending_store = Some(paid_chunks);
613 }
614
615 if let Some(paid_chunks) = pending_store {
617 let store_offset = stored_offset + all_addresses.len();
618 let wave_result = self
619 .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
620 .await;
621 all_addresses.extend(&wave_result.stored);
622 agg_stats.absorb(&wave_result);
623 if !wave_result.failed.is_empty() {
624 let failed_count = wave_result.failed.len();
625 warn!("{failed_count} chunks failed to store after retries (final wave)");
626 return Err(Error::PartialUpload {
627 stored: all_addresses.clone(),
628 stored_count: stored_offset + all_addresses.len(),
629 failed: wave_result.failed,
630 failed_count,
631 total_chunks: file_total,
632 spend: Box::new(PartialUploadSpend {
633 storage_cost_atto: total_storage.to_string(),
634 gas_cost_wei: total_gas,
635 }),
636 reason: "final wave store failed after retries".into(),
637 });
638 }
639 }
640
641 debug!("Batch upload complete: {} addresses", all_addresses.len());
642 Ok((
643 all_addresses,
644 total_storage.to_string(),
645 total_gas,
646 agg_stats,
647 ))
648 }
649
650 async fn prepare_wave(
655 &self,
656 chunks: Vec<Bytes>,
657 progress: Option<&mpsc::Sender<UploadEvent>>,
658 quoted_offset: usize,
659 file_total: usize,
660 ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
661 let chunk_count = chunks.len();
662 let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
663 .into_iter()
664 .map(|c| {
665 let addr = compute_address(&c);
666 (c, addr)
667 })
668 .collect();
669
670 let quote_limiter = self.controller().quote.clone();
671 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
676 let mut quote_stream = stream::iter(chunks_with_addr)
677 .map(|(content, address)| {
678 let limiter = quote_limiter.clone();
679 async move {
680 let result = observe_op(
681 &limiter,
682 || async move { self.prepare_chunk_payment(content).await },
683 classify_error,
684 )
685 .await;
686 (address, result)
687 }
688 })
689 .buffer_unordered(quote_concurrency);
690
691 let mut prepared = Vec::with_capacity(chunk_count);
692 let mut already_stored = Vec::new();
693 let mut quoted_count = 0usize;
694
695 while let Some((address, result)) = quote_stream.next().await {
696 let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
697 match result? {
698 Some(chunk) => prepared.push(chunk),
699 None => already_stored.push(address),
700 }
701 quoted_count += 1;
702 let progress_num = quoted_offset + quoted_count;
703 if file_total > 0 {
704 if chunk_already_stored {
705 info!("Verified {progress_num}/{file_total} (already stored)");
706 } else {
707 info!("Quoted {progress_num}/{file_total}");
708 }
709 }
710 if let Some(tx) = progress {
711 let _ = tx.try_send(UploadEvent::ChunkQuoted {
712 quoted: progress_num,
713 total: file_total,
714 });
715 }
716 }
717
718 Ok((prepared, already_stored))
719 }
720
721 pub(crate) async fn store_paid_chunks_with_events(
733 &self,
734 paid_chunks: Vec<PaidChunk>,
735 progress: Option<&mpsc::Sender<UploadEvent>>,
736 stored_before: usize,
737 total_chunks: usize,
738 ) -> WaveResult {
739 const MAX_RETRIES: u32 = 3;
740 const BASE_DELAY_MS: u64 = 500;
741
742 let mut stored = Vec::new();
743 let mut to_retry = paid_chunks;
744
745 let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
749 for chunk in &to_retry {
750 first_seen.entry(chunk.address).or_insert_with(Instant::now);
751 }
752
753 let max_chunk_bytes = to_retry.iter().map(|c| c.content.len()).max().unwrap_or(0);
763 let byte_bound = STORE_INFLIGHT_BYTE_BUDGET
766 .checked_div(max_chunk_bytes)
767 .map_or(usize::MAX, |n| n.max(1));
768
769 let mut chunk_attempts_total: usize = 0;
770 let mut store_durations_ms: Vec<u64> = Vec::new();
771 let mut retries_per_chunk: Vec<u32> = Vec::new();
772
773 for attempt in 0..=MAX_RETRIES {
774 if attempt > 0 {
775 let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
776 tokio::time::sleep(delay).await;
777 info!(
778 "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
779 to_retry.len()
780 );
781 }
782
783 chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
785
786 let store_limiter = self.controller().store.clone();
787 let make_store = |chunk: PaidChunk| {
795 let chunk_clone = chunk.clone();
796 let limiter = store_limiter.clone();
797 async move {
798 let result = observe_op(
799 &limiter,
800 || async move {
801 self.chunk_put_to_close_group(
802 chunk.content,
803 chunk.proof_bytes,
804 &chunk.quoted_peers,
805 )
806 .await
807 },
808 classify_error,
809 )
810 .await;
811 (chunk_clone, result)
812 }
813 };
814 let mut chunk_iter = to_retry.into_iter();
815 let mut in_flight = FuturesUnordered::new();
816
817 let mut failed_this_round = Vec::new();
818 loop {
819 let slots = store_limiter.current().min(byte_bound).max(1);
820 while in_flight.len() < slots {
821 match chunk_iter.next() {
822 Some(chunk) => in_flight.push(make_store(chunk)),
823 None => break,
824 }
825 }
826 let Some((chunk, result)) = in_flight.next().await else {
827 break;
828 };
829 match result {
830 Ok(name) => {
831 let duration_ms = first_seen
832 .get(&chunk.address)
833 .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
834 .unwrap_or(0);
835 store_durations_ms.push(duration_ms);
836 retries_per_chunk.push(attempt);
837 stored.push(name);
838 let stored_num = stored_before + stored.len();
839 if total_chunks > 0 {
840 info!("Stored {stored_num}/{total_chunks}");
841 }
842 if let Some(tx) = progress {
843 let _ = tx.try_send(UploadEvent::ChunkStored {
844 stored: stored_num,
845 total: total_chunks,
846 });
847 }
848 }
849 Err(e) => failed_this_round.push((chunk, e.to_string())),
850 }
851 }
852
853 if failed_this_round.is_empty() {
854 let result = WaveResult {
855 stored,
856 failed: Vec::new(),
857 chunk_attempts_total,
858 store_durations_ms,
859 retries_per_chunk,
860 };
861 log_wave_summary(&result);
862 return result;
863 }
864
865 if attempt == MAX_RETRIES {
866 let failed = failed_this_round
867 .into_iter()
868 .map(|(c, e)| (c.address, e))
869 .collect();
870 let result = WaveResult {
871 stored,
872 failed,
873 chunk_attempts_total,
874 store_durations_ms,
875 retries_per_chunk,
876 };
877 log_wave_summary(&result);
878 return result;
879 }
880
881 warn!(
882 "{} chunks failed on attempt {}, will retry",
883 failed_this_round.len(),
884 attempt + 1
885 );
886 to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
887 }
888
889 let result = WaveResult {
891 stored,
892 failed: Vec::new(),
893 chunk_attempts_total,
894 store_durations_ms,
895 retries_per_chunk,
896 };
897 log_wave_summary(&result);
898 result
899 }
900}
901
902fn log_wave_summary(result: &WaveResult) {
908 let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
909 let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
910 let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
911 let chunk_attempts_total = result.chunk_attempts_total;
912 info!(
913 chunks_stored = result.stored.len(),
914 chunks_failed = result.failed.len(),
915 chunk_attempts_total,
916 retries_round_1,
917 retries_round_2,
918 retries_round_3,
919 store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
920 store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
921 store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
922 "chunk_store_wave_complete"
923 );
924}
925
926const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 300;
938
939const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60;
946
947const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 300;
958
959fn prune_locally_expired_proofs(
983 resume_key: &str,
984 proofs: HashMap<[u8; 32], Vec<u8>>,
985) -> HashMap<XorName, Vec<u8>> {
986 let now = std::time::SystemTime::now();
987 let max_safe_age = Duration::from_secs(
988 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
989 );
990 let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS);
991 let mut kept: HashMap<XorName, Vec<u8>> = HashMap::with_capacity(proofs.len());
992 let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
998 for (addr, bytes) in proofs {
999 match deserialize_proof(&bytes) {
1000 Ok((proof, _tx_hashes)) => {
1001 if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) {
1002 kept.insert(addr, bytes);
1003 } else {
1004 expired.push((addr, bytes));
1005 }
1006 }
1007 Err(_) => {
1008 expired.push((addr, bytes));
1011 }
1012 }
1013 }
1014 if !expired.is_empty() {
1015 info!(
1016 "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \
1017 before resume",
1018 expired.len()
1019 );
1020 crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired);
1021 }
1022 kept
1023}
1024
1025fn proof_is_safely_fresh(
1032 proof: &ProofOfPayment,
1033 now: std::time::SystemTime,
1034 max_safe_age: Duration,
1035 max_future_skew: Duration,
1036) -> bool {
1037 for (_peer, quote) in &proof.peer_quotes {
1038 match now.duration_since(quote.timestamp) {
1039 Ok(age) => {
1040 if age > max_safe_age {
1041 return false;
1042 }
1043 }
1044 Err(future) => {
1045 if future.duration() > max_future_skew {
1046 return false;
1047 }
1048 }
1049 }
1050 }
1051 true
1052}
1053
1054#[cfg(test)]
1056mod send_assertions {
1057 use super::*;
1058
1059 fn _assert_send<T: Send>(_: &T) {}
1060
1061 #[allow(dead_code)]
1062 async fn _batch_upload_is_send(client: &Client) {
1063 let fut = client.batch_upload_chunks(Vec::new());
1064 _assert_send(&fut);
1065 }
1066}
1067
1068#[cfg(test)]
1069#[allow(clippy::unwrap_used)]
1070mod tests {
1071 use super::*;
1072 use ant_protocol::payment::QuotePaymentInfo;
1073 use ant_protocol::CLOSE_GROUP_SIZE;
1074
1075 const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;
1077
1078 fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
1082 let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
1083 let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
1084 QuotePaymentInfo {
1085 quote_hash: QuoteHash::from([i as u8 + 1; 32]),
1086 rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
1087 amount: Amount::from(amount),
1088 price: Amount::from(amount),
1089 }
1090 });
1091
1092 PreparedChunk {
1093 content: Bytes::from(vec![0xAA; 32]),
1094 address: [0u8; 32],
1095 quoted_peers: Vec::new(),
1096 payment: SingleNodePayment { quotes },
1097 peer_quotes: Vec::new(),
1098 }
1099 }
1100
1101 #[test]
1102 fn payment_intent_from_single_chunk() {
1103 let chunk = make_prepared_chunk(300);
1104 let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
1105
1106 assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
1107 assert_eq!(intent.total_amount, Amount::from(300));
1108
1109 let (hash, addr, amt) = &intent.payments[0];
1110 assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
1111 assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
1112 assert_eq!(*amt, Amount::from(300));
1113 }
1114
1115 #[test]
1116 fn payment_intent_from_multiple_chunks() {
1117 let c1 = make_prepared_chunk(100);
1118 let c2 = make_prepared_chunk(250);
1119 let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);
1120
1121 assert_eq!(intent.payments.len(), 2);
1122 assert_eq!(intent.total_amount, Amount::from(350));
1123 }
1124
1125 #[test]
1126 fn payment_intent_skips_all_zero_chunks() {
1127 let chunk = make_prepared_chunk(0);
1128 let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
1129
1130 assert!(intent.payments.is_empty());
1131 assert_eq!(intent.total_amount, Amount::ZERO);
1132 }
1133
1134 #[test]
1135 fn payment_intent_empty_input() {
1136 let intent = PaymentIntent::from_prepared_chunks(&[]);
1137 assert!(intent.payments.is_empty());
1138 assert_eq!(intent.total_amount, Amount::ZERO);
1139 }
1140
1141 #[test]
1142 fn finalize_batch_payment_builds_proofs() {
1143 let chunk = make_prepared_chunk(500);
1144 let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;
1145
1146 let mut tx_map = HashMap::new();
1147 tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));
1148
1149 let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();
1150
1151 assert_eq!(paid.len(), 1);
1152 assert!(!paid[0].proof_bytes.is_empty());
1153 assert_eq!(paid[0].address, [0u8; 32]);
1154 }
1155
1156 #[test]
1157 fn finalize_batch_payment_empty_input() {
1158 let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
1159 assert!(paid.is_empty());
1160 }
1161
1162 #[test]
1163 fn finalize_batch_payment_missing_tx_hash_errors() {
1164 let chunk = make_prepared_chunk(500);
1167
1168 let result = finalize_batch_payment(vec![chunk], &HashMap::new());
1169 assert!(result.is_err());
1170 let err = result.unwrap_err().to_string();
1171 assert!(err.contains("Missing tx hash"), "got: {err}");
1172 }
1173
1174 #[test]
1175 fn finalize_batch_payment_multiple_chunks() {
1176 let c1 = make_prepared_chunk(100);
1177 let c2 = make_prepared_chunk(200);
1178 let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
1179 let mut tx_map = HashMap::new();
1180 tx_map.insert(q1, TxHash::from([0xCC; 32]));
1183
1184 let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
1185 assert_eq!(paid.len(), 2);
1186 }
1187
1188 fn make_proof_with_timestamps(timestamps: &[std::time::SystemTime]) -> ProofOfPayment {
1198 let peer_quotes = timestamps
1199 .iter()
1200 .enumerate()
1201 .map(|(i, ts)| {
1202 let quote = PaymentQuote {
1203 content: xor_name::XorName([0u8; 32]),
1204 timestamp: *ts,
1205 price: Amount::from(1u64),
1206 rewards_address: RewardsAddress::new([1u8; 20]),
1207 pub_key: vec![],
1208 signature: vec![],
1209 };
1210 (EncodedPeerId::from([i as u8; 32]), quote)
1211 })
1212 .collect();
1213 ProofOfPayment { peer_quotes }
1214 }
1215
1216 fn default_max_future_skew() -> Duration {
1217 Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS)
1218 }
1219
1220 #[test]
1221 fn proof_is_safely_fresh_accepts_recent_quote() {
1222 let proof = make_proof_with_timestamps(&[std::time::SystemTime::now()]);
1223 assert!(proof_is_safely_fresh(
1224 &proof,
1225 std::time::SystemTime::now(),
1226 Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
1227 default_max_future_skew(),
1228 ));
1229 }
1230
1231 #[test]
1232 fn proof_is_safely_fresh_rejects_quote_past_safe_window() {
1233 let too_old = std::time::SystemTime::now() - Duration::from_secs(23 * 60 * 60 + 57 * 60);
1238 let proof = make_proof_with_timestamps(&[too_old]);
1239 let max_safe = Duration::from_secs(
1240 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
1241 );
1242 assert!(
1243 !proof_is_safely_fresh(
1244 &proof,
1245 std::time::SystemTime::now(),
1246 max_safe,
1247 default_max_future_skew(),
1248 ),
1249 "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)"
1250 );
1251 }
1252
1253 #[test]
1254 fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() {
1255 let now = std::time::SystemTime::now();
1258 let fresh = now;
1259 let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1260 let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]);
1261 let max_safe = Duration::from_secs(
1262 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
1263 );
1264 assert!(!proof_is_safely_fresh(
1265 &proof,
1266 now,
1267 max_safe,
1268 default_max_future_skew(),
1269 ));
1270 }
1271
1272 #[test]
1273 fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() {
1274 let now = std::time::SystemTime::now();
1279 let slight_future = now + Duration::from_secs(60);
1280 let proof = make_proof_with_timestamps(&[slight_future]);
1281 let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1282 assert!(
1283 proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()),
1284 "60s-future quote must be accepted (within node's 300s skew tolerance)"
1285 );
1286 }
1287
1288 #[test]
1289 fn proof_is_safely_fresh_rejects_far_future_dated_quote() {
1290 let now = std::time::SystemTime::now();
1294 let far_future = now + Duration::from_secs(3600);
1295 let proof = make_proof_with_timestamps(&[far_future]);
1296 let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1297 assert!(!proof_is_safely_fresh(
1298 &proof,
1299 now,
1300 max_safe,
1301 default_max_future_skew(),
1302 ));
1303 }
1304
1305 #[test]
1306 fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() {
1307 let proof = make_proof_with_timestamps(&[]);
1312 assert!(proof_is_safely_fresh(
1313 &proof,
1314 std::time::SystemTime::now(),
1315 Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
1316 default_max_future_skew(),
1317 ));
1318 }
1319}