1use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, Result};
13use ant_protocol::evm::{
14 Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15 RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment};
18use ant_protocol::transport::{MultiAddr, PeerId};
19use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
20use bytes::Bytes;
21use futures::stream::{self, StreamExt};
22use std::collections::{HashMap, HashSet};
23use std::time::{Duration, Instant};
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
27const PAYMENT_WAVE_SIZE: usize = 64;
29
30#[derive(Debug)]
32pub struct PreparedChunk {
33 pub content: Bytes,
35 pub address: XorName,
37 pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
39 pub payment: SingleNodePayment,
41 pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
43}
44
45#[derive(Debug, Clone)]
47pub struct PaidChunk {
48 pub content: Bytes,
50 pub address: XorName,
52 pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
54 pub proof_bytes: Vec<u8>,
56}
57
58#[derive(Debug)]
60pub struct WaveResult {
61 pub stored: Vec<XorName>,
63 pub failed: Vec<(XorName, String)>,
65 pub chunk_attempts_total: usize,
67 pub store_durations_ms: Vec<u64>,
69 pub retries_per_chunk: Vec<u32>,
71}
72
73#[derive(Debug, Default, Clone)]
80pub struct WaveAggregateStats {
81 pub chunk_attempts_total: usize,
83 pub store_durations_ms: Vec<u64>,
86 pub retries_histogram: [usize; 4],
91}
92
93impl WaveAggregateStats {
94 pub fn absorb(&mut self, wave: &WaveResult) {
96 self.chunk_attempts_total = self
97 .chunk_attempts_total
98 .saturating_add(wave.chunk_attempts_total);
99 self.store_durations_ms.extend(&wave.store_durations_ms);
100 for &r in &wave.retries_per_chunk {
101 let idx = (r as usize).min(self.retries_histogram.len() - 1);
102 self.retries_histogram[idx] = self.retries_histogram[idx].saturating_add(1);
103 }
104 }
105}
106
107fn percentile(values: &[u64], p: f64) -> u64 {
113 if values.is_empty() {
114 return 0;
115 }
116 let mut sorted = values.to_vec();
117 sorted.sort_unstable();
118 let p = p.clamp(0.0, 1.0);
119 let n = sorted.len();
121 #[allow(
122 clippy::cast_possible_truncation,
123 clippy::cast_sign_loss,
124 clippy::cast_precision_loss
125 )]
126 let rank = ((p * n as f64).ceil() as usize)
127 .saturating_sub(1)
128 .min(n - 1);
129 sorted[rank]
130}
131
132#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
137pub struct PaymentIntent {
138 pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
140 pub total_amount: Amount,
142}
143
144impl PaymentIntent {
145 pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
149 let mut payments = Vec::new();
150 let mut total = Amount::ZERO;
151 for chunk in prepared {
152 for info in &chunk.payment.quotes {
153 if !info.amount.is_zero() {
154 payments.push((info.quote_hash, info.rewards_address, info.amount));
155 total += info.amount;
156 }
157 }
158 }
159 Self {
160 payments,
161 total_amount: total,
162 }
163 }
164}
165
166fn build_paid_chunks(
173 prepared: Vec<PreparedChunk>,
174 tx_hash_map: &HashMap<QuoteHash, TxHash>,
175) -> Result<Vec<PaidChunk>> {
176 let mut paid_chunks = Vec::with_capacity(prepared.len());
177 for chunk in prepared {
178 let mut tx_hashes = Vec::new();
179 for info in &chunk.payment.quotes {
180 if !info.amount.is_zero() {
181 let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
182 Error::Payment(format!(
183 "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
184 hex::encode(info.quote_hash)
185 ))
186 })?;
187 tx_hashes.push(tx_hash);
188 }
189 }
190
191 let proof = PaymentProof {
192 proof_of_payment: ProofOfPayment {
193 peer_quotes: chunk.peer_quotes,
194 },
195 tx_hashes,
196 };
197
198 let proof_bytes = serialize_single_node_proof(&proof)
199 .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
200
201 paid_chunks.push(PaidChunk {
202 content: chunk.content,
203 address: chunk.address,
204 quoted_peers: chunk.quoted_peers,
205 proof_bytes,
206 });
207 }
208 Ok(paid_chunks)
209}
210
211pub fn finalize_batch_payment(
216 prepared: Vec<PreparedChunk>,
217 tx_hash_map: &HashMap<QuoteHash, TxHash>,
218) -> Result<Vec<PaidChunk>> {
219 build_paid_chunks(prepared, tx_hash_map)
220}
221
222impl Client {
223 pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
233 let address = compute_address(&content);
234 let data_size = u64::try_from(content.len())
235 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
236
237 let quotes_with_peers = match self
238 .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
239 .await
240 {
241 Ok(quotes) => quotes,
242 Err(Error::AlreadyStored) => {
243 debug!("Chunk {} already stored, skipping", hex::encode(address));
244 return Ok(None);
245 }
246 Err(e) => return Err(e),
247 };
248
249 let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
251 .iter()
252 .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
253 .collect();
254
255 let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
258 let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
259
260 for (peer_id, _addrs, quote, price) in quotes_with_peers {
261 let encoded = peer_id_to_encoded(&peer_id)?;
262 peer_quotes.push((encoded, quote.clone()));
263 quotes_for_payment.push((quote, price));
264 }
265
266 let payment = SingleNodePayment::from_quotes(quotes_for_payment)
267 .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
268
269 Ok(Some(PreparedChunk {
270 content,
271 address,
272 quoted_peers,
273 payment,
274 peer_quotes,
275 }))
276 }
277
278 pub async fn batch_pay(
290 &self,
291 prepared: Vec<PreparedChunk>,
292 ) -> Result<(Vec<PaidChunk>, String, u128)> {
293 if prepared.is_empty() {
294 return Ok((Vec::new(), "0".to_string(), 0));
295 }
296
297 let wallet = self.require_wallet()?;
298
299 let intent = PaymentIntent::from_prepared_chunks(&prepared);
301 let storage_cost_atto = intent.total_amount.to_string();
302
303 let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
305 let mut all_payments = Vec::with_capacity(total_quotes);
306 for chunk in &prepared {
307 for info in &chunk.payment.quotes {
308 all_payments.push((info.quote_hash, info.rewards_address, info.amount));
309 }
310 }
311
312 debug!(
313 "Batch payment for {} chunks ({} quote entries)",
314 prepared.len(),
315 all_payments.len()
316 );
317
318 let (tx_hash_map, gas_info) =
319 wallet
320 .pay_for_quotes(all_payments)
321 .await
322 .map_err(|PayForQuotesError(err, _)| {
323 Error::Payment(format!("Batch payment failed: {err}"))
324 })?;
325
326 info!(
327 "Batch payment succeeded: {} transactions",
328 tx_hash_map.len()
329 );
330
331 let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
332 let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
333 Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
334 }
335
336 pub async fn batch_upload_chunks(
351 &self,
352 chunks: Vec<Bytes>,
353 ) -> Result<(Vec<XorName>, String, u128)> {
354 let (addresses, storage, gas, _stats) = self
355 .batch_upload_chunks_with_events(chunks, None, 0, 0)
356 .await?;
357 Ok((addresses, storage, gas))
358 }
359
360 pub async fn batch_upload_chunks_with_events(
367 &self,
368 chunks: Vec<Bytes>,
369 progress: Option<&mpsc::Sender<UploadEvent>>,
370 stored_offset: usize,
371 file_total: usize,
372 ) -> Result<(Vec<XorName>, String, u128, WaveAggregateStats)> {
373 if chunks.is_empty() {
374 return Ok((
375 Vec::new(),
376 "0".to_string(),
377 0,
378 WaveAggregateStats::default(),
379 ));
380 }
381
382 let total_chunks = chunks.len();
383 let quote_cap = self.controller().quote.current();
384 let store_cap = self.controller().store.current();
385 debug!(
386 "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
387 (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
388 );
389
390 let mut all_addresses = Vec::with_capacity(total_chunks);
391 let mut seen_addresses: HashSet<XorName> = HashSet::new();
392
393 let mut total_storage = Amount::ZERO;
395 let mut total_gas: u128 = 0;
396 let mut agg_stats = WaveAggregateStats::default();
397
398 let mut unique_chunks = Vec::with_capacity(total_chunks);
400 for chunk in chunks {
401 let address = compute_address(&chunk);
402 if seen_addresses.insert(address) {
403 unique_chunks.push(chunk);
404 } else {
405 debug!("Skipping duplicate chunk {}", hex::encode(address));
406 all_addresses.push(address);
407 if let Some(tx) = progress {
408 let _ = tx.try_send(UploadEvent::ChunkStored {
409 stored: stored_offset + all_addresses.len(),
410 total: file_total,
411 });
412 }
413 }
414 }
415
416 let waves: Vec<Vec<Bytes>> = unique_chunks
418 .chunks(PAYMENT_WAVE_SIZE)
419 .map(<[Bytes]>::to_vec)
420 .collect();
421 let wave_count = waves.len();
422
423 debug!(
424 "{total_chunks} chunks -> {} unique -> {wave_count} waves",
425 seen_addresses.len()
426 );
427
428 let mut pending_store: Option<Vec<PaidChunk>> = None;
429 let mut total_quoted: usize = 0;
430
431 for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
432 let wave_num = wave_idx + 1;
433 let wave_size = wave_chunks.len();
434
435 let (prepare_result, store_result) = match pending_store.take() {
437 Some(paid_chunks) => {
438 let store_offset = stored_offset + all_addresses.len();
439 let quoted_offset = stored_offset + total_quoted;
440 let (prep, stored) = tokio::join!(
441 self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
442 self.store_paid_chunks_with_events(
443 paid_chunks,
444 progress,
445 store_offset,
446 file_total
447 )
448 );
449 (prep, Some(stored))
450 }
451 None => {
452 let quoted_offset = stored_offset + total_quoted;
453 let result = self
454 .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
455 .await;
456 (result, None)
457 }
458 };
459 total_quoted += wave_size;
460
461 if let Some(wave_result) = store_result {
463 all_addresses.extend(&wave_result.stored);
464 agg_stats.absorb(&wave_result);
465 if !wave_result.failed.is_empty() {
466 let failed_count = wave_result.failed.len();
467 warn!("{failed_count} chunks failed to store after retries");
468 return Err(Error::PartialUpload {
469 stored: all_addresses.clone(),
470 stored_count: stored_offset + all_addresses.len(),
471 failed: wave_result.failed,
472 failed_count,
473 total_chunks: file_total,
474 reason: "wave store failed after retries".into(),
475 });
476 }
477 }
478
479 let (prepared_chunks, already_stored) = prepare_result?;
480 all_addresses.extend(&already_stored);
481 if let Some(tx) = progress {
482 for _ in &already_stored {
483 let _ = tx.try_send(UploadEvent::ChunkStored {
484 stored: stored_offset + all_addresses.len(),
485 total: file_total,
486 });
487 }
488 }
489
490 if prepared_chunks.is_empty() {
491 info!("Wave {wave_num}/{wave_count}: all chunks already stored");
492 continue;
493 }
494
495 info!(
496 "Wave {wave_num}/{wave_count}: paying for {} chunks",
497 prepared_chunks.len()
498 );
499 let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?;
500 if let Ok(cost) = wave_storage.parse::<Amount>() {
501 total_storage += cost;
502 }
503 total_gas = total_gas.saturating_add(wave_gas);
504 pending_store = Some(paid_chunks);
505 }
506
507 if let Some(paid_chunks) = pending_store {
509 let store_offset = stored_offset + all_addresses.len();
510 let wave_result = self
511 .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
512 .await;
513 all_addresses.extend(&wave_result.stored);
514 agg_stats.absorb(&wave_result);
515 if !wave_result.failed.is_empty() {
516 let failed_count = wave_result.failed.len();
517 warn!("{failed_count} chunks failed to store after retries (final wave)");
518 return Err(Error::PartialUpload {
519 stored: all_addresses.clone(),
520 stored_count: stored_offset + all_addresses.len(),
521 failed: wave_result.failed,
522 failed_count,
523 total_chunks: file_total,
524 reason: "final wave store failed after retries".into(),
525 });
526 }
527 }
528
529 debug!("Batch upload complete: {} addresses", all_addresses.len());
530 Ok((
531 all_addresses,
532 total_storage.to_string(),
533 total_gas,
534 agg_stats,
535 ))
536 }
537
538 async fn prepare_wave(
543 &self,
544 chunks: Vec<Bytes>,
545 progress: Option<&mpsc::Sender<UploadEvent>>,
546 quoted_offset: usize,
547 file_total: usize,
548 ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
549 let chunk_count = chunks.len();
550 let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
551 .into_iter()
552 .map(|c| {
553 let addr = compute_address(&c);
554 (c, addr)
555 })
556 .collect();
557
558 let quote_limiter = self.controller().quote.clone();
559 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
564 let mut quote_stream = stream::iter(chunks_with_addr)
565 .map(|(content, address)| {
566 let limiter = quote_limiter.clone();
567 async move {
568 let result = observe_op(
569 &limiter,
570 || async move { self.prepare_chunk_payment(content).await },
571 classify_error,
572 )
573 .await;
574 (address, result)
575 }
576 })
577 .buffer_unordered(quote_concurrency);
578
579 let mut prepared = Vec::with_capacity(chunk_count);
580 let mut already_stored = Vec::new();
581 let mut quoted_count = 0usize;
582
583 while let Some((address, result)) = quote_stream.next().await {
584 let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
585 match result? {
586 Some(chunk) => prepared.push(chunk),
587 None => already_stored.push(address),
588 }
589 quoted_count += 1;
590 let progress_num = quoted_offset + quoted_count;
591 if file_total > 0 {
592 if chunk_already_stored {
593 info!("Verified {progress_num}/{file_total} (already stored)");
594 } else {
595 info!("Quoted {progress_num}/{file_total}");
596 }
597 }
598 if let Some(tx) = progress {
599 let _ = tx.try_send(UploadEvent::ChunkQuoted {
600 quoted: progress_num,
601 total: file_total,
602 });
603 }
604 }
605
606 Ok((prepared, already_stored))
607 }
608
609 pub(crate) async fn store_paid_chunks_with_events(
621 &self,
622 paid_chunks: Vec<PaidChunk>,
623 progress: Option<&mpsc::Sender<UploadEvent>>,
624 stored_before: usize,
625 total_chunks: usize,
626 ) -> WaveResult {
627 const MAX_RETRIES: u32 = 3;
628 const BASE_DELAY_MS: u64 = 500;
629
630 let mut stored = Vec::new();
631 let mut to_retry = paid_chunks;
632
633 let mut first_seen: HashMap<XorName, Instant> = HashMap::with_capacity(to_retry.len());
637 for chunk in &to_retry {
638 first_seen.entry(chunk.address).or_insert_with(Instant::now);
639 }
640
641 let mut chunk_attempts_total: usize = 0;
642 let mut store_durations_ms: Vec<u64> = Vec::new();
643 let mut retries_per_chunk: Vec<u32> = Vec::new();
644
645 for attempt in 0..=MAX_RETRIES {
646 if attempt > 0 {
647 let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
648 tokio::time::sleep(delay).await;
649 info!(
650 "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
651 to_retry.len()
652 );
653 }
654
655 chunk_attempts_total = chunk_attempts_total.saturating_add(to_retry.len());
657
658 let store_limiter = self.controller().store.clone();
659 let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
660 let mut upload_stream = stream::iter(to_retry)
661 .map(|chunk| {
662 let chunk_clone = chunk.clone();
663 let limiter = store_limiter.clone();
664 async move {
665 let result = observe_op(
666 &limiter,
667 || async move {
668 self.chunk_put_to_close_group(
669 chunk.content,
670 chunk.proof_bytes,
671 &chunk.quoted_peers,
672 )
673 .await
674 },
675 classify_error,
676 )
677 .await;
678 (chunk_clone, result)
679 }
680 })
681 .buffer_unordered(store_concurrency);
682
683 let mut failed_this_round = Vec::new();
684 while let Some((chunk, result)) = upload_stream.next().await {
685 match result {
686 Ok(name) => {
687 let duration_ms = first_seen
688 .get(&chunk.address)
689 .map(|t| u64::try_from(t.elapsed().as_millis()).unwrap_or(u64::MAX))
690 .unwrap_or(0);
691 store_durations_ms.push(duration_ms);
692 retries_per_chunk.push(attempt);
693 stored.push(name);
694 let stored_num = stored_before + stored.len();
695 if total_chunks > 0 {
696 info!("Stored {stored_num}/{total_chunks}");
697 }
698 if let Some(tx) = progress {
699 let _ = tx.try_send(UploadEvent::ChunkStored {
700 stored: stored_num,
701 total: total_chunks,
702 });
703 }
704 }
705 Err(e) => failed_this_round.push((chunk, e.to_string())),
706 }
707 }
708
709 if failed_this_round.is_empty() {
710 let result = WaveResult {
711 stored,
712 failed: Vec::new(),
713 chunk_attempts_total,
714 store_durations_ms,
715 retries_per_chunk,
716 };
717 log_wave_summary(&result);
718 return result;
719 }
720
721 if attempt == MAX_RETRIES {
722 let failed = failed_this_round
723 .into_iter()
724 .map(|(c, e)| (c.address, e))
725 .collect();
726 let result = WaveResult {
727 stored,
728 failed,
729 chunk_attempts_total,
730 store_durations_ms,
731 retries_per_chunk,
732 };
733 log_wave_summary(&result);
734 return result;
735 }
736
737 warn!(
738 "{} chunks failed on attempt {}, will retry",
739 failed_this_round.len(),
740 attempt + 1
741 );
742 to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
743 }
744
745 let result = WaveResult {
747 stored,
748 failed: Vec::new(),
749 chunk_attempts_total,
750 store_durations_ms,
751 retries_per_chunk,
752 };
753 log_wave_summary(&result);
754 result
755 }
756}
757
758fn log_wave_summary(result: &WaveResult) {
764 let retries_round_1 = result.retries_per_chunk.iter().filter(|&&r| r == 1).count();
765 let retries_round_2 = result.retries_per_chunk.iter().filter(|&&r| r == 2).count();
766 let retries_round_3 = result.retries_per_chunk.iter().filter(|&&r| r == 3).count();
767 let chunk_attempts_total = result.chunk_attempts_total;
768 info!(
769 chunks_stored = result.stored.len(),
770 chunks_failed = result.failed.len(),
771 chunk_attempts_total,
772 retries_round_1,
773 retries_round_2,
774 retries_round_3,
775 store_duration_p50_ms = percentile(&result.store_durations_ms, 0.50),
776 store_duration_p95_ms = percentile(&result.store_durations_ms, 0.95),
777 store_duration_max_ms = result.store_durations_ms.iter().max().copied().unwrap_or(0),
778 "chunk_store_wave_complete"
779 );
780}
781
782#[cfg(test)]
784mod send_assertions {
785 use super::*;
786
787 fn _assert_send<T: Send>(_: &T) {}
788
789 #[allow(dead_code)]
790 async fn _batch_upload_is_send(client: &Client) {
791 let fut = client.batch_upload_chunks(Vec::new());
792 _assert_send(&fut);
793 }
794}
795
796#[cfg(test)]
797#[allow(clippy::unwrap_used)]
798mod tests {
799 use super::*;
800 use ant_protocol::payment::QuotePaymentInfo;
801 use ant_protocol::CLOSE_GROUP_SIZE;
802
803 const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;
805
806 fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
810 let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
811 let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
812 QuotePaymentInfo {
813 quote_hash: QuoteHash::from([i as u8 + 1; 32]),
814 rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
815 amount: Amount::from(amount),
816 price: Amount::from(amount),
817 }
818 });
819
820 PreparedChunk {
821 content: Bytes::from(vec![0xAA; 32]),
822 address: [0u8; 32],
823 quoted_peers: Vec::new(),
824 payment: SingleNodePayment { quotes },
825 peer_quotes: Vec::new(),
826 }
827 }
828
829 #[test]
830 fn payment_intent_from_single_chunk() {
831 let chunk = make_prepared_chunk(300);
832 let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
833
834 assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
835 assert_eq!(intent.total_amount, Amount::from(300));
836
837 let (hash, addr, amt) = &intent.payments[0];
838 assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
839 assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
840 assert_eq!(*amt, Amount::from(300));
841 }
842
843 #[test]
844 fn payment_intent_from_multiple_chunks() {
845 let c1 = make_prepared_chunk(100);
846 let c2 = make_prepared_chunk(250);
847 let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);
848
849 assert_eq!(intent.payments.len(), 2);
850 assert_eq!(intent.total_amount, Amount::from(350));
851 }
852
853 #[test]
854 fn payment_intent_skips_all_zero_chunks() {
855 let chunk = make_prepared_chunk(0);
856 let intent = PaymentIntent::from_prepared_chunks(&[chunk]);
857
858 assert!(intent.payments.is_empty());
859 assert_eq!(intent.total_amount, Amount::ZERO);
860 }
861
862 #[test]
863 fn payment_intent_empty_input() {
864 let intent = PaymentIntent::from_prepared_chunks(&[]);
865 assert!(intent.payments.is_empty());
866 assert_eq!(intent.total_amount, Amount::ZERO);
867 }
868
869 #[test]
870 fn finalize_batch_payment_builds_proofs() {
871 let chunk = make_prepared_chunk(500);
872 let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;
873
874 let mut tx_map = HashMap::new();
875 tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));
876
877 let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();
878
879 assert_eq!(paid.len(), 1);
880 assert!(!paid[0].proof_bytes.is_empty());
881 assert_eq!(paid[0].address, [0u8; 32]);
882 }
883
884 #[test]
885 fn finalize_batch_payment_empty_input() {
886 let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
887 assert!(paid.is_empty());
888 }
889
890 #[test]
891 fn finalize_batch_payment_missing_tx_hash_errors() {
892 let chunk = make_prepared_chunk(500);
895
896 let result = finalize_batch_payment(vec![chunk], &HashMap::new());
897 assert!(result.is_err());
898 let err = result.unwrap_err().to_string();
899 assert!(err.contains("Missing tx hash"), "got: {err}");
900 }
901
902 #[test]
903 fn finalize_batch_payment_multiple_chunks() {
904 let c1 = make_prepared_chunk(100);
905 let c2 = make_prepared_chunk(200);
906 let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
907 let mut tx_map = HashMap::new();
908 tx_map.insert(q1, TxHash::from([0xCC; 32]));
911
912 let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
913 assert_eq!(paid.len(), 2);
914 }
915}