1use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, PartialUploadSpend, Result};
13use ant_protocol::evm::{
14 Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15 RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{
18 deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment,
19};
20use ant_protocol::transport::{MultiAddr, PeerId};
21use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
22use bytes::Bytes;
23use futures::stream::{self, StreamExt};
24use std::collections::{HashMap, HashSet};
25use std::time::{Duration, Instant};
26use tokio::sync::mpsc;
27use tracing::{debug, info, warn};
28
29const PAYMENT_WAVE_SIZE: usize = 64;
31
32const STORE_INFLIGHT_BYTE_BUDGET: usize = 64 * 1024 * 1024;
38
39#[derive(Debug)]
41pub struct PreparedChunk {
42 pub content: Bytes,
44 pub address: XorName,
46 pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
48 pub payment: SingleNodePayment,
50 pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
52}
53
54#[derive(Debug, Clone)]
56pub struct PaidChunk {
57 pub content: Bytes,
59 pub address: XorName,
61 pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
63 pub proof_bytes: Vec<u8>,
65}
66
67#[derive(Debug)]
69pub struct WaveResult {
70 pub stored: Vec<XorName>,
72 pub failed: Vec<(XorName, String)>,
74 pub chunk_attempts_total: usize,
76 pub store_durations_ms: Vec<u64>,
78 pub retries_per_chunk: Vec<u32>,
80}
81
82#[derive(Debug, Default, Clone)]
89pub struct WaveAggregateStats {
90 pub chunk_attempts_total: usize,
92 pub store_durations_ms: Vec<u64>,
95 pub retries_histogram: [usize; 4],
100}
101
102impl WaveAggregateStats {
103 pub fn absorb(&mut self, wave: &WaveResult) {
105 self.chunk_attempts_total = self
106 .chunk_attempts_total
107 .saturating_add(wave.chunk_attempts_total);
108 self.store_durations_ms.extend(&wave.store_durations_ms);
109 for &r in &wave.retries_per_chunk {
110 let idx = (r as usize).min(self.retries_histogram.len() - 1);
111 self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
112 }
113 }
114}
115
116fn percentile(values: &[u64], p: f64) -> u64 {
122 if values.is_empty() {
123 return 0;
124 }
125 let mut sorted = values.to_vec();
126 sorted.sort_unstable();
127 let p = p.clamp(0.0, 1.0);
128 let n = sorted.len();
130 #[allow(
131 clippy::cast_possible_truncation,
132 clippy::cast_sign_loss,
133 clippy::cast_precision_loss
134 )]
135 let rank = ((p * n as f64).ceil() as usize)
136 .saturating_sub(1)
137 .min(n - 1);
138 sorted[rank]
139}
140
141#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
146pub struct PaymentIntent {
147 pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
149 pub total_amount: Amount,
151}
152
153impl PaymentIntent {
154 pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
158 let mut payments = Vec::new();
159 let mut total = Amount::ZERO;
160 for chunk in prepared {
161 for info in &chunk.payment.quotes {
162 if !info.amount.is_zero() {
163 payments.push((info.quote_hash, info.rewards_address, info.amount));
164 total += info.amount;
165 }
166 }
167 }
168 Self {
169 payments,
170 total_amount: total,
171 }
172 }
173}
174
175fn build_paid_chunks(
182 prepared: Vec<PreparedChunk>,
183 tx_hash_map: &HashMap<QuoteHash, TxHash>,
184) -> Result<Vec<PaidChunk>> {
185 let mut paid_chunks = Vec::with_capacity(prepared.len());
186 for chunk in prepared {
187 let mut tx_hashes = Vec::new();
188 for info in &chunk.payment.quotes {
189 if !info.amount.is_zero() {
190 let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
191 Error::Payment(format!(
192 "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
193 hex::encode(info.quote_hash)
194 ))
195 })?;
196 tx_hashes.push(tx_hash);
197 }
198 }
199
200 let proof = PaymentProof {
201 proof_of_payment: ProofOfPayment {
202 peer_quotes: chunk.peer_quotes,
203 },
204 tx_hashes,
205 };
206
207 let proof_bytes = serialize_single_node_proof(&proof)
208 .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
209
210 paid_chunks.push(PaidChunk {
211 content: chunk.content,
212 address: chunk.address,
213 quoted_peers: chunk.quoted_peers,
214 proof_bytes,
215 });
216 }
217 Ok(paid_chunks)
218}
219
220pub fn finalize_batch_payment(
225 prepared: Vec<PreparedChunk>,
226 tx_hash_map: &HashMap<QuoteHash, TxHash>,
227) -> Result<Vec<PaidChunk>> {
228 build_paid_chunks(prepared, tx_hash_map)
229}
230
231impl Client {
232 pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
242 let address = compute_address(&content);
243 let data_size = u64::try_from(content.len())
244 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
245
246 let quote_plan = match self
247 .get_store_quote_plan(&address, data_size, DATA_TYPE_CHUNK)
248 .await
249 {
250 Ok(plan) => plan,
251 Err(Error::AlreadyStored) => {
252 debug!("Chunk {} already stored, skipping", hex::encode(address));
253 return Ok(None);
254 }
255 Err(e) => return Err(e),
256 };
257 let quotes_with_peers = quote_plan.quotes;
258
259 let quoted_peers = quote_plan.put_peers;
261
262 let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
265 let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
266
267 for (peer_id, _addrs, quote, price) in quotes_with_peers {
268 let encoded = peer_id_to_encoded(&peer_id)?;
269 peer_quotes.push((encoded, quote.clone()));
270 quotes_for_payment.push((quote, price));
271 }
272
273 let payment = SingleNodePayment::from_quotes(quotes_for_payment)
274 .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
275
276 Ok(Some(PreparedChunk {
277 content,
278 address,
279 quoted_peers,
280 payment,
281 peer_quotes,
282 }))
283 }
284
285 pub async fn batch_pay(
297 &self,
298 prepared: Vec<PreparedChunk>,
299 ) -> Result<(Vec<PaidChunk>, String, u128)> {
300 if prepared.is_empty() {
301 return Ok((Vec::new(), "0".to_string(), 0));
302 }
303
304 let wallet = self.require_wallet()?;
305
306 let intent = PaymentIntent::from_prepared_chunks(&prepared);
308 let storage_cost_atto = intent.total_amount.to_string();
309
310 let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
312 let mut all_payments = Vec::with_capacity(total_quotes);
313 for chunk in &prepared {
314 for info in &chunk.payment.quotes {
315 all_payments.push((info.quote_hash, info.rewards_address, info.amount));
316 }
317 }
318
319 debug!(
320 "Batch payment for {} chunks ({} quote entries)",
321 prepared.len(),
322 all_payments.len()
323 );
324
325 let (tx_hash_map, gas_info) =
326 wallet
327 .pay_for_quotes(all_payments)
328 .await
329 .map_err(|PayForQuotesError(err, _)| {
330 Error::Payment(format!("Batch payment failed: {err}"))
331 })?;
332
333 info!(
334 "Batch payment succeeded: {} transactions",
335 tx_hash_map.len()
336 );
337
338 let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
339 let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
340 Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
341 }
342
343 pub async fn batch_upload_chunks(
358 &self,
359 chunks: Vec<Bytes>,
360 ) -> Result<(Vec<XorName>, String, u128)> {
361 let (addresses, storage, gas, _stats) = self
362 .batch_upload_chunks_with_events(chunks, None, 0, 0, None)
363 .await?;
364 Ok((addresses, storage, gas))
365 }
366
367 pub async fn batch_upload_chunks_with_events(
381 &self,
382 chunks: Vec<Bytes>,
383 progress: Option<&mpsc::Sender<UploadEvent>>,
384 stored_offset: usize,
385 file_total: usize,
386 resume_key: Option<&str>,
387 ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
388 if chunks.is_empty() {
389 return Ok((
390 Vec::new(),
391 "0".to_string(),
392 0,
393 WaveAggregateStats::default(),
394 ));
395 }
396
397 let total_chunks = chunks.len();
398 let quote_cap = self.controller().quote.current();
399 let store_cap = self.controller().store.current();
400 debug!(
401 "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
402 (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
403 );
404
405 let cached_proofs: HashMap<XorName, Vec<u8>> = match resume_key {
429 Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
430 Some((_, receipt)) => prune_locally_expired_proofs(key, receipt.proofs),
431 None => HashMap::new(),
432 },
433 None => HashMap::new(),
434 };
435
436 let mut all_addresses = Vec::with_capacity(total_chunks);
437 let mut seen_addresses: HashSet<XorName> = HashSet::new();
438
439 let mut total_storage = Amount::ZERO;
442 let mut total_gas: u128 = 0;
443 let mut agg_stats = WaveAggregateStats::default();
444
445 let mut unique_chunks = Vec::with_capacity(total_chunks);
447 for chunk in chunks {
448 let address = compute_address(&chunk);
449 if seen_addresses.insert(address) {
450 unique_chunks.push(chunk);
451 } else {
452 debug!("Skipping duplicate chunk {}", hex::encode(address));
453 all_addresses.push(address);
454 if let Some(tx) = progress {
455 let _ = tx.try_send(UploadEvent::ChunkStored {
456 stored: stored_offset + all_addresses.len(),
457 total: file_total,
458 });
459 }
460 }
461 }
462
463 let waves: Vec<Vec<Bytes>> = unique_chunks
465 .chunks(PAYMENT_WAVE_SIZE)
466 .map(<[Bytes]>::to_vec)
467 .collect();
468 let wave_count = waves.len();
469
470 debug!(
471 "{total_chunks} chunks -> {} unique -> {wave_count} waves",
472 seen_addresses.len()
473 );
474
475 let mut pending_store: Option<Vec<PaidChunk>> = None;
476 let mut total_quoted: usize = 0;
477
478 for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
479 let wave_num = wave_idx + 1;
480 let wave_size = wave_chunks.len();
481
482 let (prepare_result, store_result) = match pending_store.take() {
484 Some(paid_chunks) => {
485 let store_offset = stored_offset + all_addresses.len();
486 let quoted_offset = stored_offset + total_quoted;
487 let (prep, stored) = tokio::join!(
488 self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
489 self.store_paid_chunks_with_events(
490 paid_chunks,
491 progress,
492 store_offset,
493 file_total
494 )
495 );
496 (prep, Some(stored))
497 }
498 None => {
499 let quoted_offset = stored_offset + total_quoted;
500 let result = self
501 .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
502 .await;
503 (result, None)
504 }
505 };
506 total_quoted += wave_size;
507
508 if let Some(wave_result) = store_result {
510 all_addresses.extend(&wave_result.stored);
511 agg_stats.absorb(&wave_result);
512 if !wave_result.failed.is_empty() {
513 let failed_count = wave_result.failed.len();
514 warn!("{failed_count} chunks failed to store after retries");
515 return Err(Error::PartialUpload {
516 stored: all_addresses.clone(),
517 stored_count: stored_offset + all_addresses.len(),
518 failed: wave_result.failed,
519 failed_count,
520 total_chunks: file_total,
521 spend: Box::new(PartialUploadSpend {
522 storage_cost_atto: total_storage.to_string(),
523 gas_cost_wei: total_gas,
524 }),
525 reason: "wave store failed after retries".into(),
526 });
527 }
528 }
529
530 let (prepared_chunks, already_stored) = prepare_result?;
531 all_addresses.extend(&already_stored);
532 if let Some(tx) = progress {
533 for _ in &already_stored {
534 let _ = tx.try_send(UploadEvent::ChunkStored {
535 stored: stored_offset + all_addresses.len(),
536 total: file_total,
537 });
538 }
539 }
540
541 if prepared_chunks.is_empty() {
542 info!("Wave {wave_num}/{wave_count}: all chunks already stored");
543 continue;
544 }
545
546 let mut needs_pay: Vec<PreparedChunk> = Vec::with_capacity(prepared_chunks.len());
551 let mut cached_paid: Vec<PaidChunk> = Vec::new();
552 for prep in prepared_chunks {
553 if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() {
554 cached_paid.push(PaidChunk {
555 content: prep.content,
556 address: prep.address,
557 quoted_peers: prep.quoted_peers,
558 proof_bytes,
559 });
560 } else {
561 needs_pay.push(prep);
562 }
563 }
564 if !cached_paid.is_empty() {
565 info!(
566 "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs",
567 cached_paid.len()
568 );
569 }
570
571 let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() {
572 (Vec::new(), "0".to_string(), 0u128)
573 } else {
574 info!(
575 "Wave {wave_num}/{wave_count}: paying for {} chunks",
576 needs_pay.len()
577 );
578 self.batch_pay(needs_pay).await?
579 };
580 if let Ok(cost) = wave_storage.parse::<Amount>() {
581 total_storage += cost;
582 }
583 total_gas = total_gas.saturating_add(wave_gas);
584
585 if let Some(key) = resume_key {
588 if !paid_chunks.is_empty() {
589 let new_proofs: HashMap<[u8; 32], Vec<u8>> = paid_chunks
590 .iter()
591 .map(|pc| (pc.address, pc.proof_bytes.clone()))
592 .collect();
593 crate::data::client::cached_single::try_append_wave(
594 key,
595 new_proofs,
596 &wave_storage,
597 wave_gas,
598 );
599 }
600 }
601
602 paid_chunks.extend(cached_paid);
603 pending_store = Some(paid_chunks);
604 }
605
606 if let Some(paid_chunks) = pending_store {
608 let store_offset = stored_offset + all_addresses.len();
609 let wave_result = self
610 .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
611 .await;
612 all_addresses.extend(&wave_result.stored);
613 agg_stats.absorb(&wave_result);
614 if !wave_result.failed.is_empty() {
615 let failed_count = wave_result.failed.len();
616 warn!("{failed_count} chunks failed to store after retries (final wave)");
617 return Err(Error::PartialUpload {
618 stored: all_addresses.clone(),
619 stored_count: stored_offset + all_addresses.len(),
620 failed: wave_result.failed,
621 failed_count,
622 total_chunks: file_total,
623 spend: Box::new(PartialUploadSpend {
624 storage_cost_atto: total_storage.to_string(),
625 gas_cost_wei: total_gas,
626 }),
627 reason: "final wave store failed after retries".into(),
628 });
629 }
630 }
631
632 debug!("Batch upload complete: {} addresses", all_addresses.len());
633 Ok((
634 all_addresses,
635 total_storage.to_string(),
636 total_gas,
637 agg_stats,
638 ))
639 }
640
641 async fn prepare_wave(
646 &self,
647 chunks: Vec<Bytes>,
648 progress: Option<&mpsc::Sender<UploadEvent>>,
649 quoted_offset: usize,
650 file_total: usize,
651 ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
652 let chunk_count = chunks.len();
653 let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
654 .into_iter()
655 .map(|c| {
656 let addr = compute_address(&c);
657 (c, addr)
658 })
659 .collect();
660
661 let quote_limiter = self.controller().quote.clone();
662 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
667 let mut quote_stream = stream::iter(chunks_with_addr)
668 .map(|(content, address)| {
669 let limiter = quote_limiter.clone();
670 async move {
671 let result = observe_op(
672 &limiter,
673 || async move { self.prepare_chunk_payment(content).await },
674 classify_error,
675 )
676 .await;
677 (address, result)
678 }
679 })
680 .buffer_unordered(quote_concurrency);
681
682 let mut prepared = Vec::with_capacity(chunk_count);
683 let mut already_stored = Vec::new();
684 let mut quoted_count = 0usize;
685
686 while let Some((address, result)) = quote_stream.next().await {
687 let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
688 match result? {
689 Some(chunk) => prepared.push(chunk),
690 None => already_stored.push(address),
691 }
692 quoted_count += 1;
693 let progress_num = quoted_offset + quoted_count;
694 if file_total > 0 {
695 if chunk_already_stored {
696 info!("Verified {progress_num}/{file_total} (already stored)");
697 } else {
698 info!("Quoted {progress_num}/{file_total}");
699 }
700 }
701 if let Some(tx) = progress {
702 let _ = tx.try_send(UploadEvent::ChunkQuoted {
703 quoted: progress_num,
704 total: file_total,
705 });
706 }
707 }
708
709 Ok((prepared, already_stored))
710 }
711
712 pub(crate) async fn store_paid_chunks_with_events(
724 &self,
725 paid_chunks: Vec<PaidChunk>,
726 progress: Option<&mpsc::Sender<UploadEvent>>,
727 stored_before: usize,
728 total_chunks: usize,
729 ) -> WaveResult {
730 const MAX_RETRIES: u32 = 3;
731 const BASE_DELAY_MS: u64 = 500;
732
733 let mut stored = Vec::new();
734 let mut to_retry = paid_chunks;
735
736 let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
740 for chunk in &to_retry {
741 first_seen.entry(chunk.address).or_insert_with(Instant::now);
742 }
743
744 let max_chunk_bytes = to_retry.iter().map(|c| c.content.len()).max().unwrap_or(0);
754 let byte_bound = STORE_INFLIGHT_BYTE_BUDGET
757 .checked_div(max_chunk_bytes)
758 .map_or(usize::MAX, |n| n.max(1));
759
760 let mut chunk_attempts_total: usize = 0;
761 let mut store_durations_ms: Vec<u64> = Vec::new();
762 let mut retries_per_chunk: Vec<u32> = Vec::new();
763
764 for attempt in 0..=MAX_RETRIES {
765 if attempt > 0 {
766 let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
767 tokio::time::sleep(delay).await;
768 info!(
769 "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
770 to_retry.len()
771 );
772 }
773
774 chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
776
777 let store_limiter = self.controller().store.clone();
778 let store_concurrency = store_limiter
779 .current()
780 .min(to_retry.len().max(1))
781 .min(byte_bound);
782 let mut upload_stream = stream::iter(to_retry)
783 .map(|chunk| {
784 let chunk_clone = chunk.clone();
785 let limiter = store_limiter.clone();
786 async move {
787 let result = observe_op(
788 &limiter,
789 || async move {
790 self.chunk_put_to_close_group(
791 chunk.content,
792 chunk.proof_bytes,
793 &chunk.quoted_peers,
794 )
795 .await
796 },
797 classify_error,
798 )
799 .await;
800 (chunk_clone, result)
801 }
802 })
803 .buffer_unordered(store_concurrency);
804
805 let mut failed_this_round = Vec::new();
806 while let Some((chunk, result)) = upload_stream.next().await {
807 match result {
808 Ok(name) => {
809 let duration_ms = first_seen
810 .get(&chunk.address)
811 .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
812 .unwrap_or(0);
813 store_durations_ms.push(duration_ms);
814 retries_per_chunk.push(attempt);
815 stored.push(name);
816 let stored_num = stored_before + stored.len();
817 if total_chunks > 0 {
818 info!("Stored {stored_num}/{total_chunks}");
819 }
820 if let Some(tx) = progress {
821 let _ = tx.try_send(UploadEvent::ChunkStored {
822 stored: stored_num,
823 total: total_chunks,
824 });
825 }
826 }
827 Err(e) => failed_this_round.push((chunk, e.to_string())),
828 }
829 }
830
831 if failed_this_round.is_empty() {
832 let result = WaveResult {
833 stored,
834 failed: Vec::new(),
835 chunk_attempts_total,
836 store_durations_ms,
837 retries_per_chunk,
838 };
839 log_wave_summary(&result);
840 return result;
841 }
842
843 if attempt == MAX_RETRIES {
844 let failed = failed_this_round
845 .into_iter()
846 .map(|(c, e)| (c.address, e))
847 .collect();
848 let result = WaveResult {
849 stored,
850 failed,
851 chunk_attempts_total,
852 store_durations_ms,
853 retries_per_chunk,
854 };
855 log_wave_summary(&result);
856 return result;
857 }
858
859 warn!(
860 "{} chunks failed on attempt {}, will retry",
861 failed_this_round.len(),
862 attempt + 1
863 );
864 to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
865 }
866
867 let result = WaveResult {
869 stored,
870 failed: Vec::new(),
871 chunk_attempts_total,
872 store_durations_ms,
873 retries_per_chunk,
874 };
875 log_wave_summary(&result);
876 result
877 }
878}
879
880fn log_wave_summary(result: &WaveResult) {
886 let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
887 let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
888 let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
889 let chunk_attempts_total = result.chunk_attempts_total;
890 info!(
891 chunks_stored = result.stored.len(),
892 chunks_failed = result.failed.len(),
893 chunk_attempts_total,
894 retries_round_1,
895 retries_round_2,
896 retries_round_3,
897 store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
898 store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
899 store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
900 "chunk_store_wave_complete"
901 );
902}
903
904const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 300;
916
917const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60;
924
925const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 300;
936
937fn prune_locally_expired_proofs(
961 resume_key: &str,
962 proofs: HashMap<[u8; 32], Vec<u8>>,
963) -> HashMap<XorName, Vec<u8>> {
964 let now = std::time::SystemTime::now();
965 let max_safe_age = Duration::from_secs(
966 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
967 );
968 let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS);
969 let mut kept: HashMap<XorName, Vec<u8>> = HashMap::with_capacity(proofs.len());
970 let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
976 for (addr, bytes) in proofs {
977 match deserialize_proof(&bytes) {
978 Ok((proof, _tx_hashes)) => {
979 if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) {
980 kept.insert(addr, bytes);
981 } else {
982 expired.push((addr, bytes));
983 }
984 }
985 Err(_) => {
986 expired.push((addr, bytes));
989 }
990 }
991 }
992 if !expired.is_empty() {
993 info!(
994 "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \
995 before resume",
996 expired.len()
997 );
998 crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired);
999 }
1000 kept
1001}
1002
1003fn proof_is_safely_fresh(
1010 proof: &ProofOfPayment,
1011 now: std::time::SystemTime,
1012 max_safe_age: Duration,
1013 max_future_skew: Duration,
1014) -> bool {
1015 for (_peer, quote) in &proof.peer_quotes {
1016 match now.duration_since(quote.timestamp) {
1017 Ok(age) => {
1018 if age > max_safe_age {
1019 return false;
1020 }
1021 }
1022 Err(future) => {
1023 if future.duration() > max_future_skew {
1024 return false;
1025 }
1026 }
1027 }
1028 }
1029 true
1030}
1031
1032#[cfg(test)]
1034mod send_assertions {
1035 use super::*;
1036
1037 fn _assert_send<T: Send>(_: &T) {}
1038
1039 #[allow(dead_code)]
1040 async fn _batch_upload_is_send(client: &Client) {
1041 let fut = client.batch_upload_chunks(Vec::new());
1042 _assert_send(&fut);
1043 }
1044}
1045
1046#[cfg(test)]
1047#[allow(clippy::unwrap_used)]
1048mod tests {
1049 use super::*;
1050 use ant_protocol::payment::QuotePaymentInfo;
1051 use ant_protocol::CLOSE_GROUP_SIZE;
1052
1053 const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;
1055
1056 fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
1060 let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
1061 let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
1062 QuotePaymentInfo {
1063 quote_hash: QuoteHash::from([i as u8 + 1; 32]),
1064 rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
1065 amount: Amount::from(amount),
1066 price: Amount::from(amount),
1067 }
1068 });
1069
1070 PreparedChunk {
1071 content: Bytes::from(vec![0xAA; 32]),
1072 address: [0u8; 32],
1073 quoted_peers: Vec::new(),
1074 payment: SingleNodePayment { quotes },
1075 peer_quotes: Vec::new(),
1076 }
1077 }
1078
1079 #[test]
1080 fn payment_intent_from_single_chunk() {
1081 let chunk = make_prepared_chunk(300);
1082 let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
1083
1084 assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
1085 assert_eq!(intent.total_amount, Amount::from(300));
1086
1087 let (hash, addr, amt) = &intent.payments[0];
1088 assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
1089 assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
1090 assert_eq!(*amt, Amount::from(300));
1091 }
1092
1093 #[test]
1094 fn payment_intent_from_multiple_chunks() {
1095 let c1 = make_prepared_chunk(100);
1096 let c2 = make_prepared_chunk(250);
1097 let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);
1098
1099 assert_eq!(intent.payments.len(), 2);
1100 assert_eq!(intent.total_amount, Amount::from(350));
1101 }
1102
1103 #[test]
1104 fn payment_intent_skips_all_zero_chunks() {
1105 let chunk = make_prepared_chunk(0);
1106 let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
1107
1108 assert!(intent.payments.is_empty());
1109 assert_eq!(intent.total_amount, Amount::ZERO);
1110 }
1111
1112 #[test]
1113 fn payment_intent_empty_input() {
1114 let intent = PaymentIntent::from_prepared_chunks(&[]);
1115 assert!(intent.payments.is_empty());
1116 assert_eq!(intent.total_amount, Amount::ZERO);
1117 }
1118
1119 #[test]
1120 fn finalize_batch_payment_builds_proofs() {
1121 let chunk = make_prepared_chunk(500);
1122 let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;
1123
1124 let mut tx_map = HashMap::new();
1125 tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));
1126
1127 let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();
1128
1129 assert_eq!(paid.len(), 1);
1130 assert!(!paid[0].proof_bytes.is_empty());
1131 assert_eq!(paid[0].address, [0u8; 32]);
1132 }
1133
1134 #[test]
1135 fn finalize_batch_payment_empty_input() {
1136 let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
1137 assert!(paid.is_empty());
1138 }
1139
1140 #[test]
1141 fn finalize_batch_payment_missing_tx_hash_errors() {
1142 let chunk = make_prepared_chunk(500);
1145
1146 let result = finalize_batch_payment(vec![chunk], &HashMap::new());
1147 assert!(result.is_err());
1148 let err = result.unwrap_err().to_string();
1149 assert!(err.contains("Missing tx hash"), "got: {err}");
1150 }
1151
1152 #[test]
1153 fn finalize_batch_payment_multiple_chunks() {
1154 let c1 = make_prepared_chunk(100);
1155 let c2 = make_prepared_chunk(200);
1156 let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
1157 let mut tx_map = HashMap::new();
1158 tx_map.insert(q1, TxHash::from([0xCC; 32]));
1161
1162 let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
1163 assert_eq!(paid.len(), 2);
1164 }
1165
1166 fn make_proof_with_timestamps(timestamps: &[std::time::SystemTime]) -> ProofOfPayment {
1176 let peer_quotes = timestamps
1177 .iter()
1178 .enumerate()
1179 .map(|(i, ts)| {
1180 let quote = PaymentQuote {
1181 content: xor_name::XorName([0u8; 32]),
1182 timestamp: *ts,
1183 price: Amount::from(1u64),
1184 rewards_address: RewardsAddress::new([1u8; 20]),
1185 pub_key: vec![],
1186 signature: vec![],
1187 };
1188 (EncodedPeerId::from([i as u8; 32]), quote)
1189 })
1190 .collect();
1191 ProofOfPayment { peer_quotes }
1192 }
1193
1194 fn default_max_future_skew() -> Duration {
1195 Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS)
1196 }
1197
1198 #[test]
1199 fn proof_is_safely_fresh_accepts_recent_quote() {
1200 let proof = make_proof_with_timestamps(&[std::time::SystemTime::now()]);
1201 assert!(proof_is_safely_fresh(
1202 &proof,
1203 std::time::SystemTime::now(),
1204 Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
1205 default_max_future_skew(),
1206 ));
1207 }
1208
1209 #[test]
1210 fn proof_is_safely_fresh_rejects_quote_past_safe_window() {
1211 let too_old = std::time::SystemTime::now() - Duration::from_secs(23 * 60 * 60 + 57 * 60);
1216 let proof = make_proof_with_timestamps(&[too_old]);
1217 let max_safe = Duration::from_secs(
1218 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
1219 );
1220 assert!(
1221 !proof_is_safely_fresh(
1222 &proof,
1223 std::time::SystemTime::now(),
1224 max_safe,
1225 default_max_future_skew(),
1226 ),
1227 "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)"
1228 );
1229 }
1230
1231 #[test]
1232 fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() {
1233 let now = std::time::SystemTime::now();
1236 let fresh = now;
1237 let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1238 let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]);
1239 let max_safe = Duration::from_secs(
1240 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
1241 );
1242 assert!(!proof_is_safely_fresh(
1243 &proof,
1244 now,
1245 max_safe,
1246 default_max_future_skew(),
1247 ));
1248 }
1249
1250 #[test]
1251 fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() {
1252 let now = std::time::SystemTime::now();
1257 let slight_future = now + Duration::from_secs(60);
1258 let proof = make_proof_with_timestamps(&[slight_future]);
1259 let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1260 assert!(
1261 proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()),
1262 "60s-future quote must be accepted (within node's 300s skew tolerance)"
1263 );
1264 }
1265
1266 #[test]
1267 fn proof_is_safely_fresh_rejects_far_future_dated_quote() {
1268 let now = std::time::SystemTime::now();
1272 let far_future = now + Duration::from_secs(3600);
1273 let proof = make_proof_with_timestamps(&[far_future]);
1274 let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
1275 assert!(!proof_is_safely_fresh(
1276 &proof,
1277 now,
1278 max_safe,
1279 default_max_future_skew(),
1280 ));
1281 }
1282
1283 #[test]
1284 fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() {
1285 let proof = make_proof_with_timestamps(&[]);
1290 assert!(proof_is_safely_fresh(
1291 &proof,
1292 std::time::SystemTime::now(),
1293 Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
1294 default_max_future_skew(),
1295 ));
1296 }
1297}