1use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, Result};
13use ant_protocol::evm::{
14 Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15 RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment};
18use ant_protocol::transport::{MultiAddr, PeerId};
19use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
20use bytes::Bytes;
21use futures::stream::{self, StreamExt};
22use std::collections::{HashMap, HashSet};
23use std::time::Duration;
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
/// Number of chunks quoted, paid for, and stored together as one pipelined wave.
const PAYMENT_WAVE_SIZE: usize = 64;
29
/// A chunk that has been quoted by the network and is ready to be paid for.
#[derive(Debug)]
pub struct PreparedChunk {
    /// Raw chunk bytes to upload.
    pub content: Bytes,
    /// Content address (XOR name) derived from `content`.
    pub address: XorName,
    /// Peers that produced quotes for this chunk, with their dial addresses,
    /// kept for the eventual store round-trip.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Per-node payment details derived from the quotes.
    pub payment: SingleNodePayment,
    /// The raw quotes keyed by encoded peer id, retained to build the
    /// proof of payment after the transaction completes.
    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
}
44
/// A chunk whose payment has settled; carries the serialized payment proof
/// that the storing nodes verify.
#[derive(Debug, Clone)]
pub struct PaidChunk {
    /// Raw chunk bytes to upload.
    pub content: Bytes,
    /// Content address (XOR name) of `content`.
    pub address: XorName,
    /// Peers (with dial addresses) that quoted this chunk; the store
    /// operation targets this close group.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Serialized single-node payment proof sent alongside the chunk.
    pub proof_bytes: Vec<u8>,
}
57
/// Outcome of storing one wave of paid chunks.
#[derive(Debug)]
pub struct WaveResult {
    /// Addresses that were stored successfully.
    pub stored: Vec<XorName>,
    /// Chunks that still failed after all retries, with the final error text.
    pub failed: Vec<(XorName, String)>,
}
66
/// The set of payments an (external) signer must execute for a batch:
/// one entry per non-zero quote. Serde-serializable so it can cross a
/// process boundary to an external signer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PaymentIntent {
    /// `(quote hash, rewards address, amount)` triples to pay.
    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
    /// Sum of all `payments` amounts.
    pub total_amount: Amount,
}
78
79impl PaymentIntent {
80 pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
84 let mut payments = Vec::new();
85 let mut total = Amount::ZERO;
86 for chunk in prepared {
87 for info in &chunk.payment.quotes {
88 if !info.amount.is_zero() {
89 payments.push((info.quote_hash, info.rewards_address, info.amount));
90 total += info.amount;
91 }
92 }
93 }
94 Self {
95 payments,
96 total_amount: total,
97 }
98 }
99}
100
101fn build_paid_chunks(
108 prepared: Vec<PreparedChunk>,
109 tx_hash_map: &HashMap<QuoteHash, TxHash>,
110) -> Result<Vec<PaidChunk>> {
111 let mut paid_chunks = Vec::with_capacity(prepared.len());
112 for chunk in prepared {
113 let mut tx_hashes = Vec::new();
114 for info in &chunk.payment.quotes {
115 if !info.amount.is_zero() {
116 let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
117 Error::Payment(format!(
118 "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
119 hex::encode(info.quote_hash)
120 ))
121 })?;
122 tx_hashes.push(tx_hash);
123 }
124 }
125
126 let proof = PaymentProof {
127 proof_of_payment: ProofOfPayment {
128 peer_quotes: chunk.peer_quotes,
129 },
130 tx_hashes,
131 };
132
133 let proof_bytes = serialize_single_node_proof(&proof)
134 .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
135
136 paid_chunks.push(PaidChunk {
137 content: chunk.content,
138 address: chunk.address,
139 quoted_peers: chunk.quoted_peers,
140 proof_bytes,
141 });
142 }
143 Ok(paid_chunks)
144}
145
/// Build [`PaidChunk`]s from externally settled payments.
///
/// Counterpart to [`PaymentIntent`]: after an external signer executes the
/// intent's payments, the returned `tx_hash_map` is combined with the
/// original `prepared` chunks to produce storable chunks.
///
/// # Errors
/// Fails if any non-zero quote is missing a tx hash, or if proof
/// serialization fails.
pub fn finalize_batch_payment(
    prepared: Vec<PreparedChunk>,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<Vec<PaidChunk>> {
    build_paid_chunks(prepared, tx_hash_map)
}
156
157impl Client {
158 pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
168 let address = compute_address(&content);
169 let data_size = u64::try_from(content.len())
170 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
171
172 let quotes_with_peers = match self
173 .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
174 .await
175 {
176 Ok(quotes) => quotes,
177 Err(Error::AlreadyStored) => {
178 debug!("Chunk {} already stored, skipping", hex::encode(address));
179 return Ok(None);
180 }
181 Err(e) => return Err(e),
182 };
183
184 let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
186 .iter()
187 .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
188 .collect();
189
190 let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
193 let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
194
195 for (peer_id, _addrs, quote, price) in quotes_with_peers {
196 let encoded = peer_id_to_encoded(&peer_id)?;
197 peer_quotes.push((encoded, quote.clone()));
198 quotes_for_payment.push((quote, price));
199 }
200
201 let payment = SingleNodePayment::from_quotes(quotes_for_payment)
202 .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
203
204 Ok(Some(PreparedChunk {
205 content,
206 address,
207 quoted_peers,
208 payment,
209 peer_quotes,
210 }))
211 }
212
213 pub async fn batch_pay(
225 &self,
226 prepared: Vec<PreparedChunk>,
227 ) -> Result<(Vec<PaidChunk>, String, u128)> {
228 if prepared.is_empty() {
229 return Ok((Vec::new(), "0".to_string(), 0));
230 }
231
232 let wallet = self.require_wallet()?;
233
234 let intent = PaymentIntent::from_prepared_chunks(&prepared);
236 let storage_cost_atto = intent.total_amount.to_string();
237
238 let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
240 let mut all_payments = Vec::with_capacity(total_quotes);
241 for chunk in &prepared {
242 for info in &chunk.payment.quotes {
243 all_payments.push((info.quote_hash, info.rewards_address, info.amount));
244 }
245 }
246
247 debug!(
248 "Batch payment for {} chunks ({} quote entries)",
249 prepared.len(),
250 all_payments.len()
251 );
252
253 let (tx_hash_map, gas_info) =
254 wallet
255 .pay_for_quotes(all_payments)
256 .await
257 .map_err(|PayForQuotesError(err, _)| {
258 Error::Payment(format!("Batch payment failed: {err}"))
259 })?;
260
261 info!(
262 "Batch payment succeeded: {} transactions",
263 tx_hash_map.len()
264 );
265
266 let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
267 let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
268 Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
269 }
270
    /// Upload `chunks` without progress reporting.
    ///
    /// Convenience wrapper over [`Self::batch_upload_chunks_with_events`]
    /// with no progress channel and zero offsets. Returns the stored
    /// addresses, the total storage cost (atto tokens, stringified), and
    /// the total gas cost (wei).
    pub async fn batch_upload_chunks(
        &self,
        chunks: Vec<Bytes>,
    ) -> Result<(Vec<XorName>, String, u128)> {
        self.batch_upload_chunks_with_events(chunks, None, 0, 0)
            .await
    }
292
293 pub async fn batch_upload_chunks_with_events(
300 &self,
301 chunks: Vec<Bytes>,
302 progress: Option<&mpsc::Sender<UploadEvent>>,
303 stored_offset: usize,
304 file_total: usize,
305 ) -> Result<(Vec<XorName>, String, u128)> {
306 if chunks.is_empty() {
307 return Ok((Vec::new(), "0".to_string(), 0));
308 }
309
310 let total_chunks = chunks.len();
311 let quote_cap = self.controller().quote.current();
312 let store_cap = self.controller().store.current();
313 debug!(
314 "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
315 (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
316 );
317
318 let mut all_addresses = Vec::with_capacity(total_chunks);
319 let mut seen_addresses: HashSet<XorName> = HashSet::new();
320
321 let mut total_storage = Amount::ZERO;
323 let mut total_gas: u128 = 0;
324
325 let mut unique_chunks = Vec::with_capacity(total_chunks);
327 for chunk in chunks {
328 let address = compute_address(&chunk);
329 if seen_addresses.insert(address) {
330 unique_chunks.push(chunk);
331 } else {
332 debug!("Skipping duplicate chunk {}", hex::encode(address));
333 all_addresses.push(address);
334 if let Some(tx) = progress {
335 let _ = tx.try_send(UploadEvent::ChunkStored {
336 stored: stored_offset + all_addresses.len(),
337 total: file_total,
338 });
339 }
340 }
341 }
342
343 let waves: Vec<Vec<Bytes>> = unique_chunks
345 .chunks(PAYMENT_WAVE_SIZE)
346 .map(<[Bytes]>::to_vec)
347 .collect();
348 let wave_count = waves.len();
349
350 debug!(
351 "{total_chunks} chunks -> {} unique -> {wave_count} waves",
352 seen_addresses.len()
353 );
354
355 let mut pending_store: Option<Vec<PaidChunk>> = None;
356 let mut total_quoted: usize = 0;
357
358 for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
359 let wave_num = wave_idx + 1;
360 let wave_size = wave_chunks.len();
361
362 let (prepare_result, store_result) = match pending_store.take() {
364 Some(paid_chunks) => {
365 let store_offset = stored_offset + all_addresses.len();
366 let quoted_offset = stored_offset + total_quoted;
367 let (prep, stored) = tokio::join!(
368 self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
369 self.store_paid_chunks_with_events(
370 paid_chunks,
371 progress,
372 store_offset,
373 file_total
374 )
375 );
376 (prep, Some(stored))
377 }
378 None => {
379 let quoted_offset = stored_offset + total_quoted;
380 let result = self
381 .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
382 .await;
383 (result, None)
384 }
385 };
386 total_quoted += wave_size;
387
388 if let Some(wave_result) = store_result {
390 all_addresses.extend(&wave_result.stored);
391 if !wave_result.failed.is_empty() {
392 let failed_count = wave_result.failed.len();
393 warn!("{failed_count} chunks failed to store after retries");
394 return Err(Error::PartialUpload {
395 stored: all_addresses.clone(),
396 stored_count: stored_offset + all_addresses.len(),
397 failed: wave_result.failed,
398 failed_count,
399 total_chunks: file_total,
400 reason: "wave store failed after retries".into(),
401 });
402 }
403 }
404
405 let (prepared_chunks, already_stored) = prepare_result?;
406 all_addresses.extend(&already_stored);
407 if let Some(tx) = progress {
408 for _ in &already_stored {
409 let _ = tx.try_send(UploadEvent::ChunkStored {
410 stored: stored_offset + all_addresses.len(),
411 total: file_total,
412 });
413 }
414 }
415
416 if prepared_chunks.is_empty() {
417 info!("Wave {wave_num}/{wave_count}: all chunks already stored");
418 continue;
419 }
420
421 info!(
422 "Wave {wave_num}/{wave_count}: paying for {} chunks",
423 prepared_chunks.len()
424 );
425 let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?;
426 if let Ok(cost) = wave_storage.parse::<Amount>() {
427 total_storage += cost;
428 }
429 total_gas = total_gas.saturating_add(wave_gas);
430 pending_store = Some(paid_chunks);
431 }
432
433 if let Some(paid_chunks) = pending_store {
435 let store_offset = stored_offset + all_addresses.len();
436 let wave_result = self
437 .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
438 .await;
439 all_addresses.extend(&wave_result.stored);
440 if !wave_result.failed.is_empty() {
441 let failed_count = wave_result.failed.len();
442 warn!("{failed_count} chunks failed to store after retries (final wave)");
443 return Err(Error::PartialUpload {
444 stored: all_addresses.clone(),
445 stored_count: stored_offset + all_addresses.len(),
446 failed: wave_result.failed,
447 failed_count,
448 total_chunks: file_total,
449 reason: "final wave store failed after retries".into(),
450 });
451 }
452 }
453
454 debug!("Batch upload complete: {} addresses", all_addresses.len());
455 Ok((all_addresses, total_storage.to_string(), total_gas))
456 }
457
    /// Quote one wave of chunks concurrently.
    ///
    /// Returns the chunks that need payment plus the addresses of chunks
    /// the network already holds. Emits one best-effort `ChunkQuoted`
    /// progress event per chunk; `quoted_offset` shifts the reported count
    /// for multi-wave uploads.
    ///
    /// # Errors
    /// Propagates the first quoting error; remaining in-flight quotes for
    /// the wave are dropped when that happens.
    async fn prepare_wave(
        &self,
        chunks: Vec<Bytes>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        quoted_offset: usize,
        file_total: usize,
    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
        let chunk_count = chunks.len();
        // Pre-compute addresses so results coming back from the unordered
        // stream can be attributed to their chunk.
        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
            .into_iter()
            .map(|c| {
                let addr = compute_address(&c);
                (c, addr)
            })
            .collect();

        let quote_limiter = self.controller().quote.clone();
        // Concurrency tracks the adaptive limiter, capped at the wave size
        // and at least 1 so buffer_unordered gets a valid bound.
        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
        let mut quote_stream = stream::iter(chunks_with_addr)
            .map(|(content, address)| {
                let limiter = quote_limiter.clone();
                async move {
                    // observe_op feeds each outcome (via classify_error)
                    // back into the adaptive limiter.
                    let result = observe_op(
                        &limiter,
                        || async move { self.prepare_chunk_payment(content).await },
                        classify_error,
                    )
                    .await;
                    (address, result)
                }
            })
            .buffer_unordered(quote_concurrency);

        let mut prepared = Vec::with_capacity(chunk_count);
        let mut already_stored = Vec::new();
        let mut quoted_count = 0usize;

        while let Some((address, result)) = quote_stream.next().await {
            // Ok(None) means the network already stores this chunk.
            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
            match result? {
                Some(chunk) => prepared.push(chunk),
                None => already_stored.push(address),
            }
            quoted_count += 1;
            let progress_num = quoted_offset + quoted_count;
            if file_total > 0 {
                if chunk_already_stored {
                    info!("Verified {progress_num}/{file_total} (already stored)");
                } else {
                    info!("Quoted {progress_num}/{file_total}");
                }
            }
            if let Some(tx) = progress {
                // Best-effort: the event is dropped if the channel is full.
                let _ = tx.try_send(UploadEvent::ChunkQuoted {
                    quoted: progress_num,
                    total: file_total,
                });
            }
        }

        Ok((prepared, already_stored))
    }
528
    /// Store one wave of paid chunks concurrently, retrying failures with
    /// exponential backoff.
    ///
    /// Each chunk gets up to `MAX_RETRIES` extra attempts; the delay doubles
    /// per round starting from `BASE_DELAY_MS`. This never returns `Err`:
    /// chunks that still fail are reported in `WaveResult::failed`.
    /// `stored_before` offsets the `ChunkStored` progress counts.
    pub(crate) async fn store_paid_chunks_with_events(
        &self,
        paid_chunks: Vec<PaidChunk>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_before: usize,
        total_chunks: usize,
    ) -> WaveResult {
        const MAX_RETRIES: u32 = 3;
        const BASE_DELAY_MS: u64 = 500;

        let mut stored = Vec::new();
        let mut to_retry = paid_chunks;

        // attempt 0 is the initial try; attempts 1..=MAX_RETRIES are retries.
        for attempt in 0..=MAX_RETRIES {
            if attempt > 0 {
                // Exponential backoff: 500ms, 1s, 2s.
                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
                tokio::time::sleep(delay).await;
                info!(
                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
                    to_retry.len()
                );
            }

            let store_limiter = self.controller().store.clone();
            let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
            let mut upload_stream = stream::iter(to_retry)
                .map(|chunk| {
                    // The inner closure consumes `chunk`'s fields, so a full
                    // clone is kept aside to allow retrying on failure.
                    // NOTE(review): this clones proof_bytes/quoted_peers per
                    // attempt even on success — acceptable, but worth
                    // revisiting if waves grow.
                    let chunk_clone = chunk.clone();
                    let limiter = store_limiter.clone();
                    async move {
                        // observe_op reports the outcome to the adaptive
                        // store limiter via classify_error.
                        let result = observe_op(
                            &limiter,
                            || async move {
                                self.chunk_put_to_close_group(
                                    chunk.content,
                                    chunk.proof_bytes,
                                    &chunk.quoted_peers,
                                )
                                .await
                            },
                            classify_error,
                        )
                        .await;
                        (chunk_clone, result)
                    }
                })
                .buffer_unordered(store_concurrency);

            let mut failed_this_round = Vec::new();
            while let Some((chunk, result)) = upload_stream.next().await {
                match result {
                    Ok(name) => {
                        stored.push(name);
                        let stored_num = stored_before + stored.len();
                        if total_chunks > 0 {
                            info!("Stored {stored_num}/{total_chunks}");
                        }
                        if let Some(tx) = progress {
                            // Best-effort progress event; dropped when full.
                            let _ = tx.try_send(UploadEvent::ChunkStored {
                                stored: stored_num,
                                total: total_chunks,
                            });
                        }
                    }
                    // Keep the chunk itself so it can be retried next round.
                    Err(e) => failed_this_round.push((chunk, e.to_string())),
                }
            }

            if failed_this_round.is_empty() {
                return WaveResult {
                    stored,
                    failed: Vec::new(),
                };
            }

            if attempt == MAX_RETRIES {
                // Out of retries: report the surviving failures by address.
                let failed = failed_this_round
                    .into_iter()
                    .map(|(c, e)| (c.address, e))
                    .collect();
                return WaveResult { stored, failed };
            }

            warn!(
                "{} chunks failed on attempt {}, will retry",
                failed_this_round.len(),
                attempt + 1
            );
            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
        }

        // Unreachable in practice — the final iteration always returns above;
        // kept so the function type-checks.
        WaveResult {
            stored,
            failed: Vec::new(),
        }
    }
637}
638
/// Compile-time assertion that the batch-upload future is `Send`, so it can
/// be spawned onto a multi-threaded runtime. Nothing here ever executes.
#[cfg(test)]
mod send_assertions {
    use super::*;

