1use crate::data::client::file::UploadEvent;
8use crate::data::client::payment::peer_id_to_encoded;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{
12 Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
13 RewardsAddress, TxHash,
14};
15use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment};
16use ant_protocol::transport::{MultiAddr, PeerId};
17use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
18use bytes::Bytes;
19use futures::stream::{self, StreamExt};
20use std::collections::{HashMap, HashSet};
21use std::time::Duration;
22use tokio::sync::mpsc;
23use tracing::{debug, info, warn};
24
/// Number of chunks quoted, paid for, and stored together in one wave.
const PAYMENT_WAVE_SIZE: usize = 64;
27
/// A chunk that has been quoted and is ready to be paid for.
#[derive(Debug)]
pub struct PreparedChunk {
    // Raw chunk content to upload.
    pub content: Bytes,
    // Content-derived network address (see `compute_address`).
    pub address: XorName,
    // Peers that returned quotes, with their known multiaddresses; reused as
    // the storage target group after payment.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    // Payment details built from the collected quotes.
    pub payment: SingleNodePayment,
    // The original quotes keyed by encoded peer id, kept so the proof of
    // payment can be assembled later.
    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
}
42
/// A chunk whose payment has completed and which can now be stored.
#[derive(Debug, Clone)]
pub struct PaidChunk {
    // Raw chunk content to upload.
    pub content: Bytes,
    // Content-derived network address of the chunk.
    pub address: XorName,
    // Peers that quoted for this chunk; the chunk is pushed to this group.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    // Serialized payment proof sent alongside the chunk.
    pub proof_bytes: Vec<u8>,
}
55
/// Outcome of storing one wave of paid chunks.
#[derive(Debug)]
pub struct WaveResult {
    // Addresses of chunks that were stored successfully.
    pub stored: Vec<XorName>,
    // Chunks that still failed after all retries, with the error message.
    pub failed: Vec<(XorName, String)>,
}
64
/// Summary of the payments required for a set of prepared chunks.
/// Serializable — presumably for handing to an external signer flow
/// (see the "external signer" wording in `build_paid_chunks`); confirm.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PaymentIntent {
    // Individual non-zero payments: (quote hash, rewards address, amount).
    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
    // Sum of all amounts in `payments`.
    pub total_amount: Amount,
}
76
77impl PaymentIntent {
78 pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
82 let mut payments = Vec::new();
83 let mut total = Amount::ZERO;
84 for chunk in prepared {
85 for info in &chunk.payment.quotes {
86 if !info.amount.is_zero() {
87 payments.push((info.quote_hash, info.rewards_address, info.amount));
88 total += info.amount;
89 }
90 }
91 }
92 Self {
93 payments,
94 total_amount: total,
95 }
96 }
97}
98
99fn build_paid_chunks(
106 prepared: Vec<PreparedChunk>,
107 tx_hash_map: &HashMap<QuoteHash, TxHash>,
108) -> Result<Vec<PaidChunk>> {
109 let mut paid_chunks = Vec::with_capacity(prepared.len());
110 for chunk in prepared {
111 let mut tx_hashes = Vec::new();
112 for info in &chunk.payment.quotes {
113 if !info.amount.is_zero() {
114 let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
115 Error::Payment(format!(
116 "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
117 hex::encode(info.quote_hash)
118 ))
119 })?;
120 tx_hashes.push(tx_hash);
121 }
122 }
123
124 let proof = PaymentProof {
125 proof_of_payment: ProofOfPayment {
126 peer_quotes: chunk.peer_quotes,
127 },
128 tx_hashes,
129 };
130
131 let proof_bytes = serialize_single_node_proof(&proof)
132 .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
133
134 paid_chunks.push(PaidChunk {
135 content: chunk.content,
136 address: chunk.address,
137 quoted_peers: chunk.quoted_peers,
138 proof_bytes,
139 });
140 }
141 Ok(paid_chunks)
142}
143
/// Builds storable [`PaidChunk`]s from quotes that were paid externally.
///
/// `tx_hash_map` maps each paid quote hash to the transaction hash the signer
/// returned. Fails if any non-zero quote is missing from the map.
pub fn finalize_batch_payment(
    prepared: Vec<PreparedChunk>,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<Vec<PaidChunk>> {
    build_paid_chunks(prepared, tx_hash_map)
}
154
155impl Client {
    /// Fetches store quotes for `content` and assembles everything needed to
    /// pay for and later upload the chunk.
    ///
    /// Returns `Ok(None)` when quoting reports the chunk as already stored
    /// (`Error::AlreadyStored`); any other quoting error is propagated.
    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
        let address = compute_address(&content);
        let data_size = u64::try_from(content.len())
            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;

        let quotes_with_peers = match self
            .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
            .await
        {
            Ok(quotes) => quotes,
            Err(Error::AlreadyStored) => {
                debug!("Chunk {} already stored, skipping", hex::encode(address));
                return Ok(None);
            }
            Err(e) => return Err(e),
        };

        // Remember which peers quoted (and how to reach them) so the chunk
        // can be pushed to the same group after payment.
        let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
            .iter()
            .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
            .collect();

        // Split each entry into the proof-of-payment view (encoded peer id +
        // quote) and the payment view (quote + price); the quote is needed in
        // both, hence the clone.
        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());

        for (peer_id, _addrs, quote, price) in quotes_with_peers {
            let encoded = peer_id_to_encoded(&peer_id)?;
            peer_quotes.push((encoded, quote.clone()));
            quotes_for_payment.push((quote, price));
        }

        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;

        Ok(Some(PreparedChunk {
            content,
            address,
            quoted_peers,
            payment,
            peer_quotes,
        }))
    }
