// ant_core/data/client/batch.rs
//! Batch chunk upload with wave-based pipelined EVM payments.
//!
//! Groups chunks into waves of 64 and pays for each
//! wave in a single EVM transaction. Stores from wave N are pipelined
//! with quote collection for wave N+1 via `tokio::join!`.

7use crate::data::client::adaptive::observe_op;
8use crate::data::client::classify_error;
9use crate::data::client::file::UploadEvent;
10use crate::data::client::payment::peer_id_to_encoded;
11use crate::data::client::Client;
12use crate::data::error::{Error, Result};
13use ant_protocol::evm::{
14    Amount, EncodedPeerId, PayForQuotesError, PaymentQuote, ProofOfPayment, QuoteHash,
15    RewardsAddress, TxHash,
16};
17use ant_protocol::payment::{serialize_single_node_proof, PaymentProof, SingleNodePayment};
18use ant_protocol::transport::{MultiAddr, PeerId};
19use ant_protocol::{compute_address, XorName, DATA_TYPE_CHUNK};
20use bytes::Bytes;
21use futures::stream::{self, StreamExt};
22use std::collections::{HashMap, HashSet};
23use std::time::Duration;
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
/// Number of chunks per payment wave — each wave is quoted together,
/// paid for in a single EVM transaction, and then stored concurrently.
const PAYMENT_WAVE_SIZE: usize = 64;
29
/// Chunk quoted but not yet paid. Produced by [`Client::prepare_chunk_payment`].
///
/// Holds everything needed to later pay (via [`Client::batch_pay`] or an
/// external signer) and then store the chunk.
#[derive(Debug)]
pub struct PreparedChunk {
    /// The chunk content bytes.
    pub content: Bytes,
    /// Content address (BLAKE3 hash).
    pub address: XorName,
    /// Closest peers from quote collection — PUT targets for close-group replication.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Payment structure (quotes sorted, median selected, not yet paid on-chain).
    pub payment: SingleNodePayment,
    /// Peer quotes for building `ProofOfPayment`.
    pub peer_quotes: Vec<(EncodedPeerId, PaymentQuote)>,
}
44
/// Chunk paid but not yet stored. Produced by [`Client::batch_pay`]
/// (or [`finalize_batch_payment`] in the external-signer flow).
#[derive(Debug, Clone)]
pub struct PaidChunk {
    /// The chunk content bytes.
    pub content: Bytes,
    /// Content address (BLAKE3 hash).
    pub address: XorName,
    /// Closest peers from quote collection — PUT targets for close-group replication.
    pub quoted_peers: Vec<(PeerId, Vec<MultiAddr>)>,
    /// Serialized [`PaymentProof`] bytes, attached to the PUT so nodes can verify payment.
    pub proof_bytes: Vec<u8>,
}
57
/// Result of storing a wave of paid chunks, with retry tracking.
///
/// Both sides are reported so callers can account for partial progress
/// instead of losing information about chunks that did store successfully.
#[derive(Debug)]
pub struct WaveResult {
    /// Successfully stored chunk addresses.
    pub stored: Vec<XorName>,
    /// Chunks that failed to store after all retries, with the final error message.
    pub failed: Vec<(XorName, String)>,
}
66
/// Payment data for external signing.
///
/// Contains the information needed to construct and submit the on-chain
/// payment transaction without requiring a local wallet or private key.
/// Only non-zero payment entries are included (see `from_prepared_chunks`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PaymentIntent {
    /// Individual payment entries: (quote_hash, rewards_address, amount).
    pub payments: Vec<(QuoteHash, RewardsAddress, Amount)>,
    /// Total amount across all payments.
    pub total_amount: Amount,
}
78
79impl PaymentIntent {
80    /// Build from a set of prepared chunks.
81    ///
82    /// Collects all non-zero payment entries and computes the total.
83    pub fn from_prepared_chunks(prepared: &[PreparedChunk]) -> Self {
84        let mut payments = Vec::new();
85        let mut total = Amount::ZERO;
86        for chunk in prepared {
87            for info in &chunk.payment.quotes {
88                if !info.amount.is_zero() {
89                    payments.push((info.quote_hash, info.rewards_address, info.amount));
90                    total += info.amount;
91                }
92            }
93        }
94        Self {
95            payments,
96            total_amount: total,
97        }
98    }
99}
100
101/// Build [`PaidChunk`]s from prepared chunks and externally-provided transaction hashes.
102///
103/// Shared by [`Client::batch_pay`] (wallet flow) and [`finalize_batch_payment`] (external signer).
104///
105/// Returns an error if any non-zero-amount quote hash is missing from `tx_hash_map`,
106/// since chunks uploaded without valid proofs would be rejected by the network.
107fn build_paid_chunks(
108    prepared: Vec<PreparedChunk>,
109    tx_hash_map: &HashMap<QuoteHash, TxHash>,
110) -> Result<Vec<PaidChunk>> {
111    let mut paid_chunks = Vec::with_capacity(prepared.len());
112    for chunk in prepared {
113        let mut tx_hashes = Vec::new();
114        for info in &chunk.payment.quotes {
115            if !info.amount.is_zero() {
116                let tx_hash = tx_hash_map.get(&info.quote_hash).copied().ok_or_else(|| {
117                    Error::Payment(format!(
118                        "Missing tx hash for quote {} — external signer did not return a receipt for this payment",
119                        hex::encode(info.quote_hash)
120                    ))
121                })?;
122                tx_hashes.push(tx_hash);
123            }
124        }
125
126        let proof = PaymentProof {
127            proof_of_payment: ProofOfPayment {
128                peer_quotes: chunk.peer_quotes,
129            },
130            tx_hashes,
131        };
132
133        let proof_bytes = serialize_single_node_proof(&proof)
134            .map_err(|e| Error::Serialization(format!("Failed to serialize payment proof: {e}")))?;
135
136        paid_chunks.push(PaidChunk {
137            content: chunk.content,
138            address: chunk.address,
139            quoted_peers: chunk.quoted_peers,
140            proof_bytes,
141        });
142    }
143    Ok(paid_chunks)
144}
145
/// Finalize a batch payment using externally-provided transaction hashes.
///
/// Takes prepared chunks and a map of `quote_hash -> tx_hash` from the
/// external signer. Builds per-chunk `PaymentProof` bytes without needing a wallet.
///
/// # Errors
///
/// Returns an error if a non-zero-amount quote has no entry in `tx_hash_map`,
/// or if proof serialization fails.
pub fn finalize_batch_payment(
    prepared: Vec<PreparedChunk>,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<Vec<PaidChunk>> {
    build_paid_chunks(prepared, tx_hash_map)
}
156
impl Client {
    /// Prepare a single chunk for batch payment.
    ///
    /// Collects quotes and uses node-reported prices without making any
    /// on-chain transaction. Returns `Ok(None)` if the chunk is already
    /// stored on the network.
    ///
    /// # Errors
    ///
    /// Returns an error if quote collection or payment construction fails.
    pub async fn prepare_chunk_payment(&self, content: Bytes) -> Result<Option<PreparedChunk>> {
        let address = compute_address(&content);
        let data_size = u64::try_from(content.len())
            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;

        // `AlreadyStored` is a success path here: the chunk needs no payment.
        let quotes_with_peers = match self
            .get_store_quotes(&address, data_size, DATA_TYPE_CHUNK)
            .await
        {
            Ok(quotes) => quotes,
            Err(Error::AlreadyStored) => {
                debug!("Chunk {} already stored, skipping", hex::encode(address));
                return Ok(None);
            }
            Err(e) => return Err(e),
        };

        // Capture all quoted peers for close-group replication.
        let quoted_peers: Vec<(PeerId, Vec<MultiAddr>)> = quotes_with_peers
            .iter()
            .map(|(peer_id, addrs, _, _)| (*peer_id, addrs.clone()))
            .collect();

        // Build peer_quotes for ProofOfPayment + quotes for SingleNodePayment.
        // Use node-reported prices directly — no contract price fetch needed.
        let mut peer_quotes = Vec::with_capacity(quotes_with_peers.len());
        let mut quotes_for_payment = Vec::with_capacity(quotes_with_peers.len());

        for (peer_id, _addrs, quote, price) in quotes_with_peers {
            let encoded = peer_id_to_encoded(&peer_id)?;
            peer_quotes.push((encoded, quote.clone()));
            quotes_for_payment.push((quote, price));
        }

        let payment = SingleNodePayment::from_quotes(quotes_for_payment)
            .map_err(|e| Error::Payment(format!("Failed to create payment: {e}")))?;

        Ok(Some(PreparedChunk {
            content,
            address,
            quoted_peers,
            payment,
            peer_quotes,
        }))
    }

    /// Pay for multiple chunks in a single EVM transaction.
    ///
    /// Flattens all quote payments from the prepared chunks into one
    /// `wallet.pay_for_quotes()` call, then maps transaction hashes
    /// back to per-chunk [`PaymentProof`] bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the wallet is not configured or the on-chain
    /// payment fails.
    /// Returns `(paid_chunks, storage_cost_atto, gas_cost_wei)`.
    pub async fn batch_pay(
        &self,
        prepared: Vec<PreparedChunk>,
    ) -> Result<(Vec<PaidChunk>, String, u128)> {
        if prepared.is_empty() {
            return Ok((Vec::new(), "0".to_string(), 0));
        }

        let wallet = self.require_wallet()?;

        // Compute total storage cost from the prepared chunks before paying.
        let intent = PaymentIntent::from_prepared_chunks(&prepared);
        let storage_cost_atto = intent.total_amount.to_string();

        // Flatten all quote payments from all chunks into a single batch.
        // NOTE(review): unlike `PaymentIntent::from_prepared_chunks`, this
        // does NOT filter out zero-amount entries — presumably
        // `pay_for_quotes` skips them itself; confirm against the wallet
        // implementation.
        let total_quotes: usize = prepared.iter().map(|c| c.payment.quotes.len()).sum();
        let mut all_payments = Vec::with_capacity(total_quotes);
        for chunk in &prepared {
            for info in &chunk.payment.quotes {
                all_payments.push((info.quote_hash, info.rewards_address, info.amount));
            }
        }

        debug!(
            "Batch payment for {} chunks ({} quote entries)",
            prepared.len(),
            all_payments.len()
        );

        let (tx_hash_map, gas_info) =
            wallet
                .pay_for_quotes(all_payments)
                .await
                .map_err(|PayForQuotesError(err, _)| {
                    Error::Payment(format!("Batch payment failed: {err}"))
                })?;

        info!(
            "Batch payment succeeded: {} transactions",
            tx_hash_map.len()
        );

        // Re-collect into a HashMap for O(1) quote_hash -> tx_hash lookup
        // when building the per-chunk proofs.
        let tx_hash_map: HashMap<QuoteHash, TxHash> = tx_hash_map.into_iter().collect();
        let paid_chunks = build_paid_chunks(prepared, &tx_hash_map)?;
        Ok((paid_chunks, storage_cost_atto, gas_info.gas_cost_wei))
    }

    /// Upload chunks in waves with pipelined EVM payments.
    ///
    /// Processes chunks in waves of `PAYMENT_WAVE_SIZE` (64). Within each wave:
    /// 1. **Prepare**: collect quotes for all chunks concurrently
    /// 2. **Pay**: single EVM transaction for the whole wave
    /// 3. **Store**: concurrent chunk replication to close group
    ///
    /// Stores from wave N overlap with quote collection for wave N+1
    /// via `tokio::join!`.
    ///
    /// # Errors
    ///
    /// Returns an error if any payment or store operation fails.
    /// Returns `(addresses, total_storage_cost_atto, total_gas_cost_wei)`.
    pub async fn batch_upload_chunks(
        &self,
        chunks: Vec<Bytes>,
    ) -> Result<(Vec<XorName>, String, u128)> {
        self.batch_upload_chunks_with_events(chunks, None, 0, 0)
            .await
    }

    /// Same as [`Client::batch_upload_chunks`] but sends [`UploadEvent::ChunkStored`]
    /// events as each chunk is stored, enabling per-chunk progress bars.
    ///
    /// `stored_offset` is the number of chunks already stored in previous waves
    /// (so events report cumulative progress). `file_total` is the total chunk
    /// count across ALL waves (for the `total` field in events).
    pub async fn batch_upload_chunks_with_events(
        &self,
        chunks: Vec<Bytes>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_offset: usize,
        file_total: usize,
    ) -> Result<(Vec<XorName>, String, u128)> {
        if chunks.is_empty() {
            return Ok((Vec::new(), "0".to_string(), 0));
        }

        let total_chunks = chunks.len();
        let quote_cap = self.controller().quote.current();
        let store_cap = self.controller().store.current();
        debug!(
            "Batch uploading {total_chunks} chunks in waves of {PAYMENT_WAVE_SIZE} \
             (current adaptive caps — quote: {quote_cap}, store: {store_cap})"
        );

        let mut all_addresses = Vec::with_capacity(total_chunks);
        let mut seen_addresses: HashSet<XorName> = HashSet::new();

        // Accumulate costs across waves.
        let mut total_storage = Amount::ZERO;
        let mut total_gas: u128 = 0;

        // Deduplicate chunks by content address. Duplicates are counted as
        // "stored" immediately since the first copy covers them.
        let mut unique_chunks = Vec::with_capacity(total_chunks);
        for chunk in chunks {
            let address = compute_address(&chunk);
            if seen_addresses.insert(address) {
                unique_chunks.push(chunk);
            } else {
                debug!("Skipping duplicate chunk {}", hex::encode(address));
                all_addresses.push(address);
                if let Some(tx) = progress {
                    // try_send: progress is best-effort; never block the upload.
                    let _ = tx.try_send(UploadEvent::ChunkStored {
                        stored: stored_offset + all_addresses.len(),
                        total: file_total,
                    });
                }
            }
        }

        // Split into waves.
        let waves: Vec<Vec<Bytes>> = unique_chunks
            .chunks(PAYMENT_WAVE_SIZE)
            .map(<[Bytes]>::to_vec)
            .collect();
        let wave_count = waves.len();

        debug!(
            "{total_chunks} chunks -> {} unique -> {wave_count} waves",
            seen_addresses.len()
        );

        // Paid-but-not-yet-stored chunks carried into the next iteration so
        // their store overlaps with the next wave's quote collection.
        let mut pending_store: Option<Vec<PaidChunk>> = None;
        let mut total_quoted: usize = 0;

        for (wave_idx, wave_chunks) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave_size = wave_chunks.len();

            // Pipeline: store previous wave while preparing this one.
            let (prepare_result, store_result) = match pending_store.take() {
                Some(paid_chunks) => {
                    let store_offset = stored_offset + all_addresses.len();
                    let quoted_offset = stored_offset + total_quoted;
                    let (prep, stored) = tokio::join!(
                        self.prepare_wave(wave_chunks, progress, quoted_offset, file_total),
                        self.store_paid_chunks_with_events(
                            paid_chunks,
                            progress,
                            store_offset,
                            file_total
                        )
                    );
                    (prep, Some(stored))
                }
                None => {
                    // First wave: nothing to store yet, just prepare.
                    let quoted_offset = stored_offset + total_quoted;
                    let result = self
                        .prepare_wave(wave_chunks, progress, quoted_offset, file_total)
                        .await;
                    (result, None)
                }
            };
            total_quoted += wave_size;

            // Track partial progress from previous wave.
            if let Some(wave_result) = store_result {
                all_addresses.extend(&wave_result.stored);
                if !wave_result.failed.is_empty() {
                    let failed_count = wave_result.failed.len();
                    warn!("{failed_count} chunks failed to store after retries");
                    return Err(Error::PartialUpload {
                        stored: all_addresses.clone(),
                        stored_count: stored_offset + all_addresses.len(),
                        failed: wave_result.failed,
                        failed_count,
                        total_chunks: file_total,
                        reason: "wave store failed after retries".into(),
                    });
                }
            }

            let (prepared_chunks, already_stored) = prepare_result?;
            all_addresses.extend(&already_stored);
            if let Some(tx) = progress {
                // NOTE(review): `all_addresses` was already extended above, so
                // every event in this loop reports the same cumulative count
                // rather than an incrementing one — confirm consumers only
                // care about the latest value.
                for _ in &already_stored {
                    let _ = tx.try_send(UploadEvent::ChunkStored {
                        stored: stored_offset + all_addresses.len(),
                        total: file_total,
                    });
                }
            }

            if prepared_chunks.is_empty() {
                info!("Wave {wave_num}/{wave_count}: all chunks already stored");
                continue;
            }

            info!(
                "Wave {wave_num}/{wave_count}: paying for {} chunks",
                prepared_chunks.len()
            );
            let (paid_chunks, wave_storage, wave_gas) = self.batch_pay(prepared_chunks).await?;
            // Cost string comes from our own Amount::to_string, so parse
            // failures are unexpected; ignore rather than abort the upload.
            if let Ok(cost) = wave_storage.parse::<Amount>() {
                total_storage += cost;
            }
            total_gas = total_gas.saturating_add(wave_gas);
            pending_store = Some(paid_chunks);
        }

        // Store the last wave (it has no next wave to pipeline against).
        if let Some(paid_chunks) = pending_store {
            let store_offset = stored_offset + all_addresses.len();
            let wave_result = self
                .store_paid_chunks_with_events(paid_chunks, progress, store_offset, file_total)
                .await;
            all_addresses.extend(&wave_result.stored);
            if !wave_result.failed.is_empty() {
                let failed_count = wave_result.failed.len();
                warn!("{failed_count} chunks failed to store after retries (final wave)");
                return Err(Error::PartialUpload {
                    stored: all_addresses.clone(),
                    stored_count: stored_offset + all_addresses.len(),
                    failed: wave_result.failed,
                    failed_count,
                    total_chunks: file_total,
                    reason: "final wave store failed after retries".into(),
                });
            }
        }

        debug!("Batch upload complete: {} addresses", all_addresses.len());
        Ok((all_addresses, total_storage.to_string(), total_gas))
    }

    /// Prepare a wave of chunks by collecting quotes concurrently.
    ///
    /// Fires [`UploadEvent::ChunkQuoted`] as each chunk's quote completes.
    /// Returns `(prepared_chunks, already_stored_addresses)`.
    async fn prepare_wave(
        &self,
        chunks: Vec<Bytes>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        quoted_offset: usize,
        file_total: usize,
    ) -> Result<(Vec<PreparedChunk>, Vec<XorName>)> {
        let chunk_count = chunks.len();
        // Pair each chunk with its address up front so completions from the
        // unordered stream can still be attributed to the right chunk.
        let chunks_with_addr: Vec<(Bytes, XorName)> = chunks
            .into_iter()
            .map(|c| {
                let addr = compute_address(&c);
                (c, addr)
            })
            .collect();

        let quote_limiter = self.controller().quote.clone();
        // Batch-aware fan-out: clamp to chunk_count so we never
        // pay for fan-out slots we cannot fill on a partial wave.
        // See PERF-RESULTS.md — measured ~30% slowdown when
        // cap > batch size on quoting workloads (live mainnet).
        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
        let mut quote_stream = stream::iter(chunks_with_addr)
            .map(|(content, address)| {
                let limiter = quote_limiter.clone();
                async move {
                    let result = observe_op(
                        &limiter,
                        || async move { self.prepare_chunk_payment(content).await },
                        classify_error,
                    )
                    .await;
                    (address, result)
                }
            })
            .buffer_unordered(quote_concurrency);

        let mut prepared = Vec::with_capacity(chunk_count);
        let mut already_stored = Vec::new();
        let mut quoted_count = 0usize;

        while let Some((address, result)) = quote_stream.next().await {
            // `Ok(None)` means the network already holds this chunk.
            let chunk_already_stored = result.as_ref().is_ok_and(|r| r.is_none());
            match result? {
                Some(chunk) => prepared.push(chunk),
                None => already_stored.push(address),
            }
            quoted_count += 1;
            let progress_num = quoted_offset + quoted_count;
            if file_total > 0 {
                if chunk_already_stored {
                    info!("Verified {progress_num}/{file_total} (already stored)");
                } else {
                    info!("Quoted {progress_num}/{file_total}");
                }
            }
            if let Some(tx) = progress {
                let _ = tx.try_send(UploadEvent::ChunkQuoted {
                    quoted: progress_num,
                    total: file_total,
                });
            }
        }

        Ok((prepared, already_stored))
    }

    /// Store a batch of paid chunks concurrently to their close groups.
    ///
    /// Retries failed chunks up to 3 times with exponential backoff (500ms, 1s, 2s).
    /// Returns a [`WaveResult`] with both successes and failures so callers can
    /// track partial progress instead of losing information about stored chunks.
    ///
    /// When `progress` is `Some`, sends [`UploadEvent::ChunkStored`] as each
    /// chunk is successfully stored. `stored_before` is the count of chunks
    /// already stored in previous waves so the event reports an accurate
    /// cumulative total; `total_chunks` is the total across all waves. Pass
    /// `None`/0/0 when progress reporting is not needed.
    pub(crate) async fn store_paid_chunks_with_events(
        &self,
        paid_chunks: Vec<PaidChunk>,
        progress: Option<&mpsc::Sender<UploadEvent>>,
        stored_before: usize,
        total_chunks: usize,
    ) -> WaveResult {
        const MAX_RETRIES: u32 = 3;
        const BASE_DELAY_MS: u64 = 500;

        let mut stored = Vec::new();
        let mut to_retry = paid_chunks;

        for attempt in 0..=MAX_RETRIES {
            if attempt > 0 {
                // Exponential backoff: 500ms, 1s, 2s for attempts 1..=3.
                let delay = Duration::from_millis(BASE_DELAY_MS * 2u64.pow(attempt - 1));
                tokio::time::sleep(delay).await;
                info!(
                    "Retry attempt {attempt}/{MAX_RETRIES} for {} chunks",
                    to_retry.len()
                );
            }

            let store_limiter = self.controller().store.clone();
            // Clamp fan-out to the retry set size, same rationale as quoting.
            let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
            let mut upload_stream = stream::iter(to_retry)
                .map(|chunk| {
                    // Clone kept so the chunk can be re-queued on failure
                    // after the original is consumed by the PUT below.
                    let chunk_clone = chunk.clone();
                    let limiter = store_limiter.clone();
                    async move {
                        let result = observe_op(
                            &limiter,
                            || async move {
                                self.chunk_put_to_close_group(
                                    chunk.content,
                                    chunk.proof_bytes,
                                    &chunk.quoted_peers,
                                )
                                .await
                            },
                            classify_error,
                        )
                        .await;
                        (chunk_clone, result)
                    }
                })
                .buffer_unordered(store_concurrency);

            let mut failed_this_round = Vec::new();
            while let Some((chunk, result)) = upload_stream.next().await {
                match result {
                    Ok(name) => {
                        stored.push(name);
                        let stored_num = stored_before + stored.len();
                        if total_chunks > 0 {
                            info!("Stored {stored_num}/{total_chunks}");
                        }
                        if let Some(tx) = progress {
                            let _ = tx.try_send(UploadEvent::ChunkStored {
                                stored: stored_num,
                                total: total_chunks,
                            });
                        }
                    }
                    Err(e) => failed_this_round.push((chunk, e.to_string())),
                }
            }

            if failed_this_round.is_empty() {
                return WaveResult {
                    stored,
                    failed: Vec::new(),
                };
            }

            if attempt == MAX_RETRIES {
                // Out of retries: report addresses + last error per chunk.
                let failed = failed_this_round
                    .into_iter()
                    .map(|(c, e)| (c.address, e))
                    .collect();
                return WaveResult { stored, failed };
            }

            warn!(
                "{} chunks failed on attempt {}, will retry",
                failed_this_round.len(),
                attempt + 1
            );
            to_retry = failed_this_round.into_iter().map(|(c, _)| c).collect();
        }

        // Unreachable due to loop structure, but satisfy the compiler.
        WaveResult {
            stored,
            failed: Vec::new(),
        }
    }
}
638
/// Compile-time assertions that batch method futures are Send.
///
/// These futures are awaited/spawned on a multi-threaded runtime, so a
/// non-Send future would only surface as an error at a distant call site;
/// this module turns that into a local compile failure.
#[cfg(test)]
mod send_assertions {
    use super::*;