    // Generic helper: merely mentioning a value forces `T: Send` to hold.
    fn _assert_send<T: Send>(_: &T) {}

    // Never called; exists only so the compiler proves the future is Send.
    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        let fut = client.batch_upload_chunks(Vec::new());
        _assert_send(&fut);
    }
}
652
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;

    // Index of the single non-zero quote produced by make_prepared_chunk.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    /// Build a PreparedChunk whose quotes are all zero except the median
    /// entry, which carries `median_amount`.
    ///
    /// NOTE: quote hashes and reward addresses are derived purely from the
    /// quote index, so every chunk built by this helper shares identical
    /// hashes — tests that need distinct hashes must account for that.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(amount),
                price: Amount::from(amount),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    #[test]
    fn payment_intent_from_single_chunk() {
        let chunk = make_prepared_chunk(300);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        // Only the median quote is non-zero, so exactly one payment remains.
        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
        assert_eq!(intent.total_amount, Amount::from(300));

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
        assert_eq!(*amt, Amount::from(300));
    }

    #[test]
    fn payment_intent_from_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(250);
        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);

        // Duplicate quote hashes across chunks are allowed in the list.
        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        let chunk = make_prepared_chunk(0);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);
        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;

        let mut tx_map = HashMap::new();
        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert!(!paid[0].proof_bytes.is_empty());
        assert_eq!(paid[0].address, [0u8; 32]);
    }

    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        // The non-zero median quote has no receipt in the (empty) tx map.
        let chunk = make_prepared_chunk(500);

        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        // c1 and c2 share the same median quote hash (see make_prepared_chunk),
        // so a single tx-map entry satisfies both chunks.
        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let mut tx_map = HashMap::new();
        tx_map.insert(q1, TxHash::from([0xCC; 32]));

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }
}