210
211 pub async fn batch_pay(
223 &self,
224 prepared: Vec<PreparedChunk>,
225 ) -> Result<(Vec<PaidChunk>, String, u128)> {
226 if prepared.is_empty() {
227 return Ok((Vec::new(), "0".to_string(), 0));
228 }
229
230 let wallet = self.require_wallet()?;
231
232 let intent = PaymentIntent::from_prepared_chunks(&prepared);
234 let storage_cost_atto = intent.total_amount.to_string();
235
236 let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
238 let mut all_payments = Vec::with_capacity(total_quotes);
239 for chunk in &prepared {
240 for info in &chunk.payment.quotes {
241 all_payments.push((info.quote_hash, info.rewards_address, info.amount));
242 }
243 }
244
245 debug!(
246 "Batch payment for {} chunks ({} quote entries)",
247 prepared.len(),
248 all_payments.len()
249 );
250
251 let (tx_hash_map, gas_info) =
252 wallet
253 .pay_for_quotes(all_payments)
254 .await
255 .map_err(|PayForQuotesError(err, _)| {
256 Error::Payment(format!("Batch payment failed: {err}"))
257 })?;
258
259 info!(
260 "Batch payment succeeded: {} transactions",
261 tx_hash_map.len()
262 );
263
264 let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
265 let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
266 Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
267 }
268
    /// Uploads `chunks` in payment waves without emitting progress events.
    ///
    /// Convenience wrapper around [`Self::batch_upload_chunks_with_events`].
    /// Returns the stored addresses, total storage cost in atto (decimal
    /// string), and total gas cost in wei.
    pub async fn batch_upload_chunks(
        &self,
        chunks: Vec<Bytes>,
    ) -> Result<(Vec<XorName>, String, u128)> {
        self.batch_upload_chunks_with_events(chunks, None, 0, 0)
            .await
    }
