1use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, Result};
13use ant_protocol::evm::{
14 Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15 RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{
18 deserialize_proof, serialize_single_node_proof, PaymentProof, SingleNodePayment,
19};
20use ant_protocol::transport::{MultiAddr, PeerId};
21use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
22use bytes::Bytes;
23use futures::stream::{self, StreamExt};
24use std::collections::{HashMap, HashSet};
25use std::time::{Duration, Instant};
26use tokio::sync::mpsc;
27use tracing::{debug, info, warn};
28
/// Maximum number of unique chunks grouped into a single wave by
/// `Client::batch_upload_chunks_with_events`; each wave is quoted and paid for
/// as one unit.
const PAYMENT_WAVE_SIZE: usize = 64;
31
/// A chunk that has been quoted but not yet paid for.
///
/// Produced by `Client::prepare_chunk_payment` and consumed by
/// `build_paid_chunks` once transaction hashes are available.
#[derive(Debug)]
pub struct PreparedChunk {
    /// Raw chunk bytes to upload.
    pub content: Bytes,
    /// Address computed from `content` via `compute_address`.
    pub address: XorName,
    /// Peers that returned a quote, together with their addresses.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Payment built from the collected quotes.
    pub payment: SingleNodePayment,
    /// Encoded peer id / quote pairs; these become the `ProofOfPayment`.
    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
}
46
/// A chunk whose payment proof has been serialized and is ready to store.
#[derive(Debug, Clone)]
pub struct PaidChunk {
    /// Raw chunk bytes to upload.
    pub content: Bytes,
    /// Address of the chunk.
    pub address: XorName,
    /// Peers that quoted for this chunk, together with their addresses.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Serialized payment proof sent along with the chunk.
    pub proof_bytes: Vec<u8>,
}
59
/// Outcome of storing one wave of paid chunks, including retry telemetry.
#[derive(Debug)]
pub struct WaveResult {
    /// Addresses of chunks that were stored successfully.
    pub stored: Vec<XorName>,
    /// Chunks that still failed after the last retry, with the last error text.
    pub failed: Vec<(XorName, String)>,
    /// Number of store attempts made, summed over every retry round.
    pub chunk_attempts_total: usize,
    /// For each stored chunk, milliseconds elapsed from the start of the wave
    /// until the chunk was stored.
    pub store_durations_ms: Vec<u64>,
    /// For each stored chunk, the attempt index on which it succeeded
    /// (0 = first try).
    pub retries_per_chunk: Vec<u32>,
}
74
/// Telemetry accumulated over every wave of a batch upload.
#[derive(Debug, Default, Clone)]
pub struct WaveAggregateStats {
    /// Total store attempts across all absorbed waves (saturating).
    pub chunk_attempts_total: usize,
    /// Store durations, in milliseconds, of every stored chunk.
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks per attempt index; indices past the last bucket
    /// are clamped into the last bucket.
    pub retries_histogram: [usize; 4],
}
94
95impl WaveAggregateStats {
96 pub fn absorb(&mut self, wave: &WaveResult) {
98 self.chunk_attempts_total = self
99 .chunk_attempts_total
100 .saturating_add(wave.chunk_attempts_total);
101 self.store_durations_ms.extend(&wave.store_durations_ms);
102 for &r in &wave.retries_per_chunk {
103 let idx = (r as usize).min(self.retries_histogram.len() - 1);
104 self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
105 }
106 }
107}
108
/// Returns the nearest-rank percentile of `values` for the fraction `p`.
///
/// `p` is clamped to `[0.0, 1.0]`. An empty slice yields `0`. The input is not
/// modified; a sorted copy is used internally.
fn percentile(values: &[u64], p: f64) -> u64 {
    let Some(last_idx) = values.len().checked_sub(1) else {
        return 0;
    };

    let mut ordered: Vec<u64> = values.iter().copied().collect();
    ordered.sort_unstable();

    let fraction = p.clamp(0.0, 1.0);
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    let nearest_rank = (fraction * ordered.len() as f64).ceil() as usize;

    // Nearest-rank is 1-based; convert to a 0-based index and stay in bounds.
    let idx = nearest_rank.saturating_sub(1).min(last_idx);
    ordered[idx]
}
133
/// Serializable description of the payments required for a set of prepared
/// chunks.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PaymentIntent {
    /// One `(quote hash, rewards address, amount)` entry per payment.
    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
    /// Sum of the amounts in `payments`.
    pub total_amount: Amount,
}
145
146impl PaymentIntent {
147 pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
151 let mut payments = Vec::new();
152 let mut total = Amount::ZERO;
153 for chunk in prepared {
154 for info in &chunk.payment.quotes {
155 if !info.amount.is_zero() {
156 payments.push((info.quote_hash, info.rewards_address, info.amount));
157 total += info.amount;
158 }
159 }
160 }
161 Self {
162 payments,
163 total_amount: total,
164 }
165 }
166}
167
168fn build_paid_chunks(
175 prepared: Vec<PreparedChunk>,
176 tx_hash_map: &HashMap<QuoteHash, TxHash>,
177) -> Result<Vec<PaidChunk>> {
178 let mut paid_chunks = Vec::with_capacity(prepared.len());
179 for chunk in prepared {
180 let mut tx_hashes = Vec::new();
181 for info in &chunk.payment.quotes {
182 if !info.amount.is_zero() {
183 let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
184 Error::Payment(format!(
185 "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
186 hex::encode(info.quote_hash)
187 ))
188 })?;
189 tx_hashes.push(tx_hash);
190 }
191 }
192
193 let proof = PaymentProof {
194 proof_of_payment: ProofOfPayment {
195 peer_quotes: chunk.peer_quotes,
196 },
197 tx_hashes,
198 };
199
200 let proof_bytes = serialize_single_node_proof(&proof)
201 .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
202
203 paid_chunks.push(PaidChunk {
204 content: chunk.content,
205 address: chunk.address,
206 quoted_peers: chunk.quoted_peers,
207 proof_bytes,
208 });
209 }
210 Ok(paid_chunks)
211}
212
/// Public entry point for building paid chunks from prepared chunks and a map
/// of quote hash to transaction hash.
///
/// Delegates directly to `build_paid_chunks`.
///
/// # Errors
///
/// Propagates any error returned by `build_paid_chunks`.
pub fn finalize_batch_payment(
    prepared: Vec<PreparedChunk>,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<Vec<PaidChunk>> {
    build_paid_chunks(prepared, tx_hash_map)
}
223
224impl Client {
225 pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
235 let address = compute_address(&content);
236 let data_size = u64::try_from(content.len())
237 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
238
239 let quotes_with_peers = match self
240 .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
241 .await
242 {
243 Ok(quotes) => quotes,
244 Err(Error::AlreadyStored) => {
245 debug!("Chunk {} already stored, skipping", hex::encode(address));
246 return Ok(None);
247 }
248 Err(e) => return Err(e),
249 };
250
251 let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
253 .iter()
254 .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
255 .collect();
256
257 let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
260 let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
261
262 for (peer_id, _addrs, quote, price) in quotes_with_peers {
263 let encoded = peer_id_to_encoded(&peer_id)?;
264 peer_quotes.push((encoded, quote.clone()));
265 quotes_for_payment.push((quote, price));
266 }
267
268 let payment = SingleNodePayment::from_quotes(quotes_for_payment)
269 .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
270
271 Ok(Some(PreparedChunk {
272 content,
273 address,
274 quoted_peers,
275 payment,
276 peer_quotes,
277 }))
278 }
279
280 pub async fn batch_pay(
292 &self,
293 prepared: Vec<PreparedChunk>,
294 ) -> Result<(Vec<PaidChunk>, String, u128)> {
295 if prepared.is_empty() {
296 return Ok((Vec::new(), "0".to_string(), 0));
297 }
298
299 let wallet = self.require_wallet()?;
300
301 let intent = PaymentIntent::from_prepared_chunks(&prepared);
303 let storage_cost_atto = intent.total_amount.to_string();
304
305 let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
307 let mut all_payments = Vec::with_capacity(total_quotes);
308 for chunk in &prepared {
309 for info in &chunk.payment.quotes {
310 all_payments.push((info.quote_hash, info.rewards_address, info.amount));
311 }
312 }
313
314 debug!(
315 "Batch payment for {} chunks ({} quote entries)",
316 prepared.len(),
317 all_payments.len()
318 );
319
320 let (tx_hash_map, gas_info) =
321 wallet
322 .pay_for_quotes(all_payments)
323 .await
324 .map_err(|PayForQuotesError(err, _)| {
325 Error::Payment(format!("Batch payment failed: {err}"))
326 })?;
327
328 info!(
329 "Batch payment succeeded: {} transactions",
330 tx_hash_map.len()
331 );
332
333 let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
334 let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
335 Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
336 }
337
338 pub async fn batch_upload_chunks(
353 &self,
354 chunks: Vec<Bytes>,
355 ) -> Result<(Vec<XorName>, String, u128)> {
356 let (addresses, storage, gas, _stats) = self
357 .batch_upload_chunks_with_events(chunks, None, 0, 0, None)
358 .await?;
359 Ok((addresses, storage, gas))
360 }
361
362 pub async fn batch_upload_chunks_with_events(
376 &self,
377 chunks: Vec<Bytes>,
378 progress: Option<&mpsc::Sender<UploadEvent>>,
379 stored_offset: usize,
380 file_total: usize,
381 resume_key: Option<&str>,
382 ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
383 if chunks.is_empty() {
384 return Ok((
385 Vec::new(),
386 "0".to_string(),
387 0,
388 WaveAggregateStats::default(),
389 ));
390 }
391
392 let total_chunks = chunks.len();
393 let quote_cap = self.controller().quote.current();
394 let store_cap = self.controller().store.current();
395 debug!(
396 "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
397 (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
398 );
399
400 let (cached_proofs, cached_storage, cached_gas): (HashMap<XorName, Vec<u8>>, Amount, u128) =
422 match resume_key {
423 Some(key) => match crate::data::client::cached_single::try_load_for_file(key) {
424 Some((_, receipt)) => {
425 let prior_storage = receipt
426 .storage_cost_atto
427 .parse::<Amount>()
428 .unwrap_or(Amount::ZERO);
429 let prior_gas = receipt.gas_cost_wei;
430 let kept = prune_locally_expired_proofs(key, receipt.proofs);
431 (kept, prior_storage, prior_gas)
432 }
433 None => (HashMap::new(), Amount::ZERO, 0u128),
434 },
435 None => (HashMap::new(), Amount::ZERO, 0u128),
436 };
437
438 let mut all_addresses = Vec::with_capacity(total_chunks);
439 let mut seen_addresses: HashSet<XorName> = HashSet::new();
440
441 let mut total_storage = cached_storage;
444 let mut total_gas: u128 = cached_gas;
445 let mut agg_stats = WaveAggregateStats::default();
446
447 let mut unique_chunks = Vec::with_capacity(total_chunks);
449 for chunk in chunks {
450 let address = compute_address(&chunk);
451 if seen_addresses.insert(address) {
452 unique_chunks.push(chunk);
453 } else {
454 debug!("Skipping duplicate chunk {}", hex::encode(address));
455 all_addresses.push(address);
456 if let Some(tx) = progress {
457 let _ = tx.try_send(UploadEvent::ChunkStored {
458 stored: stored_offset + all_addresses.len(),
459 total: file_total,
460 });
461 }
462 }
463 }
464
465 let waves: Vec<Vec<Bytes>> = unique_chunks
467 .chunks(PAYMENT_WAVE_SIZE)
468 .map(<[Bytes]>::to_vec)
469 .collect();
470 let wave_count = waves.len();
471
472 debug!(
473 "{total_chunks} chunks -> {} unique -> {wave_count} waves",
474 seen_addresses.len()
475 );
476
477 let mut pending_store: Option<Vec<PaidChunk>> = None;
478 let mut total_quoted: usize = 0;
479
480 for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
481 let wave_num = wave_idx + 1;
482 let wave_size = wave_chunks.len();
483
484 let (prepare_result, store_result) = match pending_store.take() {
486 Some(paid_chunks) => {
487 let store_offset = stored_offset + all_addresses.len();
488 let quoted_offset = stored_offset + total_quoted;
489 let (prep, stored) = tokio::join!(
490 self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
491 self.store_paid_chunks_with_events(
492 paid_chunks,
493 progress,
494 store_offset,
495 file_total
496 )
497 );
498 (prep, Some(stored))
499 }
500 None => {
501 let quoted_offset = stored_offset + total_quoted;
502 let result = self
503 .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
504 .await;
505 (result, None)
506 }
507 };
508 total_quoted += wave_size;
509
510 if let Some(wave_result) = store_result {
512 all_addresses.extend(&wave_result.stored);
513 agg_stats.absorb(&wave_result);
514 if !wave_result.failed.is_empty() {
515 let failed_count = wave_result.failed.len();
516 warn!("{failed_count} chunks failed to store after retries");
517 return Err(Error::PartialUpload {
518 stored: all_addresses.clone(),
519 stored_count: stored_offset + all_addresses.len(),
520 failed: wave_result.failed,
521 failed_count,
522 total_chunks: file_total,
523 reason: "wave store failed after retries".into(),
524 });
525 }
526 }
527
528 let (prepared_chunks, already_stored) = prepare_result?;
529 all_addresses.extend(&already_stored);
530 if let Some(tx) = progress {
531 for _ in &already_stored {
532 let _ = tx.try_send(UploadEvent::ChunkStored {
533 stored: stored_offset + all_addresses.len(),
534 total: file_total,
535 });
536 }
537 }
538
539 if prepared_chunks.is_empty() {
540 info!("Wave {wave_num}/{wave_count}: all chunks already stored");
541 continue;
542 }
543
544 let mut needs_pay: Vec<PreparedChunk> = Vec::with_capacity(prepared_chunks.len());
549 let mut cached_paid: Vec<PaidChunk> = Vec::new();
550 for prep in prepared_chunks {
551 if let Some(proof_bytes) = cached_proofs.get(&prep.address).cloned() {
552 cached_paid.push(PaidChunk {
553 content: prep.content,
554 address: prep.address,
555 quoted_peers: prep.quoted_peers,
556 proof_bytes,
557 });
558 } else {
559 needs_pay.push(prep);
560 }
561 }
562 if !cached_paid.is_empty() {
563 info!(
564 "Wave {wave_num}/{wave_count}: reusing {} cached payment proofs",
565 cached_paid.len()
566 );
567 }
568
569 let (mut paid_chunks, wave_storage, wave_gas) = if needs_pay.is_empty() {
570 (Vec::new(), "0".to_string(), 0u128)
571 } else {
572 info!(
573 "Wave {wave_num}/{wave_count}: paying for {} chunks",
574 needs_pay.len()
575 );
576 self.batch_pay(needs_pay).await?
577 };
578 if let Ok(cost) = wave_storage.parse::<Amount>() {
579 total_storage += cost;
580 }
581 total_gas = total_gas.saturating_add(wave_gas);
582
583 if let Some(key) = resume_key {
586 if !paid_chunks.is_empty() {
587 let new_proofs: HashMap<[u8; 32], Vec<u8>> = paid_chunks
588 .iter()
589 .map(|pc| (pc.address, pc.proof_bytes.clone()))
590 .collect();
591 crate::data::client::cached_single::try_append_wave(
592 key,
593 new_proofs,
594 &wave_storage,
595 wave_gas,
596 );
597 }
598 }
599
600 paid_chunks.extend(cached_paid);
601 pending_store = Some(paid_chunks);
602 }
603
604 if let Some(paid_chunks) = pending_store {
606 let store_offset = stored_offset + all_addresses.len();
607 let wave_result = self
608 .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
609 .await;
610 all_addresses.extend(&wave_result.stored);
611 agg_stats.absorb(&wave_result);
612 if !wave_result.failed.is_empty() {
613 let failed_count = wave_result.failed.len();
614 warn!("{failed_count} chunks failed to store after retries (final wave)");
615 return Err(Error::PartialUpload {
616 stored: all_addresses.clone(),
617 stored_count: stored_offset + all_addresses.len(),
618 failed: wave_result.failed,
619 failed_count,
620 total_chunks: file_total,
621 reason: "final wave store failed after retries".into(),
622 });
623 }
624 }
625
626 debug!("Batch upload complete: {} addresses", all_addresses.len());
627 Ok((
628 all_addresses,
629 total_storage.to_string(),
630 total_gas,
631 agg_stats,
632 ))
633 }
634
635 async fn prepare_wave(
640 &self,
641 chunks: Vec<Bytes>,
642 progress: Option<&mpsc::Sender<UploadEvent>>,
643 quoted_offset: usize,
644 file_total: usize,
645 ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
646 let chunk_count = chunks.len();
647 let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
648 .into_iter()
649 .map(|c| {
650 let addr = compute_address(&c);
651 (c, addr)
652 })
653 .collect();
654
655 let quote_limiter = self.controller().quote.clone();
656 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
661 let mut quote_stream = stream::iter(chunks_with_addr)
662 .map(|(content, address)| {
663 let limiter = quote_limiter.clone();
664 async move {
665 let result = observe_op(
666 &limiter,
667 || async move { self.prepare_chunk_payment(content).await },
668 classify_error,
669 )
670 .await;
671 (address, result)
672 }
673 })
674 .buffer_unordered(quote_concurrency);
675
676 let mut prepared = Vec::with_capacity(chunk_count);
677 let mut already_stored = Vec::new();
678 let mut quoted_count = 0usize;
679
680 while let Some((address, result)) = quote_stream.next().await {
681 let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
682 match result? {
683 Some(chunk) => prepared.push(chunk),
684 None => already_stored.push(address),
685 }
686 quoted_count += 1;
687 let progress_num = quoted_offset + quoted_count;
688 if file_total > 0 {
689 if chunk_already_stored {
690 info!("Verified {progress_num}/{file_total} (already stored)");
691 } else {
692 info!("Quoted {progress_num}/{file_total}");
693 }
694 }
695 if let Some(tx) = progress {
696 let _ = tx.try_send(UploadEvent::ChunkQuoted {
697 quoted: progress_num,
698 total: file_total,
699 });
700 }
701 }
702
703 Ok((prepared, already_stored))
704 }
705
706 pub(crate) async fn store_paid_chunks_with_events(
718 &self,
719 paid_chunks: Vec<PaidChunk>,
720 progress: Option<&mpsc::Sender<UploadEvent>>,
721 stored_before: usize,
722 total_chunks: usize,
723 ) -> WaveResult {
724 const MAX_RETRIES: u32 = 3;
725 const BASE_DELAY_MS: u64 = 500;
726
727 let mut stored = Vec::new();
728 let mut to_retry = paid_chunks;
729
730 let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
734 for chunk in &to_retry {
735 first_seen.entry(chunk.address).or_insert_with(Instant::now);
736 }
737
738 let mut chunk_attempts_total: usize = 0;
739 let mut store_durations_ms: Vec<u64> = Vec::new();
740 let mut retries_per_chunk: Vec<u32> = Vec::new();
741
742 for attempt in 0..=MAX_RETRIES {
743 if attempt > 0 {
744 let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
745 tokio::time::sleep(delay).await;
746 info!(
747 "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
748 to_retry.len()
749 );
750 }
751
752 chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
754
755 let store_limiter = self.controller().store.clone();
756 let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
757 let mut upload_stream = stream::iter(to_retry)
758 .map(|chunk| {
759 let chunk_clone = chunk.clone();
760 let limiter = store_limiter.clone();
761 async move {
762 let result = observe_op(
763 &limiter,
764 || async move {
765 self.chunk_put_to_close_group(
766 chunk.content,
767 chunk.proof_bytes,
768 &chunk.quoted_peers,
769 )
770 .await
771 },
772 classify_error,
773 )
774 .await;
775 (chunk_clone, result)
776 }
777 })
778 .buffer_unordered(store_concurrency);
779
780 let mut failed_this_round = Vec::new();
781 while let Some((chunk, result)) = upload_stream.next().await {
782 match result {
783 Ok(name) => {
784 let duration_ms = first_seen
785 .get(&chunk.address)
786 .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
787 .unwrap_or(0);
788 store_durations_ms.push(duration_ms);
789 retries_per_chunk.push(attempt);
790 stored.push(name);
791 let stored_num = stored_before + stored.len();
792 if total_chunks > 0 {
793 info!("Stored {stored_num}/{total_chunks}");
794 }
795 if let Some(tx) = progress {
796 let _ = tx.try_send(UploadEvent::ChunkStored {
797 stored: stored_num,
798 total: total_chunks,
799 });
800 }
801 }
802 Err(e) => failed_this_round.push((chunk, e.to_string())),
803 }
804 }
805
806 if failed_this_round.is_empty() {
807 let result = WaveResult {
808 stored,
809 failed: Vec::new(),
810 chunk_attempts_total,
811 store_durations_ms,
812 retries_per_chunk,
813 };
814 log_wave_summary(&result);
815 return result;
816 }
817
818 if attempt == MAX_RETRIES {
819 let failed = failed_this_round
820 .into_iter()
821 .map(|(c, e)| (c.address, e))
822 .collect();
823 let result = WaveResult {
824 stored,
825 failed,
826 chunk_attempts_total,
827 store_durations_ms,
828 retries_per_chunk,
829 };
830 log_wave_summary(&result);
831 return result;
832 }
833
834 warn!(
835 "{} chunks failed on attempt {}, will retry",
836 failed_this_round.len(),
837 attempt + 1
838 );
839 to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
840 }
841
842 let result = WaveResult {
844 stored,
845 failed: Vec::new(),
846 chunk_attempts_total,
847 store_durations_ms,
848 retries_per_chunk,
849 };
850 log_wave_summary(&result);
851 result
852 }
853}
854
855fn log_wave_summary(result: &WaveResult) {
861 let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
862 let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
863 let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
864 let chunk_attempts_total = result.chunk_attempts_total;
865 info!(
866 chunks_stored = result.stored.len(),
867 chunks_failed = result.failed.len(),
868 chunk_attempts_total,
869 retries_round_1,
870 retries_round_2,
871 retries_round_3,
872 store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
873 store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
874 store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
875 "chunk_store_wave_complete"
876 );
877}
878
/// Seconds subtracted from `CACHED_PROOF_MAX_AGE_SECS` when deciding whether a
/// cached proof is still safe to reuse (5 minutes).
const CACHED_PROOF_SAFETY_MARGIN_SECS: u64 = 300;