    fn _assert_send<T: Send>(_: &T) {}

    // Never called — exists purely so the compiler checks the Send bound.
    #[allow(dead_code)]
    async fn _batch_upload_is_send(client: &Client) {
        let fut = client.batch_upload_chunks(Vec::new());
        _assert_send(&fut);
    }
}
652
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use ant_protocol::payment::QuotePaymentInfo;
    use ant_protocol::CLOSE_GROUP_SIZE;

    /// Median index in the quotes array.
    const MEDIAN_INDEX: usize = CLOSE_GROUP_SIZE / 2;

    /// Helper: build a `PreparedChunk` with `median_amount` at the median
    /// quote index and zero for all other quotes. Adapts automatically to
    /// `CLOSE_GROUP_SIZE` changes.
    fn make_prepared_chunk(median_amount: u64) -> PreparedChunk {
        // Quote hashes are [i+1; 32] and reward addresses [i+10; 20] so each
        // index is distinguishable in assertions below.
        let quotes: [QuotePaymentInfo; CLOSE_GROUP_SIZE] = std::array::from_fn(|i| {
            let amount = if i == MEDIAN_INDEX { median_amount } else { 0 };
            QuotePaymentInfo {
                quote_hash: QuoteHash::from([i as u8 + 1; 32]),
                rewards_address: RewardsAddress::new([i as u8 + 10; 20]),
                amount: Amount::from(amount),
                price: Amount::from(amount),
            }
        });

        PreparedChunk {
            content: Bytes::from(vec![0xAA; 32]),
            address: [0u8; 32],
            quoted_peers: Vec::new(),
            payment: SingleNodePayment { quotes },
            peer_quotes: Vec::new(),
        }
    }

    #[test]
    fn payment_intent_from_single_chunk() {
        let chunk = make_prepared_chunk(300);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        // Only the median quote is non-zero, so exactly one entry survives.
        assert_eq!(intent.payments.len(), 1, "only non-zero amounts");
        assert_eq!(intent.total_amount, Amount::from(300));

        let (hash, addr, amt) = &intent.payments[0];
        assert_eq!(*hash, QuoteHash::from([MEDIAN_INDEX as u8 + 1; 32]));
        assert_eq!(*addr, RewardsAddress::new([MEDIAN_INDEX as u8 + 10; 20]));
        assert_eq!(*amt, Amount::from(300));
    }

    #[test]
    fn payment_intent_from_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(250);
        let intent = PaymentIntent::from_prepared_chunks(&[c1, c2]);

        assert_eq!(intent.payments.len(), 2);
        assert_eq!(intent.total_amount, Amount::from(350));
    }

    #[test]
    fn payment_intent_skips_all_zero_chunks() {
        // A chunk whose every quote amount is zero contributes nothing.
        let chunk = make_prepared_chunk(0);
        let intent = PaymentIntent::from_prepared_chunks(&[chunk]);

        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn payment_intent_empty_input() {
        let intent = PaymentIntent::from_prepared_chunks(&[]);
        assert!(intent.payments.is_empty());
        assert_eq!(intent.total_amount, Amount::ZERO);
    }

    #[test]
    fn finalize_batch_payment_builds_proofs() {
        let chunk = make_prepared_chunk(500);
        let quote_hash = chunk.payment.quotes[MEDIAN_INDEX].quote_hash;

        let mut tx_map = HashMap::new();
        tx_map.insert(quote_hash, TxHash::from([0xBB; 32]));

        let paid = finalize_batch_payment(vec![chunk], &tx_map).unwrap();

        assert_eq!(paid.len(), 1);
        assert!(!paid[0].proof_bytes.is_empty());
        assert_eq!(paid[0].address, [0u8; 32]);
    }

    #[test]
    fn finalize_batch_payment_empty_input() {
        let paid = finalize_batch_payment(vec![], &HashMap::new()).unwrap();
        assert!(paid.is_empty());
    }

    #[test]
    fn finalize_batch_payment_missing_tx_hash_errors() {
        // Missing tx hash for a non-zero-amount quote should error,
        // since the chunk would be rejected by the network without a valid proof.
        let chunk = make_prepared_chunk(500);

        let result = finalize_batch_payment(vec![chunk], &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Missing tx hash"), "got: {err}");
    }

    #[test]
    fn finalize_batch_payment_multiple_chunks() {
        let c1 = make_prepared_chunk(100);
        let c2 = make_prepared_chunk(200);
        let q1 = c1.payment.quotes[MEDIAN_INDEX].quote_hash;
        let mut tx_map = HashMap::new();
        // Both chunks have the same quote_hash (same index/byte pattern)
        // so one tx_hash covers both
        tx_map.insert(q1, TxHash::from([0xCC; 32]));

        let paid = finalize_batch_payment(vec![c1, c2], &tx_map).unwrap();
        assert_eq!(paid.len(), 2);
    }
}