290
    /// Uploads `chunks` in waves of [`PAYMENT_WAVE_SIZE`]: each wave is
    /// quoted, paid for in one batch, and stored — with the store of wave N
    /// overlapped against the quoting of wave N+1 via `tokio::join!`.
    ///
    /// `progress` receives best-effort `ChunkQuoted`/`ChunkStored` events;
    /// `stored_offset` and `file_total` offset the reported counts so callers
    /// can report progress across a larger multi-call upload.
    ///
    /// Returns all chunk addresses (including duplicates and chunks the
    /// network already held), total storage cost in atto, and total gas in
    /// wei.
    ///
    /// # Errors
    /// Returns [`Error::PartialUpload`] when any chunk still fails after the
    /// store retries; quoting and payment errors are propagated as-is.
    pub async fn batch_upload_chunks_with_events(
        &self,
        chunks: Vec<Bytes>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_offset: usize,
        file_total: usize,
    ) -> Result<(Vec<XorName>, String, u128)> {
        if chunks.is_empty() {
            return Ok((Vec::new(), "0".to_string(), 0));
        }

        let total_chunks = chunks.len();
        let quote_concurrency = self.config().quote_concurrency;
        let store_concurrency = self.config().store_concurrency;
        debug!("Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} (quote_concurrency: {quote_concurrency}, store_concurrency: {store_concurrency})");

        let mut all_addresses = Vec::with_capacity(total_chunks);
        let mut seen_addresses: HashSet<XorName> = HashSet::new();

        let mut total_storage = Amount::ZERO;
        let mut total_gas: u128 = 0;

        // Deduplicate by address up front. Duplicates are counted as stored
        // immediately — NOTE(review): this reports a duplicate as stored
        // before its first occurrence has actually been uploaded; confirm
        // this ordering is intended.
        let mut unique_chunks = Vec::with_capacity(total_chunks);
        for chunk in chunks {
            let address = compute_address(&chunk);
            if seen_addresses.insert(address) {
                unique_chunks.push(chunk);
            } else {
                debug!("Skipping duplicate chunk {}", hex::encode(address));
                all_addresses.push(address);
                if let Some(tx) = progress {
                    // try_send: progress is best-effort; never block upload.
                    let _ = tx.try_send(UploadEvent::ChunkStored {
                        stored: stored_offset + all_addresses.len(),
                        total: file_total,
                    });
                }
            }
        }

        let waves: Vec<Vec<Bytes>> = unique_chunks
            .chunks(PAYMENT_WAVE_SIZE)
            .map(<[Bytes]>::to_vec)
            .collect();
        let wave_count = waves.len();

        debug!(
            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
            seen_addresses.len()
        );

        // The store of a paid wave is deferred into `pending_store` so it can
        // run concurrently with quoting the next wave.
        let mut pending_store: Option<Vec<PaidChunk>> = None;
        let mut total_quoted: usize = 0;

        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave_size = wave_chunks.len();

            // Quote this wave; if a previous wave is waiting to be stored,
            // run both at once.
            let (prepare_result, store_result) = match pending_store.take() {
                Some(paid_chunks) => {
                    let store_offset = stored_offset + all_addresses.len();
                    let quoted_offset = stored_offset + total_quoted;
                    let (prep, stored) = tokio::join!(
                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
                        self.store_paid_chunks_with_events(
                            paid_chunks,
                            progress,
                            store_offset,
                            file_total
                        )
                    );
                    (prep, Some(stored))
                }
                None => {
                    let quoted_offset = stored_offset + total_quoted;
                    let result = self
                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
                        .await;
                    (result, None)
                }
            };
            total_quoted += wave_size;

            // Handle the deferred store result first: abort with a partial
            // upload error if any chunk exhausted its retries.
            if let Some(wave_result) = store_result {
                all_addresses.extend(&wave_result.stored);
                if !wave_result.failed.is_empty() {
                    let failed_count = wave_result.failed.len();
                    warn!("{failed_count} chunks failed to store after retries");
                    return Err(Error::PartialUpload {
                        stored: all_addresses.clone(),
                        stored_count: stored_offset + all_addresses.len(),
                        failed: wave_result.failed,
                        failed_count,
                        total_chunks: file_total,
                        reason: "wave store failed after retries".into(),
                    });
                }
            }

            // Chunks the network already holds count as stored right away.
            let (prepared_chunks, already_stored) = prepare_result?;
            all_addresses.extend(&already_stored);
            if let Some(tx) = progress {
                for _ in &already_stored {
                    let _ = tx.try_send(UploadEvent::ChunkStored {
                        stored: stored_offset + all_addresses.len(),
                        total: file_total,
                    });
                }
            }

            if prepared_chunks.is_empty() {
                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
                continue;
            }

            info!(
                "Wave {wave_num}/{wave_count}: paying for {} chunks",
                prepared_chunks.len()
            );
            let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?;
            // batch_pay reports cost as a decimal string; silently skip it if
            // it cannot be parsed back into an Amount.
            if let Ok(cost) = wave_storage.parse::<Amount>() {
                total_storage += cost;
            }
            total_gas = total_gas.saturating_add(wave_gas);
            pending_store = Some(paid_chunks);
        }

        // Store the final wave — it had no following wave to overlap with.
        if let Some(paid_chunks) = pending_store {
            let store_offset = stored_offset + all_addresses.len();
            let wave_result = self
                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
                .await;
            all_addresses.extend(&wave_result.stored);
            if !wave_result.failed.is_empty() {
                let failed_count = wave_result.failed.len();
                warn!("{failed_count} chunks failed to store after retries (final wave)");
                return Err(Error::PartialUpload {
                    stored: all_addresses.clone(),
                    stored_count: stored_offset + all_addresses.len(),
                    failed: wave_result.failed,
                    failed_count,
                    total_chunks: file_total,
                    reason: "final wave store failed after retries".into(),
                });
            }
        }

        debug!("Batch upload complete: {} addresses", all_addresses.len());
        Ok((all_addresses, total_storage.to_string(), total_gas))
    }
