// ant_core/data/client/batch.rs

//! Batch chunk upload with wave-based pipelined EVM payments.
//!
//! Groups chunks into waves of `PAYMENT_WAVE_SIZE` (64) and pays for each
//! wave in a single EVM transaction. Stores from wave N are pipelined
//! with quote collection for wave N+1 via `tokio::join!`.
6
7use crate::data::client::file::UploadEvent;
8use crate::data::client::payment::peer_id_to_encoded;
9use crate::data::client::Client;
10use crate::data::error::{Error, Result};
11use ant_protocol::evm::{
12    Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
13    RewardsAddress, TxHash,
14};
15use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment};
16use ant_protocol::transport::{MultiAddr, PeerId};
17use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
18use bytes::Bytes;
19use futures::stream::{self, StreamExt};
20use std::collections::{HashMap, HashSet};
21use std::time::Duration;
22use tokio::sync::mpsc;
23use tracing::{debug, info, warn};
24
/// Number of chunks per payment wave.
///
/// Each wave is paid for with one EVM transaction, so this bounds how many
/// quote payments are batched into a single on-chain call.
const PAYMENT_WAVE_SIZE: usize = 64;
27
/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
///
/// Carries everything needed to later pay for the chunk ([`Client::batch_pay`])
/// and store it: the raw bytes, the peers that quoted (future PUT targets),
/// and the quote material used to build a `ProofOfPayment`.
#[derive(Debug)]
pub struct PreparedChunk {
    /// The chunk content bytes.
    pub content: Bytes,
    /// Content address (BLAKE3 hash).
    pub address: XorName,
    /// Closest peers from quote collection — PUT targets for close-group replication.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Payment structure (quotes sorted, median selected, not yet paid on-chain).
    pub payment: SingleNodePayment,
    /// Peer quotes for building `ProofOfPayment`.
    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
}
42
/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`].
///
/// `Clone` is required by the store retry loop, which keeps a copy of each
/// chunk so failed PUTs can be retried after the upload consumes the fields.
#[derive(Debug, Clone)]
pub struct PaidChunk {
    /// The chunk content bytes.
    pub content: Bytes,
    /// Content address (BLAKE3 hash).
    pub address: XorName,
    /// Closest peers from quote collection — PUT targets for close-group replication.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Serialized [`PaymentProof`] bytes.
    pub proof_bytes: Vec<u8>,
}
55
/// Result of storing a wave of paid chunks, with retry tracking.
///
/// Successes and failures are reported side by side so callers can keep
/// partial progress instead of losing track of already-stored chunks.
#[derive(Debug)]
pub struct WaveResult {
    /// Successfully stored chunk addresses.
    pub stored: Vec<XorName>,
    /// Chunks that failed to store after all retries, with the last error text.
    pub failed: Vec<(XorName, String)>,
}
64
/// Payment data for external signing.
///
/// Contains the information needed to construct and submit the on-chain
/// payment transaction without requiring a local wallet or private key.
/// Serializable so it can cross a process/API boundary to the signer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PaymentIntent {
    /// Individual payment entries: (quote_hash, rewards_address, amount).
    /// Zero-amount quotes are excluded.
    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
    /// Total amount across all payments.
    pub total_amount: Amount,
}
76
77impl PaymentIntent {
78    /// Build from a set of prepared chunks.
79    ///
80    /// Collects all non-zero payment entries and computes the total.
81    pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
82        let mut payments = Vec::new();
83        let mut total = Amount::ZERO;
84        for chunk in prepared {
85            for info in &chunk.payment.quotes {
86                if !info.amount.is_zero() {
87                    payments.push((info.quote_hash, info.rewards_address, info.amount));
88                    total += info.amount;
89                }
90            }
91        }
92        Self {
93            payments,
94            total_amount: total,
95        }
96    }
97}
98
99/// Build [`PaidChunk`]s from prepared chunks and externally-provided transaction hashes.
100///
101/// Shared by [`Client::batch_pay`] (wallet flow) and [`finalize_batch_payment`] (external signer).
102///
103/// Returns an error if any non-zero-amount quote hash is missing from `tx_hash_map`,
104/// since chunks uploaded without valid proofs would be rejected by the network.
105fn build_paid_chunks(
106    prepared: Vec<PreparedChunk>,
107    tx_hash_map: &HashMap<QuoteHash, TxHash>,
108) -> Result<Vec<PaidChunk>> {
109    let mut paid_chunks = Vec::with_capacity(prepared.len());
110    for chunk in prepared {
111        let mut tx_hashes = Vec::new();
112        for info in &chunk.payment.quotes {
113            if !info.amount.is_zero() {
114                let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
115                    Error::Payment(format!(
116                        "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
117                        hex::encode(info.quote_hash)
118                    ))
119                })?;
120                tx_hashes.push(tx_hash);
121            }
122        }
123
124        let proof = PaymentProof {
125            proof_of_payment: ProofOfPayment {
126                peer_quotes: chunk.peer_quotes,
127            },
128            tx_hashes,
129        };
130
131        let proof_bytes = serialize_single_node_proof(&proof)
132            .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
133
134        paid_chunks.push(PaidChunk {
135            content: chunk.content,
136            address: chunk.address,
137            quoted_peers: chunk.quoted_peers,
138            proof_bytes,
139        });
140    }
141    Ok(paid_chunks)
142}
143
/// Finalize a batch payment using externally-provided transaction hashes.
///
/// Takes prepared chunks and a map of `quote_hash -> tx_hash` from the
/// external signer. Builds per-chunk `PaymentProof` bytes without needing a wallet.
///
/// # Errors
///
/// Returns an error if any non-zero-amount quote lacks an entry in
/// `tx_hash_map`, or if proof serialization fails.
pub fn finalize_batch_payment(
    prepared: Vec<PreparedChunk>,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<Vec<PaidChunk>> {
    build_paid_chunks(prepared, tx_hash_map)
}
154
155impl Client {
156    /// Prepare a single chunk for batch payment.
157    ///
158    /// Collects quotes and uses node-reported prices without making any
159    /// on-chain transaction. Returns `Ok(None)` if the chunk is already
160    /// stored on the network.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if quote collection or payment construction fails.
165    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
166        let address = compute_address(&content);
167        let data_size = u64::try_from(content.len())
168            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
169
170        let quotes_with_peers = match self
171            .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
172            .await
173        {
174            Ok(quotes) => quotes,
175            Err(Error::AlreadyStored) => {
176                debug!("Chunk {} already stored, skipping", hex::encode(address));
177                return Ok(None);
178            }
179            Err(e) => return Err(e),
180        };
181
182        // Capture all quoted peers for close-group replication.
183        let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
184            .iter()
185            .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
186            .collect();
187
188        // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment.
189        // Use node-reported prices directly — no contract price fetch needed.
190        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
191        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());
192
193        for (peer_id, _addrs, quote, price) in quotes_with_peers {
194            let encoded = peer_id_to_encoded(&peer_id)?;
195            peer_quotes.push((encoded, quote.clone()));
196            quotes_for_payment.push((quote, price));
197        }
198
199        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
200            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;
201
202        Ok(Some(PreparedChunk {
203            content,
204            address,
205            quoted_peers,
206            payment,
207            peer_quotes,
208        }))
209    }
210
211    /// Pay for multiple chunks in a single EVM transaction.
212    ///
213    /// Flattens all quote payments from the prepared chunks into one
214    /// `wallet.pay_for_quotes()` call, then maps transaction hashes
215    /// back to per-chunk [`PaymentProof`] bytes.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if the wallet is not configured or the on-chain
220    /// payment fails.
221    /// Returns `(paid_chunks, storage_cost_atto, gas_cost_wei)`.
222    pub async fn batch_pay(
223        &self,
224        prepared: Vec<PreparedChunk>,
225    ) -> Result<(Vec<PaidChunk>, String, u128)> {
226        if prepared.is_empty() {
227            return Ok((Vec::new(), "0".to_string(), 0));
228        }
229
230        let wallet = self.require_wallet()?;
231
232        // Compute total storage cost from the prepared chunks before paying.
233        let intent = PaymentIntent::from_prepared_chunks(&prepared);
234        let storage_cost_atto = intent.total_amount.to_string();
235
236        // Flatten all quote payments from all chunks into a single batch.
237        let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
238        let mut all_payments = Vec::with_capacity(total_quotes);
239        for chunk in &prepared {
240            for info in &chunk.payment.quotes {
241                all_payments.push((info.quote_hash, info.rewards_address, info.amount));
242            }
243        }
244
245        debug!(
246            "Batch payment for {} chunks ({} quote entries)",
247            prepared.len(),
248            all_payments.len()
249        );
250
251        let (tx_hash_map, gas_info) =
252            wallet
253                .pay_for_quotes(all_payments)
254                .await
255                .map_err(|PayForQuotesError(err, _)| {
256                    Error::Payment(format!("Batch payment failed: {err}"))
257                })?;
258
259        info!(
260            "Batch payment succeeded: {} transactions",
261            tx_hash_map.len()
262        );
263
264        let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
265        let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
266        Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
267    }
268
    /// Upload chunks in waves with pipelined EVM payments.
    ///
    /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave:
    /// 1. **Prepare**: collect quotes for all chunks concurrently
    /// 2. **Pay**: single EVM transaction for the whole wave
    /// 3. **Store**: concurrent chunk replication to close group
    ///
    /// Stores from wave N overlap with quote collection for wave N+1
    /// via `tokio::join!`.
    ///
    /// # Errors
    ///
    /// Returns an error if any payment or store operation fails.
    /// Returns `(addresses, total_storage_cost_atto, total_gas_cost_wei)`.
    pub async fn batch_upload_chunks(
        &self,
        chunks: Vec<Bytes>,
    ) -> Result<(Vec<XorName>, String, u128)> {
        // Delegate with no progress channel and zero offsets (standalone call,
        // not part of a larger multi-wave file upload).
        self.batch_upload_chunks_with_events(chunks, None, 0, 0)
            .await
    }
290
291    /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
292    /// events as each chunk is stored, enabling per-chunk progress bars.
293    ///
294    /// `stored_offset` is the number of chunks already stored in previous waves
295    /// (so events report cumulative progress). `file_total` is the total chunk
296    /// count across ALL waves (for the `total` field in events).
297    pub async fn batch_upload_chunks_with_events(
298        &self,
299        chunks: Vec<Bytes>,
300        progress: Option<&mpsc::Sender<UploadEvent>>,
301        stored_offset: usize,
302        file_total: usize,
303    ) -> Result<(Vec<XorName>, String, u128)> {
304        if chunks.is_empty() {
305            return Ok((Vec::new(), "0".to_string(), 0));
306        }
307
308        let total_chunks = chunks.len();
309        let quote_concurrency = self.config().quote_concurrency;
310        let store_concurrency = self.config().store_concurrency;
311        debug!("Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} (quote_concurrency: {quote_concurrency}, store_concurrency: {store_concurrency})");
312
313        let mut all_addresses = Vec::with_capacity(total_chunks);
314        let mut seen_addresses: HashSet<XorName> = HashSet::new();
315
316        // Accumulate costs across waves.
317        let mut total_storage = Amount::ZERO;
318        let mut total_gas: u128 = 0;
319
320        // Deduplicate chunks by content address.
321        let mut unique_chunks = Vec::with_capacity(total_chunks);
322        for chunk in chunks {
323            let address = compute_address(&chunk);
324            if seen_addresses.insert(address) {
325                unique_chunks.push(chunk);
326            } else {
327                debug!("Skipping duplicate chunk {}", hex::encode(address));
328                all_addresses.push(address);
329                if let Some(tx) = progress {
330                    let _ = tx.try_send(UploadEvent::ChunkStored {
331                        stored: stored_offset + all_addresses.len(),
332                        total: file_total,
333                    });
334                }
335            }
336        }
337
338        // Split into waves.
339        let waves: Vec<Vec<Bytes>> = unique_chunks
340            .chunks(PAYMENT_WAVE_SIZE)
341            .map(<[Bytes]>::to_vec)
342            .collect();
343        let wave_count = waves.len();
344
345        debug!(
346            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
347            seen_addresses.len()
348        );
349
350        let mut pending_store: Option<Vec<PaidChunk>> = None;
351        let mut total_quoted: usize = 0;
352
353        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
354            let wave_num = wave_idx + 1;
355            let wave_size = wave_chunks.len();
356
357            // Pipeline: store previous wave while preparing this one.
358            let (prepare_result, store_result) = match pending_store.take() {
359                Some(paid_chunks) => {
360                    let store_offset = stored_offset + all_addresses.len();
361                    let quoted_offset = stored_offset + total_quoted;
362                    let (prep, stored) = tokio::join!(
363                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
364                        self.store_paid_chunks_with_events(
365                            paid_chunks,
366                            progress,
367                            store_offset,
368                            file_total
369                        )
370                    );
371                    (prep, Some(stored))
372                }
373                None => {
374                    let quoted_offset = stored_offset + total_quoted;
375                    let result = self
376                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
377                        .await;
378                    (result, None)
379                }
380            };
381            total_quoted += wave_size;
382
383            // Track partial progress from previous wave.
384            if let Some(wave_result) = store_result {
385                all_addresses.extend(&wave_result.stored);
386                if !wave_result.failed.is_empty() {
387                    let failed_count = wave_result.failed.len();
388                    warn!("{failed_count} chunks failed to store after retries");
389                    return Err(Error::PartialUpload {
390                        stored: all_addresses.clone(),
391                        stored_count: stored_offset + all_addresses.len(),
392                        failed: wave_result.failed,
393                        failed_count,
394                        total_chunks: file_total,
395                        reason: "wave store failed after retries".into(),
396                    });
397                }
398            }
399
400            let (prepared_chunks, already_stored) = prepare_result?;
401            all_addresses.extend(&already_stored);
402            if let Some(tx) = progress {
403                for _ in &already_stored {
404                    let _ = tx.try_send(UploadEvent::ChunkStored {
405                        stored: stored_offset + all_addresses.len(),
406                        total: file_total,
407                    });
408                }
409            }
410
411            if prepared_chunks.is_empty() {
412                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
413                continue;
414            }
415
416            info!(
417                "Wave {wave_num}/{wave_count}: paying for {} chunks",
418                prepared_chunks.len()
419            );
420            let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?;
421            if let Ok(cost) = wave_storage.parse::<Amount>() {
422                total_storage += cost;
423            }
424            total_gas = total_gas.saturating_add(wave_gas);
425            pending_store = Some(paid_chunks);
426        }
427
428        // Store the last wave.
429        if let Some(paid_chunks) = pending_store {
430            let store_offset = stored_offset + all_addresses.len();
431            let wave_result = self
432                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
433                .await;
434            all_addresses.extend(&wave_result.stored);
435            if !wave_result.failed.is_empty() {
436                let failed_count = wave_result.failed.len();
437                warn!("{failed_count} chunks failed to store after retries (final wave)");
438                return Err(Error::PartialUpload {
439                    stored: all_addresses.clone(),
440                    stored_count: stored_offset + all_addresses.len(),
441                    failed: wave_result.failed,
442                    failed_count,
443                    total_chunks: file_total,
444                    reason: "final wave store failed after retries".into(),
445                });
446            }
447        }
448
449        debug!("Batch upload complete: {} addresses", all_addresses.len());
450        Ok((all_addresses, total_storage.to_string(), total_gas))
451    }
452
453    /// Prepare a wave of chunks by collecting quotes concurrently.
454    ///
455    /// Fires [`UploadEvent::ChunkQuoted`] as each chunk's quote completes.
456    /// Returns `(prepared_chunks, already_stored_addresses)`.
457    async fn prepare_wave(
458        &self,
459        chunks: Vec<Bytes>,
460        progress: Option<&mpsc::Sender<UploadEvent>>,
461        quoted_offset: usize,
462        file_total: usize,
463    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
464        let chunk_count = chunks.len();
465        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
466            .into_iter()
467            .map(|c| {
468                let addr = compute_address(&c);
469                (c, addr)
470            })
471            .collect();
472
473        let mut quote_stream = stream::iter(chunks_with_addr)
474            .map(|(content, address)| async move {
475                (address, self.prepare_chunk_payment(content).await)
476            })
477            .buffer_unordered(self.config().quote_concurrency);
478
479        let mut prepared = Vec::with_capacity(chunk_count);
480        let mut already_stored = Vec::new();
481        let mut quoted_count = 0usize;
482
483        while let Some((address, result)) = quote_stream.next().await {
484            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
485            match result? {
486                Some(chunk) => prepared.push(chunk),
487                None => already_stored.push(address),
488            }
489            quoted_count += 1;
490            let progress_num = quoted_offset + quoted_count;
491            if file_total > 0 {
492                if chunk_already_stored {
493                    info!("Verified {progress_num}/{file_total} (already stored)");
494                } else {
495                    info!("Quoted {progress_num}/{file_total}");
496                }
497            }
498            if let Some(tx) = progress {
499                let _ = tx.try_send(UploadEvent::ChunkQuoted {
500                    quoted: progress_num,
501                    total: file_total,
502                });
503            }
504        }
505
506        Ok((prepared, already_stored))
507    }
508
    /// Store a batch of paid chunks concurrently to their close groups.
    ///
    /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
    /// Returns a [`WaveResult`] with both successes and failures so callers can
    /// track partial progress instead of losing information about stored chunks.
    pub(crate) async fn store_paid_chunks(&self, paid_chunks: Vec<PaidChunk>) -> WaveResult {
        // Delegate with no progress channel and zero offsets.
        self.store_paid_chunks_with_events(paid_chunks, None, 0, 0)
            .await
    }
518
    /// Same as [`Client::store_paid_chunks`] but sends [`UploadEvent::ChunkStored`]
    /// as each chunk is successfully stored.
    ///
    /// `stored_before` is the count of chunks already stored in previous waves,
    /// so the event reports an accurate cumulative total. `total_chunks` is the
    /// total across all waves.
    pub(crate) async fn store_paid_chunks_with_events(
        &self,
        paid_chunks: Vec<PaidChunk>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_before: usize,
        total_chunks: usize,
    ) -> WaveResult {
        // Backoff schedule: 500ms, 1s, 2s for retries 1..=3.
        const MAX_RETRIES: u32 = 3;
        const BASE_DELAY_MS: u64 = 500;

        let mut stored = Vec::new();
        let mut to_retry = paid_chunks;

        // Attempt 0 is the initial pass; attempts 1..=MAX_RETRIES re-run only
        // the chunks that failed on the previous pass.
        for attempt in 0..=MAX_RETRIES {
            if attempt > 0 {
                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
                tokio::time::sleep(delay).await;
                info!(
                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
                    to_retry.len()
                );
            }

            // Clone each chunk before the PUT consumes its fields, so a failed
            // chunk can be carried whole into the next retry round.
            let mut upload_stream = stream::iter(to_retry)
                .map(|chunk| {
                    let chunk_clone = chunk.clone();
                    async move {
                        let result = self
                            .chunk_put_to_close_group(
                                chunk.content,
                                chunk.proof_bytes,
                                &chunk.quoted_peers,
                            )
                            .await;
                        (chunk_clone, result)
                    }
                })
                .buffer_unordered(self.config().store_concurrency);

            let mut failed_this_round = Vec::new();
            while let Some((chunk, result)) = upload_stream.next().await {
                match result {
                    Ok(name) => {
                        stored.push(name);
                        let stored_num = stored_before + stored.len();
                        if total_chunks > 0 {
                            info!("Stored {stored_num}/{total_chunks}");
                        }
                        if let Some(tx) = progress {
                            // try_send: progress is best-effort; never block
                            // storage on a slow or full event channel.
                            let _ = tx.try_send(UploadEvent::ChunkStored {
                                stored: stored_num,
                                total: total_chunks,
                            });
                        }
                    }
                    Err(e) => failed_this_round.push((chunk, e.to_string())),
                }
            }

            if failed_this_round.is_empty() {
                return WaveResult {
                    stored,
                    failed: Vec::new(),
                };
            }

            if attempt == MAX_RETRIES {
                // Out of retries: report failures alongside whatever stored,
                // so the caller keeps its partial progress.
                let failed = failed_this_round
                    .into_iter()
                    .map(|(c, e)| (c.address, e))
                    .collect();
                return WaveResult { stored, failed };
            }

            warn!(
                "{} chunks failed on attempt {}, will retry",
                failed_this_round.len(),
                attempt + 1
            );
            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
        }

        // Unreachable due to loop structure, but satisfy the compiler.
        WaveResult {
            stored,
            failed: Vec::new(),
        }
    }
613}
614
/// Compile-time assertions that batch method futures are Send.
#[cfg(test)]
mod send_assertions {
    use super::*;

    // Takes `&T` (not `T`) so the bound is checked without moving the future.
    fn _assert_send<T: Send>(_: &T) {}

    // Never called; exists only so compilation fails if the batch-upload
    // future ever stops being `Send` (e.g. a non-Send value held across
    // an `.await` point).
    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        let fut = client.batch_upload_chunks(Vec::new());
        _assert_send(&fut);
    }
}
628
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;

    /// Median index in the quotes array.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    /// Helper: build a `PreparedChunk` carrying `median_amount` on the
    /// median quote and zero on every other quote. Adapts automatically to
    /// `CLOSE_GROUP_SIZE` changes.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let raw = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(raw),
                price: Amount::from(raw),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    #[test]
    fn payment_intent_from_single_chunk() {
        let intent = PaymentIntent::from_prepared_chunks(&[make_prepared_chunk(300)]);

        assert_eq!(intent.total_amount, Amount::from(300));
        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*amt, Amount::from(300));
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
    }

    #[test]
    fn payment_intent_from_multiple_chunks() {
        let chunks = [make_prepared_chunk(100), make_prepared_chunk(250)];
        let intent = PaymentIntent::from_prepared_chunks(&chunks);

        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        let intent = PaymentIntent::from_prepared_chunks(&[make_prepared_chunk(0)]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let median_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;
        let tx_map: HashMap<_, _> = [(median_hash, TxHash::from([0xBB; 32]))]
            .into_iter()
            .collect();

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert_eq!(paid[0].address, [0u8; 32]);
        assert!(!paid[0].proof_bytes.is_empty());
    }

    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(Vec::new(), &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        // A non-zero-amount quote without a tx hash must error, since the
        // network would reject the chunk without a valid proof.
        let result = finalize_batch_payment(vec![make_prepared_chunk(500)], &HashMap::new());

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        // Both chunks share the same median quote_hash (same index/byte
        // pattern), so a single tx hash covers both.
        let shared_hash = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let tx_map: HashMap<_, _> = [(shared_hash, TxHash::from([0xCC; 32]))]
            .into_iter()
            .collect();

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }
}