/// Maximum quote age, in seconds, before the safety margin is applied
/// (24 hours).
// NOTE(review): presumably mirrors the node-side quote expiry — confirm.
const CACHED_PROOF_MAX_AGE_SECS: u64 = 24 * 60 * 60;

/// How far in the future, in seconds, a quote timestamp may lie relative to
/// the local clock and still be accepted (5 minutes).
const CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS: u64 = 300;
911
912fn prune_locally_expired_proofs(
936 resume_key: &str,
937 proofs: HashMap<[u8; 32], Vec<u8>>,
938) -> HashMap<XorName, Vec<u8>> {
939 let now = std::time::SystemTime::now();
940 let max_safe_age = Duration::from_secs(
941 CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
942 );
943 let max_future_skew = Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS);
944 let mut kept: HashMap<XorName, Vec<u8>> = HashMap::with_capacity(proofs.len());
945 let mut expired: Vec<([u8; 32], Vec<u8>)> = Vec::new();
951 for (addr, bytes) in proofs {
952 match deserialize_proof(&bytes) {
953 Ok((proof, _tx_hashes)) => {
954 if proof_is_safely_fresh(&proof, now, max_safe_age, max_future_skew) {
955 kept.insert(addr, bytes);
956 } else {
957 expired.push((addr, bytes));
958 }
959 }
960 Err(_) => {
961 expired.push((addr, bytes));
964 }
965 }
966 }
967 if !expired.is_empty() {
968 info!(
969 "Pruning {} stale cached proofs (quote.timestamp past safe-reuse window) \
970 before resume",
971 expired.len()
972 );
973 crate::data::client::cached_single::try_drop_proofs_for_file(resume_key, &expired);
974 }
975 kept
976}
977
978fn proof_is_safely_fresh(
985 proof: &ProofOfPayment,
986 now: std::time::SystemTime,
987 max_safe_age: Duration,
988 max_future_skew: Duration,
989) -> bool {
990 for (_peer, quote) in &proof.peer_quotes {
991 match now.duration_since(quote.timestamp) {
992 Ok(age) => {
993 if age > max_safe_age {
994 return false;
995 }
996 }
997 Err(future) => {
998 if future.duration() > max_future_skew {
999 return false;
1000 }
1001 }
1002 }
1003 }
1004 true
1005}
1006
/// Compile-time check that the batch upload future is `Send`.
///
/// Nothing here runs; the module only has to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts only references to `Send` values.
    fn _assert_send<T: Send>(_: &T) {}

    /// Fails to compile if `batch_upload_chunks` returns a non-`Send` future.
    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        let fut = client.batch_upload_chunks(Vec::new());
        _assert_send(&fut);
    }
}
1020
/// Unit tests for payment-intent construction, proof finalization, and the
/// cached-proof freshness check.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;

    /// Index of the single quote that carries a non-zero amount in fixtures.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    /// Builds a `PreparedChunk` whose quotes are all zero except the one at
    /// `MEDIAN_INDEX`, which carries `median_amount`.
    ///
    /// Quote hashes and rewards addresses depend only on the quote index, so
    /// two fixtures share the same hashes.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(amount),
                price: Amount::from(amount),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    /// A single chunk yields exactly one payment: the non-zero median quote.
    #[test]
    fn payment_intent_from_single_chunk() {
        let chunk = make_prepared_chunk(300);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
        assert_eq!(intent.total_amount, Amount::from(300));

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
        assert_eq!(*amt, Amount::from(300));
    }

    /// Payments from several chunks are concatenated and their amounts summed.
    #[test]
    fn payment_intent_from_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(250);
        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);

        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    /// A chunk whose quotes are all zero contributes no payments.
    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        let chunk = make_prepared_chunk(0);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    /// No chunks means no payments and a zero total.
    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);
        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    /// With a tx hash for the paid quote, finalization produces proof bytes.
    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;

        let mut tx_map = HashMap::new();
        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert!(!paid[0].proof_bytes.is_empty());
        assert_eq!(paid[0].address, [0u8; 32]);
    }

    /// Finalizing nothing succeeds with an empty result.
    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    /// A non-zero quote without a tx hash is reported as an error.
    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        let chunk = make_prepared_chunk(500);

        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    /// Both fixtures share the same median quote hash, so one map entry
    /// covers both chunks.
    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let mut tx_map = HashMap::new();
        tx_map.insert(q1, TxHash::from([0xCC; 32]));

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }

    /// Builds a `ProofOfPayment` with one quote per timestamp; all other quote
    /// fields are placeholders.
    fn make_proof_with_timestamps(timestamps: &[std::time::SystemTime]) -> ProofOfPayment {
        let peer_quotes = timestamps
            .iter()
            .enumerate()
            .map(|(i, ts)| {
                let quote = PaymentQuote {
                    content: xor_name::XorName([0u8; 32]),
                    timestamp: *ts,
                    price: Amount::from(1u64),
                    rewards_address: RewardsAddress::new([1u8; 20]),
                    pub_key: vec![],
                    signature: vec![],
                };
                (EncodedPeerId::from([i as u8; 32]), quote)
            })
            .collect();
        ProofOfPayment { peer_quotes }
    }

    /// The production future-skew tolerance as a `Duration`.
    fn default_max_future_skew() -> Duration {
        Duration::from_secs(CACHED_PROOF_FUTURE_SKEW_TOLERANCE_SECS)
    }

    /// A quote timestamped "now" is fresh.
    #[test]
    fn proof_is_safely_fresh_accepts_recent_quote() {
        let proof = make_proof_with_timestamps(&[std::time::SystemTime::now()]);
        assert!(proof_is_safely_fresh(
            &proof,
            std::time::SystemTime::now(),
            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
            default_max_future_skew(),
        ));
    }

    /// A quote older than max age minus the safety margin is rejected.
    #[test]
    fn proof_is_safely_fresh_rejects_quote_past_safe_window() {
        let too_old = std::time::SystemTime::now() - Duration::from_secs(23 * 60 * 60 + 57 * 60);
        let proof = make_proof_with_timestamps(&[too_old]);
        let max_safe = Duration::from_secs(
            CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
        );
        assert!(
            !proof_is_safely_fresh(
                &proof,
                std::time::SystemTime::now(),
                max_safe,
                default_max_future_skew(),
            ),
            "23h57m-old quote must fail safe-reuse check (limit is 24h - 5min margin)"
        );
    }

    /// One stale quote among fresh ones makes the whole proof stale.
    #[test]
    fn proof_is_safely_fresh_rejects_if_any_quote_is_stale() {
        let now = std::time::SystemTime::now();
        let fresh = now;
        let stale = now - Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        let proof = make_proof_with_timestamps(&[fresh, fresh, stale, fresh]);
        let max_safe = Duration::from_secs(
            CACHED_PROOF_MAX_AGE_SECS.saturating_sub(CACHED_PROOF_SAFETY_MARGIN_SECS),
        );
        assert!(!proof_is_safely_fresh(
            &proof,
            now,
            max_safe,
            default_max_future_skew(),
        ));
    }

    /// A timestamp slightly ahead of the local clock is tolerated.
    #[test]
    fn proof_is_safely_fresh_accepts_slight_future_skew_within_node_tolerance() {
        let now = std::time::SystemTime::now();
        let slight_future = now + Duration::from_secs(60);
        let proof = make_proof_with_timestamps(&[slight_future]);
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        assert!(
            proof_is_safely_fresh(&proof, now, max_safe, default_max_future_skew()),
            "60s-future quote must be accepted (within node's 300s skew tolerance)"
        );
    }

    /// A timestamp far ahead of the local clock is rejected.
    #[test]
    fn proof_is_safely_fresh_rejects_far_future_dated_quote() {
        let now = std::time::SystemTime::now();
        let far_future = now + Duration::from_secs(3600);
        let proof = make_proof_with_timestamps(&[far_future]);
        let max_safe = Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS);
        assert!(!proof_is_safely_fresh(
            &proof,
            now,
            max_safe,
            default_max_future_skew(),
        ));
    }

    /// A proof with no quotes passes, since there is nothing to reject.
    #[test]
    fn proof_is_safely_fresh_empty_quotes_is_vacuously_safe() {
        let proof = make_proof_with_timestamps(&[]);
        assert!(proof_is_safely_fresh(
            &proof,
            std::time::SystemTime::now(),
            Duration::from_secs(CACHED_PROOF_MAX_AGE_SECS),
            default_max_future_skew(),
        ));
    }
}