452
453 async fn prepare_wave(
458 &self,
459 chunks: Vec<Bytes>,
460 progress: Option<&mpsc::Sender<UploadEvent>>,
461 quoted_offset: usize,
462 file_total: usize,
463 ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
464 let chunk_count = chunks.len();
465 let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
466 .into_iter()
467 .map(|c| {
468 let addr = compute_address(&c);
469 (c, addr)
470 })
471 .collect();
472
473 let mut quote_stream = stream::iter(chunks_with_addr)
474 .map(|(content, address)| async move {
475 (address, self.prepare_chunk_payment(content).await)
476 })
477 .buffer_unordered(self.config().quote_concurrency);
478
479 let mut prepared = Vec::with_capacity(chunk_count);
480 let mut already_stored = Vec::new();
481 let mut quoted_count = 0usize;
482
483 while let Some((address, result)) = quote_stream.next().await {
484 let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
485 match result? {
486 Some(chunk) => prepared.push(chunk),
487 None => already_stored.push(address),
488 }
489 quoted_count += 1;
490 let progress_num = quoted_offset + quoted_count;
491 if file_total > 0 {
492 if chunk_already_stored {
493 info!("Verified {progress_num}/{file_total} (already stored)");
494 } else {
495 info!("Quoted {progress_num}/{file_total}");
496 }
497 }
498 if let Some(tx) = progress {
499 let _ = tx.try_send(UploadEvent::ChunkQuoted {
500 quoted: progress_num,
501 total: file_total,
502 });
503 }
504 }
505
506 Ok((prepared, already_stored))
507 }
508
    /// Stores paid chunks without progress reporting; see
    /// [`Self::store_paid_chunks_with_events`].
    pub(crate) async fn store_paid_chunks(&self, paid_chunks: Vec<PaidChunk>) -> WaveResult {
        self.store_paid_chunks_with_events(paid_chunks, None, 0, 0)
            .await
    }
518
    /// Uploads paid chunks to their quoted peer groups with bounded
    /// concurrency, retrying failed chunks up to `MAX_RETRIES` times with
    /// exponential backoff (500ms, 1s, 2s).
    ///
    /// Never returns an error: chunks that still fail after the last attempt
    /// are reported in [`WaveResult::failed`]. `ChunkStored` progress events
    /// are sent best-effort via `try_send`; `stored_before`/`total_chunks`
    /// offset the reported counts for multi-wave uploads.
    pub(crate) async fn store_paid_chunks_with_events(
        &self,
        paid_chunks: Vec<PaidChunk>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_before: usize,
        total_chunks: usize,
    ) -> WaveResult {
        const MAX_RETRIES: u32 = 3;
        const BASE_DELAY_MS: u64 = 500;

        let mut stored = Vec::new();
        let mut to_retry = paid_chunks;

        for attempt in 0..=MAX_RETRIES {
            if attempt > 0 {
                // Exponential backoff: BASE_DELAY_MS * 2^(attempt - 1).
                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
                tokio::time::sleep(delay).await;
                info!(
                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
                    to_retry.len()
                );
            }

            // Each chunk is cloned so it can be handed back for retry when
            // its upload fails (the upload call consumes its fields).
            let mut upload_stream = stream::iter(to_retry)
                .map(|chunk| {
                    let chunk_clone = chunk.clone();
                    async move {
                        let result = self
                            .chunk_put_to_close_group(
                                chunk.content,
                                chunk.proof_bytes,
                                &chunk.quoted_peers,
                            )
                            .await;
                        (chunk_clone, result)
                    }
                })
                .buffer_unordered(self.config().store_concurrency);

            let mut failed_this_round = Vec::new();
            while let Some((chunk, result)) = upload_stream.next().await {
                match result {
                    Ok(name) => {
                        stored.push(name);
                        let stored_num = stored_before + stored.len();
                        if total_chunks > 0 {
                            info!("Stored {stored_num}/{total_chunks}");
                        }
                        if let Some(tx) = progress {
                            // Best-effort: dropped if the channel is full.
                            let _ = tx.try_send(UploadEvent::ChunkStored {
                                stored: stored_num,
                                total: total_chunks,
                            });
                        }
                    }
                    Err(e) => failed_this_round.push((chunk, e.to_string())),
                }
            }

            // Everything went through — done, no retries needed.
            if failed_this_round.is_empty() {
                return WaveResult {
                    stored,
                    failed: Vec::new(),
                };
            }

            // Out of attempts: report the remaining failures by address.
            if attempt == MAX_RETRIES {
                let failed = failed_this_round
                    .into_iter()
                    .map(|(c, e)| (c.address, e))
                    .collect();
                return WaveResult { stored, failed };
            }

            warn!(
                "{} chunks failed on attempt {}, will retry",
                failed_this_round.len(),
                attempt + 1
            );
            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
        }

        // Unreachable: the loop always returns either on success or on the
        // final attempt; kept only to satisfy the compiler.
        WaveResult {
            stored,
            failed: Vec::new(),
        }
    }
613}
614
#[cfg(test)]
mod send_assertions {
    //! Compile-time check that the batch-upload future is `Send`, i.e. can
    //! be driven by a multi-threaded runtime.
    use super::*;

    fn _assert_send<T: Send>(_: &T) {}

    // Never called at runtime; only has to type-check.
    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        let fut = client.batch_upload_chunks(Vec::new());
        _assert_send(&fut);
    }
}
628
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for the pure (network-free) parts of the payment flow:
    //! `PaymentIntent::from_prepared_chunks` and `finalize_batch_payment`.
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;

    // Index of the single quote given a non-zero amount by the helper below.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    // Builds a PreparedChunk whose quotes all cost zero except the one at
    // MEDIAN_INDEX, which costs `median_amount`. Quote hashes are [i+1; 32]
    // and rewards addresses [i+10; 20] so tests can predict them.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(amount),
                price: Amount::from(amount),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    #[test]
    fn payment_intent_from_single_chunk() {
        let chunk = make_prepared_chunk(300);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
        assert_eq!(intent.total_amount, Amount::from(300));

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
        assert_eq!(*amt, Amount::from(300));
    }

    #[test]
    fn payment_intent_from_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(250);
        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);

        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        let chunk = make_prepared_chunk(0);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);
        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;

        let mut tx_map = HashMap::new();
        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert!(!paid[0].proof_bytes.is_empty());
        assert_eq!(paid[0].address, [0u8; 32]);
    }

    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        // Non-zero quote but no receipt in the map: must fail.
        let chunk = make_prepared_chunk(500);

        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        // Both chunks use the same quote hashes, so one map entry covers both.
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let mut tx_map = HashMap::new();
        tx_map.insert(q1, TxHash::from([0xCC; 32]));

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }
}