// ant_core/data/client/file.rs
//! File operations using streaming self-encryption.
//!
//! Upload files directly from disk without loading them entirely into memory.
//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
//! uploading each piece as it's produced.
//!
//! Encrypted chunks are spilled to a temporary directory during encryption
//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
//! chunks) regardless of file size.
//!
//! For in-memory data uploads, see the `data` module.
13use crate::data::client::adaptive::observe_op;
14use crate::data::client::batch::{finalize_batch_payment, PaymentIntent, PreparedChunk};
15use crate::data::client::classify_error;
16use crate::data::client::merkle::{
17    finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
18    PreparedMerkleBatch,
19};
20use crate::data::client::Client;
21use crate::data::error::{Error, Result};
22use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
23use ant_protocol::transport::{MultiAddr, PeerId};
24use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
25use bytes::Bytes;
26use fs2::FileExt;
27use futures::stream::{self, StreamExt};
28use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
29use std::collections::{HashMap, HashSet};
30use std::io::Write;
31use std::path::{Path, PathBuf};
32use std::sync::{Arc, Mutex};
33use tokio::runtime::Handle;
34use tokio::sync::mpsc;
35use tracing::{debug, info, warn};
36use xor_name::XorName;
37
/// Progress events emitted during file upload for UI feedback.
///
/// Counts in each variant are running totals, so consumers can render a
/// progress bar directly without accumulating deltas themselves.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    /// `chunks_done` is the number of chunks produced so far.
    Encrypting { chunks_done: usize },
    /// File encryption complete; `total_chunks` is the final chunk count.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
64
/// Progress events emitted during file download for UI feedback.
///
/// The total data chunk count is unknown until the (possibly hierarchical)
/// DataMap has been resolved, so resolution has its own event phase before
/// [`DownloadEvent::ChunksFetched`] reporting begins.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
77
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
/// Also bounds peak RAM during upload: one wave of chunks is held in memory
/// at a time.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;
118
/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;

/// Maximum age (in seconds) for orphaned spill directories.
/// Dirs older than this are cleaned up if they have no active lockfile.
const SPILL_MAX_AGE_SECS: u64 = 24 * 60 * 60; // 24 hours

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
//
// NOTE: the doc comment above previously sat over `SPILL_MAX_AGE_SECS`,
// which made rustdoc attach the struct's documentation to that const; it
// has been moved here where it belongs.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
153
impl ChunkSpill {
    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
    ///
    /// Does not create the directory; callers do that themselves.
    fn spill_root() -> Result<PathBuf> {
        use crate::config;
        let root = config::data_dir()
            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
            .join("spill");
        Ok(root)
    }

    /// Create a new spill directory under `<data_dir>/spill/`.
    ///
    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
    /// identified by prefix and cleaned up by age. A lockfile inside the
    /// dir prevents concurrent cleanup from deleting an active spill.
    fn new() -> Result<Self> {
        let root = Self::spill_root()?;
        std::fs::create_dir_all(&root)?;

        // Clean up stale spill dirs from previous crashed runs.
        Self::cleanup_stale(&root);

        // Unix timestamp for the dir name; 0 if the clock is pre-epoch.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // Random suffix avoids collisions when two spills start in the same second.
        let unique: u64 = rand::random();
        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
        std::fs::create_dir(&dir)?;

        // Create and hold a lockfile for the lifetime of this spill.
        // cleanup_stale() will skip dirs with locked files.
        let lock_path = dir.join(SPILL_LOCK_NAME);
        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to create spill lockfile: {e}"),
            ))
        })?;
        lock_file.try_lock_exclusive().map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to lock spill lockfile: {e}"),
            ))
        })?;

        // The lock is held (via `_lock`) until this ChunkSpill is dropped.
        Ok(Self {
            dir,
            _lock: lock_file,
            addresses: Vec::new(),
            seen: HashSet::new(),
            total_bytes: 0,
        })
    }

    /// Clean up stale spill directories. Best-effort, errors are logged.
    ///
    /// Only removes directories that:
    /// 1. Start with `SPILL_DIR_PREFIX` (ignores unrelated files)
    /// 2. Are actual directories (not symlinks -- prevents symlink attacks)
    /// 3. Have a timestamp older than `SPILL_MAX_AGE_SECS`
    /// 4. Do NOT have an active lockfile (prevents deleting in-progress uploads)
    ///
    /// Safe to call concurrently from multiple processes.
    fn cleanup_stale(root: &Path) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        if now == 0 {
            // Clock is broken (before Unix epoch). Skip cleanup to avoid
            // misidentifying dirs as stale.
            warn!("System clock before Unix epoch, skipping spill cleanup");
            return;
        }

        // Unreadable root (missing, permissions) => nothing to clean.
        let entries = match std::fs::read_dir(root) {
            Ok(entries) => entries,
            Err(_) => return,
        };

        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();

            // Only process dirs with our prefix.
            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
                Some(s) => s,
                None => continue,
            };

            // Parse timestamp: "spill_<timestamp>_<random>"
            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
                Some(ts) => ts,
                None => continue,
            };

            // Young enough to possibly belong to a live (or recent) run.
            if now.saturating_sub(timestamp) <= SPILL_MAX_AGE_SECS {
                continue;
            }

            // Safety: only delete actual directories, not symlinks.
            // (DirEntry::file_type does not follow symlinks.)
            let file_type = match entry.file_type() {
                Ok(ft) => ft,
                Err(_) => continue,
            };
            if !file_type.is_dir() {
                continue;
            }

            let path = entry.path();

            // Check lockfile: if locked, the dir is in active use -- skip it.
            // If the lockfile cannot be opened (e.g. it was never created),
            // the dir is treated as unlocked and eligible for removal.
            let lock_path = path.join(SPILL_LOCK_NAME);
            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
                use fs2::FileExt;
                if lock_file.try_lock_exclusive().is_err() {
                    // Lock held by another process -- dir is active.
                    debug!("Skipping active spill dir: {}", path.display());
                    continue;
                }
                // We acquired the lock, so no one else holds it.
                // Drop it before deleting.
                // NOTE(review): there is a small window between releasing the
                // lock here and remove_dir_all below in which another process
                // could in principle grab the dir — presumably acceptable
                // because only >24h-old dirs reach this point. TODO confirm.
                drop(lock_file);
            }

            info!("Cleaning up stale spill dir: {}", path.display());
            if let Err(e) = std::fs::remove_dir_all(&path) {
                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
            }
        }
    }

    /// Run stale spill cleanup. Call at client startup or periodically.
    #[allow(dead_code)]
    pub(crate) fn run_cleanup() {
        if let Ok(root) = Self::spill_root() {
            Self::cleanup_stale(&root);
        }
    }

    /// Write one encrypted chunk to disk and record its address.
    ///
    /// Deduplicates by content address: if the same chunk was already
    /// spilled, the write and accounting are skipped. This prevents
    /// double-uploads and inflated quoting metrics.
    fn push(&mut self, content: &[u8]) -> Result<()> {
        let address = compute_address(content);
        // HashSet::insert returns false when the address was already present.
        if !self.seen.insert(address) {
            return Ok(());
        }
        let path = self.dir.join(hex::encode(address));
        std::fs::write(&path, content)?;
        self.total_bytes += content.len() as u64;
        self.addresses.push(address);
        Ok(())
    }

    /// Number of chunks stored.
    fn len(&self) -> usize {
        self.addresses.len()
    }

    /// Total bytes of all spilled chunks.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }

    /// Average chunk size in bytes (for quoting metrics).
    /// Returns 0 for an empty spill (guards the division below).
    fn avg_chunk_size(&self) -> u64 {
        if self.addresses.is_empty() {
            return 0;
        }
        self.total_bytes / self.addresses.len() as u64
    }

    /// Read a single chunk back from disk by address.
    ///
    /// The address is annotated into any I/O error so failures are traceable
    /// to a specific chunk file.
    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
        let path = self.dir.join(hex::encode(address));
        let data = std::fs::read(&path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("reading spilled chunk {}: {e}", hex::encode(address)),
            ))
        })?;
        Ok(Bytes::from(data))
    }

    /// Iterate over address slices in wave-sized groups.
    fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
        self.addresses.chunks(UPLOAD_WAVE_SIZE)
    }

    /// Read a wave of chunks from disk.
    ///
    /// Fails fast: the first unreadable chunk aborts the whole wave.
    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
        let mut out = Vec::with_capacity(wave_addrs.len());
        for addr in wave_addrs {
            let content = self.read_chunk(addr)?;
            out.push((content, *addr));
        }
        Ok(out)
    }

    /// Clean up the spill directory. Best-effort; failure is logged, not returned.
    fn cleanup(&self) {
        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
            warn!(
                "Failed to clean up chunk spill dir {}: {e}",
                self.dir.display()
            );
        }
    }
}
368
// Remove the spill directory when the handle goes out of scope, covering
// both normal completion and early-return/error paths.
impl Drop for ChunkSpill {
    fn drop(&mut self) {
        // cleanup() swallows and logs errors — nothing may panic inside drop.
        self.cleanup();
    }
}
374
375/// Check that the spill directory has enough free space for the spilled chunks.
376///
377/// `file_size` is the source file's byte count. We require
378/// `file_size + 10%` free space to account for self-encryption overhead.
379fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
380    let spill_root = ChunkSpill::spill_root()?;
381
382    // Ensure the root exists so fs2 can query it.
383    std::fs::create_dir_all(&spill_root)?;
384
385    let available = fs2::available_space(&spill_root).map_err(|e| {
386        Error::Io(std::io::Error::new(
387            e.kind(),
388            format!(
389                "failed to query disk space on {}: {e}",
390                spill_root.display()
391            ),
392        ))
393    })?;
394
395    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
396    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
397    let required = file_size.saturating_add(headroom);
398
399    if available < required {
400        let avail_mb = available / (1024 * 1024);
401        let req_mb = required / (1024 * 1024);
402        return Err(Error::InsufficientDiskSpace(format!(
403            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
404            spill_root.display()
405        )));
406    }
407
408    debug!(
409        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
410        spill_root.display()
411    );
412    Ok(())
413}
414
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// Defaults to [`Visibility::Private`], the safer choice.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
431
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Monetary fields are strings so the (potentially 256-bit) amounts survive
/// serialization to JSON without precision loss.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
449
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks the upload attempted to store. On full
    /// success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
}
484
/// Payment information for external signing — either wave-batch or merkle.
///
/// Which variant is produced depends on the chunk count of the prepared
/// upload; see [`PreparedUpload::payment_info`].
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents (needed for upload after payment).
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses in order (needed for upload after payment).
        chunk_addresses: Vec<[u8; 32]>,
    },
}
505
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information — either wave-batch or merkle depending on chunk count.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
}
533
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// The mpsc receiver yields encrypted chunk contents in production order,
/// the oneshot delivers the final `DataMap`, and the join handle surfaces
/// any error from the blocking encryption task.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
540
/// Spawn a blocking task that streams file encryption through a channel.
///
/// Reads `path` in 8 KB pieces on a `spawn_blocking` thread, feeds them to
/// `stream_encrypt`, and sends each encrypted chunk's content through a
/// bounded (capacity 2) mpsc channel so encryption cannot run far ahead of
/// the consumer. The `DataMap` is delivered on a oneshot once the stream
/// is exhausted.
///
/// Read errors inside the data iterator cannot propagate through
/// `stream_encrypt` (the iterator contract only lets us return `None`,
/// which looks like EOF), so they are captured in a shared
/// `Arc<Mutex<Option<io::Error>>>` and re-checked after every produced
/// chunk and once more after the loop — preventing a silently truncated
/// file from being encrypted and uploaded as if complete.
///
/// # Errors
///
/// Returns immediately if the file's metadata cannot be read or its size
/// exceeds `usize`. The join handle resolves to an error if the file
/// cannot be opened/read, encryption fails, or the chunk receiver is
/// dropped mid-stream.
fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
    let metadata = std::fs::metadata(&path)?;
    // stream_encrypt wants usize; on 32-bit targets a >4 GB file is rejected here.
    let data_size = usize::try_from(metadata.len())
        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;

    // Capacity 2 bounds the number of in-flight encrypted chunks.
    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();

    let handle = tokio::task::spawn_blocking(move || {
        let file = std::fs::File::open(&path)?;
        let mut reader = std::io::BufReader::new(file);

        // Side channel for I/O errors the iterator cannot return directly.
        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
        let read_error_clone = Arc::clone(&read_error);

        let data_iter = std::iter::from_fn(move || {
            let mut buffer = vec![0u8; 8192];
            match std::io::Read::read(&mut reader, &mut buffer) {
                Ok(0) => None,
                Ok(n) => {
                    // Partial reads are fine; truncate to the bytes actually read.
                    buffer.truncate(n);
                    Some(Bytes::from(buffer))
                }
                Err(e) => {
                    // Stash the error and signal EOF; checked after each chunk.
                    let mut guard = read_error_clone
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                    *guard = Some(e);
                    None
                }
            }
        });

        let mut stream = stream_encrypt(data_size, data_iter)
            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;

        for chunk_result in stream.chunks() {
            // Check for captured read errors immediately after each chunk.
            // stream_encrypt sees None (EOF) when a read fails, so it stops
            // producing chunks. We must detect this before sending the
            // partial results to avoid uploading a truncated DataMap.
            {
                let guard = read_error
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if let Some(ref e) = *guard {
                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
                }
            }

            let (_hash, content) = chunk_result
                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
            // blocking_send parks this thread until the consumer catches up
            // (back-pressure); Err means the receiver was dropped.
            if chunk_tx.blocking_send(content).is_err() {
                return Err(Error::Encryption("upload receiver dropped".to_string()));
            }
        }

        // Final check: read error after last chunk (stream saw EOF).
        {
            let guard = read_error
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(ref e) = *guard {
                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
            }
        }

        let datamap = stream
            .into_datamap()
            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
        // A dropped DataMap receiver means the caller cancelled; not an error here.
        if datamap_tx.send(datamap).is_err() {
            warn!("DataMap receiver dropped — upload may have been cancelled");
        }
        Ok(())
    });

    Ok((chunk_rx, datamap_rx, handle))
}
620
621impl Client {
    /// Upload a file to the network using streaming self-encryption.
    ///
    /// Convenience wrapper around [`Client::file_upload_with_mode`] with
    /// [`PaymentMode::Auto`]: merkle batch payment is selected automatically
    /// for files that produce 64+ chunks (saves gas). Encrypted chunks are
    /// spilled to a temp directory so peak memory stays at ~256 MB
    /// regardless of file size.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, encryption fails,
    /// or any chunk cannot be stored.
    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
        self.file_upload_with_mode(path, PaymentMode::Auto).await
    }
635
636    /// Estimate the cost of uploading a file without actually uploading.
637    ///
638    /// Encrypts the file to determine chunk count and sizes, then requests
639    /// a single quote from the network for a representative chunk. The
640    /// per-chunk price is extrapolated to the total chunk count.
641    ///
642    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
643    /// chunks are cleaned up automatically when the function returns.
644    ///
645    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
646    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
647    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
648    /// varies with network conditions.
649    ///
650    /// If the first sampled chunk is already stored on the network, the
651    /// function retries with subsequent chunk addresses (up to
652    /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
653    /// a [`Error::CostEstimationInconclusive`] is returned so callers can
654    /// decide how to react rather than trust a bogus "free" estimate. Only
655    /// when every address in the file is stored do we return a zero-cost
656    /// estimate.
657    ///
658    /// # Errors
659    ///
660    /// Returns an error if the file cannot be read, encryption fails,
661    /// the network cannot provide a quote, or every sampled chunk is
662    /// already stored ([`Error::CostEstimationInconclusive`]).
663    pub async fn estimate_upload_cost(
664        &self,
665        path: &Path,
666        mode: PaymentMode,
667        progress: Option<mpsc::Sender<UploadEvent>>,
668    ) -> Result<UploadCostEstimate> {
669        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
670
671        if file_size < 3 {
672            return Err(Error::InvalidData(
673                "File too small: self-encryption requires at least 3 bytes".into(),
674            ));
675        }
676
677        check_disk_space_for_spill(file_size)?;
678
679        info!(
680            "Estimating upload cost for {} ({file_size} bytes)",
681            path.display()
682        );
683
684        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
685        let chunk_count = spill.len();
686
687        if let Some(ref tx) = progress {
688            let _ = tx
689                .send(UploadEvent::Encrypted {
690                    total_chunks: chunk_count,
691                })
692                .await;
693        }
694
695        info!("Encrypted into {chunk_count} chunks, requesting quote");
696
697        // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
698        // AlreadyStored result says nothing about the rest of the file — the
699        // first chunk is often a DataMap-adjacent chunk that collides with
700        // prior uploads even when 99% of the file is new. Only treat the
701        // whole file as "fully stored" when every sample comes back stored.
702        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
703        let mut sampled = 0usize;
704        let mut all_already_stored = true;
705        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
706
707        for addr in spill.addresses.iter().take(sample_limit) {
708            sampled += 1;
709            let chunk_bytes = spill.read_chunk(addr)?;
710            let data_size = u64::try_from(chunk_bytes.len())
711                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
712            match self
713                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
714                .await
715            {
716                Ok(q) => {
717                    quotes_opt = Some(q);
718                    all_already_stored = false;
719                    break;
720                }
721                Err(Error::AlreadyStored) => {
722                    debug!(
723                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
724                        hex::encode(addr)
725                    );
726                    continue;
727                }
728                Err(e) => return Err(e),
729            }
730        }
731
732        let uses_merkle = should_use_merkle(chunk_count, mode);
733
734        let quotes = match quotes_opt {
735            Some(q) => q,
736            None if all_already_stored && sampled == chunk_count => {
737                // Every address in the file was sampled and every one is
738                // already on the network — returning a zero-cost estimate is
739                // accurate in this case.
740                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
741                return Ok(UploadCostEstimate {
742                    file_size,
743                    chunk_count,
744                    storage_cost_atto: "0".into(),
745                    estimated_gas_cost_wei: "0".into(),
746                    payment_mode: if uses_merkle {
747                        PaymentMode::Merkle
748                    } else {
749                        PaymentMode::Single
750                    },
751                });
752            }
753            None => {
754                return Err(Error::CostEstimationInconclusive(format!(
755                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
756                     one reported AlreadyStored; cannot infer a representative price \
757                     for the remaining chunks"
758                )));
759            }
760        };
761
762        // Use the median price × 3 (matches SingleNodePayment::from_quotes
763        // which pays 3x the median to incentivize reliable storage).
764        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
765        prices.sort();
766        let median_price = prices
767            .get(prices.len() / 2)
768            .copied()
769            .unwrap_or(Amount::ZERO);
770        let per_chunk_cost = median_price * Amount::from(3u64);
771
772        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
773        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
774
775        // Estimate gas cost from realistic per-transaction budgets rather
776        // than a flat per-chunk or per-wave number.
777        //
778        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
779        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
780        //   The dominant cost is one SSTORE per entry plus base tx overhead,
781        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
782        //   on a full wave and multiply by the number of waves. The previous
783        //   per-wave figure of 150k was closer to a single-entry transfer
784        //   and understated cost by 5–10x for full waves.
785        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
786        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
787        //
788        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
789        // Arbitrum baseline). Treat the result as advisory, not a commitment.
790        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
791        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
792        let estimated_gas: u128 = if uses_merkle {
793            merkle_batches
794                .saturating_mul(GAS_PER_MERKLE_TX)
795                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
796        } else {
797            waves
798                .saturating_mul(GAS_PER_WAVE_TX)
799                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
800        };
801
802        info!(
803            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
804        );
805
806        Ok(UploadCostEstimate {
807            file_size,
808            chunk_count,
809            storage_cost_atto: total_storage.to_string(),
810            estimated_gas_cost_wei: estimated_gas.to_string(),
811            payment_mode: if uses_merkle {
812                PaymentMode::Merkle
813            } else {
814                PaymentMode::Single
815            },
816        })
817    }
818
819    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
820    ///
821    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
822    /// [`Visibility::Private`] — see that method for details.
823    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
824        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
825            .await
826    }
827
828    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
829    ///
830    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
831    /// `progress: None` — see that method for details.
832    pub async fn file_prepare_upload_with_visibility(
833        &self,
834        path: &Path,
835        visibility: Visibility,
836    ) -> Result<PreparedUpload> {
837        self.file_prepare_upload_with_progress(path, visibility, None)
838            .await
839    }
840
841    /// Phase 1 of external-signer upload with progress events.
842    ///
843    /// Requires an EVM network (for contract price queries) but NOT a wallet.
844    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
845    /// and a [`PaymentIntent`] that the external signer uses to construct
846    /// and submit the on-chain payment transaction.
847    ///
848    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
849    /// is bundled into the payment batch as an additional chunk and its
850    /// address is recorded on the returned [`PreparedUpload`]. After
851    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
852    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
853    /// can share a single address from which anyone can retrieve the file.
854    ///
855    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
856    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
857    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
858    /// emitted later by [`Client::finalize_upload_with_progress`] /
859    /// [`Client::finalize_upload_merkle_with_progress`].
860    ///
861    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
862    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
863    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
864    /// inherent to the two-phase external-signer protocol — the chunks must
865    /// stay in memory until [`Client::finalize_upload`] stores them. For very
866    /// large files, prefer [`Client::file_upload`] which streams directly.
867    ///
868    /// # Errors
869    ///
870    /// Returns an error if there is insufficient disk space, the file cannot
871    /// be read, encryption fails, or quote collection fails.
872    pub async fn file_prepare_upload_with_progress(
873        &self,
874        path: &Path,
875        visibility: Visibility,
876        progress: Option<mpsc::Sender<UploadEvent>>,
877    ) -> Result<PreparedUpload> {
878        debug!(
879            "Preparing file upload for external signing (visibility={visibility:?}): {}",
880            path.display()
881        );
882
883        let file_size = std::fs::metadata(path)?.len();
884        check_disk_space_for_spill(file_size)?;
885
886        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
887
888        info!(
889            "Encrypted {} into {} chunks for external signing (spilled to disk)",
890            path.display(),
891            spill.len()
892        );
893
894        // Read each chunk from disk and collect quotes concurrently.
895        // Note: all PreparedChunks accumulate in memory because the external-signer
896        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
897        let mut chunk_data: Vec<Bytes> = spill
898            .addresses
899            .iter()
900            .map(|addr| spill.read_chunk(addr))
901            .collect::<std::result::Result<Vec<_>, _>>()?;
902
903        // For public uploads, bundle the serialized DataMap as an extra chunk
904        // in the same payment batch. This lets the external signer pay for
905        // the data chunks and the DataMap chunk in one flow, and lets the
906        // finalize step return the DataMap's chunk address as the shareable
907        // retrieval address.
908        let data_map_address = match visibility {
909            Visibility::Private => None,
910            Visibility::Public => {
911                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
912                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
913                })?;
914                let bytes = Bytes::from(serialized);
915                let address = compute_address(&bytes);
916                info!(
917                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
918                    bytes.len(),
919                    hex::encode(address)
920                );
921                chunk_data.push(bytes);
922                Some(address)
923            }
924        };
925
926        let chunk_count = chunk_data.len();
927
928        if let Some(ref tx) = progress {
929            let _ = tx
930                .send(UploadEvent::Encrypted {
931                    total_chunks: chunk_count,
932                })
933                .await;
934        }
935
936        let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
937            // Merkle path: build tree, collect candidate pools, return for external payment.
938            info!("Using merkle batch preparation for {chunk_count} file chunks");
939
940            let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();
941
942            let avg_size =
943                chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
944            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
945
946            let prepared_batch = self
947                .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
948                .await?;
949
950            info!(
951                "File prepared for external merkle signing: {} chunks, depth={} ({})",
952                chunk_count,
953                prepared_batch.depth,
954                path.display()
955            );
956
957            ExternalPaymentInfo::Merkle {
958                prepared_batch,
959                chunk_contents: chunk_data,
960                chunk_addresses: addresses,
961            }
962        } else {
963            // Wave-batch path: collect quotes per chunk concurrently, emitting
964            // a `ChunkQuoted` event after each completion so callers can drive
965            // a progress bar through the (slow) quoting phase.
966            // Clamp fan-out to chunk_count so a partial wave doesn't
967            // pay for slots it can't fill (see PERF-RESULTS.md).
968            let quote_limiter = self.controller().quote.clone();
969            let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
970            let mut quote_stream = stream::iter(chunk_data)
971                .map(|content| {
972                    let limiter = quote_limiter.clone();
973                    async move {
974                        observe_op(
975                            &limiter,
976                            || async move { self.prepare_chunk_payment(content).await },
977                            classify_error,
978                        )
979                        .await
980                    }
981                })
982                .buffer_unordered(quote_concurrency);
983
984            let mut prepared_chunks = Vec::with_capacity(spill.len());
985            let mut quoted = 0usize;
986            while let Some(result) = quote_stream.next().await {
987                if let Some(prepared) = result? {
988                    prepared_chunks.push(prepared);
989                }
990                quoted += 1;
991                if let Some(ref tx) = progress {
992                    let _ = tx.try_send(UploadEvent::ChunkQuoted {
993                        quoted,
994                        total: chunk_count,
995                    });
996                }
997            }
998
999            // Surface the "DataMap chunk was already on the network" case
1000            // so debugging "why is data_map_address set but no storage cost
1001            // appears for it?" doesn't require reading the source. See the
1002            // `data_map_address` doc comment for why this is still a valid
1003            // `Some(addr)` outcome.
1004            if let Some(addr) = data_map_address {
1005                if !prepared_chunks.iter().any(|c| c.address == addr) {
1006                    info!(
1007                        "Public upload: DataMap chunk {} was already stored \
1008                         on the network — address is retrievable without a \
1009                         new payment",
1010                        hex::encode(addr)
1011                    );
1012                }
1013            }
1014
1015            let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1016
1017            info!(
1018                "File prepared for external signing: {} chunks, total {} atto ({})",
1019                prepared_chunks.len(),
1020                payment_intent.total_amount,
1021                path.display()
1022            );
1023
1024            ExternalPaymentInfo::WaveBatch {
1025                prepared_chunks,
1026                payment_intent,
1027            }
1028        };
1029
1030        Ok(PreparedUpload {
1031            data_map,
1032            payment_info,
1033            data_map_address,
1034        })
1035    }
1036
1037    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1038    ///
1039    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1040    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1041    /// payment. Builds payment proofs and stores chunks on the network.
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an error if the prepared upload used merkle payment (use
1046    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1047    /// or any chunk cannot be stored.
1048    pub async fn finalize_upload(
1049        &self,
1050        prepared: PreparedUpload,
1051        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1052    ) -> Result<FileUploadResult> {
1053        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1054            .await
1055    }
1056
1057    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1058    ///
1059    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1060    /// on the provided channel as each chunk is successfully stored.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Same as [`Client::finalize_upload`].
1065    pub async fn finalize_upload_with_progress(
1066        &self,
1067        prepared: PreparedUpload,
1068        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1069        progress: Option<mpsc::Sender<UploadEvent>>,
1070    ) -> Result<FileUploadResult> {
1071        let data_map_address = prepared.data_map_address;
1072        match prepared.payment_info {
1073            ExternalPaymentInfo::WaveBatch {
1074                prepared_chunks,
1075                payment_intent: _,
1076            } => {
1077                let total_chunks = prepared_chunks.len();
1078                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1079                let wave_result = self
1080                    .store_paid_chunks_with_events(paid_chunks, progress.as_ref(), 0, total_chunks)
1081                    .await;
1082                if !wave_result.failed.is_empty() {
1083                    let failed_count = wave_result.failed.len();
1084                    let stored_count = wave_result.stored.len();
1085                    return Err(Error::PartialUpload {
1086                        stored: wave_result.stored.clone(),
1087                        stored_count,
1088                        failed: wave_result.failed,
1089                        failed_count,
1090                        total_chunks: stored_count + failed_count,
1091                        reason: "finalize_upload: chunk storage failed after retries".into(),
1092                    });
1093                }
1094                let chunks_stored = wave_result.stored.len();
1095
1096                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1097
1098                Ok(FileUploadResult {
1099                    data_map: prepared.data_map,
1100                    chunks_stored,
1101                    chunks_failed: 0,
1102                    total_chunks: chunks_stored,
1103                    payment_mode_used: PaymentMode::Single,
1104                    storage_cost_atto: "0".into(),
1105                    gas_cost_wei: 0,
1106                    data_map_address,
1107                })
1108            }
1109            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1110                "Cannot finalize merkle upload with wave-batch tx hashes. \
1111                 Use finalize_upload_merkle() instead."
1112                    .to_string(),
1113            )),
1114        }
1115    }
1116
1117    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1118    ///
1119    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1120    /// returned by the on-chain merkle payment transaction. Generates proofs and
1121    /// stores chunks on the network.
1122    ///
1123    /// # Errors
1124    ///
1125    /// Returns an error if the prepared upload used wave-batch payment (use
1126    /// [`Client::finalize_upload`] instead), proof generation fails,
1127    /// or any chunk cannot be stored.
1128    pub async fn finalize_upload_merkle(
1129        &self,
1130        prepared: PreparedUpload,
1131        winner_pool_hash: [u8; 32],
1132    ) -> Result<FileUploadResult> {
1133        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1134            .await
1135    }
1136
1137    /// Phase 2 of external-signer upload (merkle) with progress events.
1138    ///
1139    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1140    /// on the provided channel as each chunk is successfully stored.
1141    ///
1142    /// # Errors
1143    ///
1144    /// Same as [`Client::finalize_upload_merkle`].
1145    pub async fn finalize_upload_merkle_with_progress(
1146        &self,
1147        prepared: PreparedUpload,
1148        winner_pool_hash: [u8; 32],
1149        progress: Option<mpsc::Sender<UploadEvent>>,
1150    ) -> Result<FileUploadResult> {
1151        let data_map_address = prepared.data_map_address;
1152        match prepared.payment_info {
1153            ExternalPaymentInfo::Merkle {
1154                prepared_batch,
1155                chunk_contents,
1156                chunk_addresses,
1157            } => {
1158                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1159                let chunks_stored = self
1160                    .merkle_upload_chunks(
1161                        chunk_contents,
1162                        chunk_addresses,
1163                        &batch_result,
1164                        progress.as_ref(),
1165                    )
1166                    .await?;
1167
1168                info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1169
1170                Ok(FileUploadResult {
1171                    data_map: prepared.data_map,
1172                    chunks_stored,
1173                    chunks_failed: 0,
1174                    total_chunks: chunks_stored,
1175                    payment_mode_used: PaymentMode::Merkle,
1176                    storage_cost_atto: "0".into(),
1177                    gas_cost_wei: 0,
1178                    data_map_address,
1179                })
1180            }
1181            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1182                "Cannot finalize wave-batch upload with merkle winner hash. \
1183                 Use finalize_upload() instead."
1184                    .to_string(),
1185            )),
1186        }
1187    }
1188
1189    /// Upload a file with a specific payment mode.
1190    ///
1191    /// Before encryption, checks that the temp directory has enough free
1192    /// disk space for the spilled chunks (~1.1× source file size).
1193    ///
1194    /// Encrypted chunks are spilled to a temp directory during encryption
1195    /// so that only their 32-byte addresses stay in memory. At upload time,
1196    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if there is insufficient disk space, the file cannot
1201    /// be read, encryption fails, or any chunk cannot be stored.
1202    #[allow(clippy::too_many_lines)]
1203    pub async fn file_upload_with_mode(
1204        &self,
1205        path: &Path,
1206        mode: PaymentMode,
1207    ) -> Result<FileUploadResult> {
1208        self.file_upload_with_progress(path, mode, None).await
1209    }
1210
1211    /// Upload a file with progress events sent to the given channel.
1212    ///
1213    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1214    /// provided channel for UI progress feedback.
    // Long by design: the inline merkle→wave-batch fallback keeps both
    // payment flows' result handling side by side.
    #[allow(clippy::too_many_lines)]
    pub async fn file_upload_with_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            // Send errors (receiver dropped) are deliberately ignored:
            // progress reporting is best-effort.
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        // Tuple: (chunks stored, mode actually used, storage cost, gas cost).
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) =
            if self.should_use_merkle(chunk_count, mode) {
                info!("Using merkle batch payment for {chunk_count} file chunks");

                let batch_result = match self
                    .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
                    .await
                {
                    Ok(result) => result,
                    // Only Auto mode may silently fall back to wave-batch; an
                    // explicit PaymentMode::Merkle request propagates the error.
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc) =
                            self.upload_waves_single(&spill, progress.as_ref()).await?;
                        // Early return: the fallback result is complete here.
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc) = self
                    .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
                    .await?;
                (stored, PaymentMode::Merkle, sc, gc)
            } else {
                let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?;
                (stored, PaymentMode::Single, sc, gc)
            };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        // `data_map_address` is None on every return path of this method:
        // DataMap-chunk bundling happens only in the external-signer flow.
        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address: None,
        })
    }
1301
1302    /// Encrypt a file and spill chunks to a temp directory.
1303    ///
1304    /// Logs progress every 100 chunks so users get feedback during
1305    /// multi-GB encryptions.
1306    ///
1307    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1308    async fn encrypt_file_to_spill(
1309        &self,
1310        path: &Path,
1311        progress: Option<&mpsc::Sender<UploadEvent>>,
1312    ) -> Result<(ChunkSpill, DataMap)> {
1313        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1314
1315        let mut spill = ChunkSpill::new()?;
1316        while let Some(content) = chunk_rx.recv().await {
1317            spill.push(&content)?;
1318            let chunks_done = spill.len();
1319            if let Some(tx) = progress {
1320                if chunks_done.is_multiple_of(10) {
1321                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1322                }
1323            }
1324            if chunks_done % 100 == 0 {
1325                let mb = spill.total_bytes() / (1024 * 1024);
1326                info!(
1327                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1328                    path.display()
1329                );
1330            }
1331        }
1332
1333        // Await encryption completion to catch errors before paying.
1334        handle
1335            .await
1336            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1337            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1338
1339        let data_map = datamap_rx
1340            .await
1341            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1342
1343        Ok((spill, data_map))
1344    }
1345
1346    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1347    ///
1348    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1349    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1350    ///
1351    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1352    async fn upload_waves_single(
1353        &self,
1354        spill: &ChunkSpill,
1355        progress: Option<&mpsc::Sender<UploadEvent>>,
1356    ) -> Result<(usize, String, u128)> {
1357        let mut total_stored = 0usize;
1358        let mut total_storage = Amount::ZERO;
1359        let mut total_gas: u128 = 0;
1360        let total_chunks = spill.len();
1361        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1362        let wave_count = waves.len();
1363
1364        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1365            let wave_num = wave_idx + 1;
1366            let wave_data: Vec<Bytes> = wave_addrs
1367                .iter()
1368                .map(|addr| spill.read_chunk(addr))
1369                .collect::<Result<Vec<_>>>()?;
1370
1371            info!(
1372                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1373                wave_data.len()
1374            );
1375            if let Some(tx) = progress {
1376                let _ = tx
1377                    .send(UploadEvent::QuotingChunks {
1378                        wave: wave_num,
1379                        total_waves: wave_count,
1380                        chunks_in_wave: wave_data.len(),
1381                    })
1382                    .await;
1383            }
1384            let (addresses, wave_storage, wave_gas) = self
1385                .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks)
1386                .await?;
1387            total_stored += addresses.len();
1388            if let Ok(cost) = wave_storage.parse::<Amount>() {
1389                total_storage += cost;
1390            }
1391            total_gas = total_gas.saturating_add(wave_gas);
1392            if let Some(tx) = progress {
1393                let _ = tx
1394                    .send(UploadEvent::WaveComplete {
1395                        wave: wave_num,
1396                        total_waves: wave_count,
1397                        stored_so_far: total_stored,
1398                        total: total_chunks,
1399                    })
1400                    .await;
1401            }
1402        }
1403
1404        Ok((total_stored, total_storage.to_string(), total_gas))
1405    }
1406
1407    /// Upload chunks from a spill using pre-computed merkle proofs.
1408    ///
1409    /// Reads one wave at a time from disk, pairs each chunk with its proof,
1410    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1411    ///
1412    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1413    /// Costs come from the `batch_result` which was populated during payment.
1414    async fn upload_waves_merkle(
1415        &self,
1416        spill: &ChunkSpill,
1417        batch_result: &MerkleBatchPaymentResult,
1418        progress: Option<&mpsc::Sender<UploadEvent>>,
1419    ) -> Result<(usize, String, u128)> {
1420        let mut total_stored = 0usize;
1421        let total_chunks = spill.len();
1422        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1423        let wave_count = waves.len();
1424        let mut stored_addresses: Vec<[u8; 32]> = Vec::new();
1425
1426        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1427            let wave_num = wave_idx + 1;
1428            let wave = spill.read_wave(wave_addrs)?;
1429
1430            info!(
1431                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
1432                wave.len()
1433            );
1434
1435            let store_limiter = self.controller().store.clone();
1436            // Clamp fan-out to wave size — partial last wave should
1437            // not pay for extra slots (see PERF-RESULTS.md).
1438            let store_concurrency = store_limiter.current().min(wave.len().max(1));
1439            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
1440                let proof_bytes = batch_result.proofs.get(&addr).cloned();
1441                let limiter = store_limiter.clone();
1442                async move {
1443                    let proof = proof_bytes.ok_or_else(|| {
1444                        (
1445                            addr,
1446                            Error::Payment(format!(
1447                                "Missing merkle proof for chunk {}",
1448                                hex::encode(addr)
1449                            )),
1450                        )
1451                    })?;
1452                    let peers = self.close_group_peers(&addr).await.map_err(|e| (addr, e))?;
1453                    observe_op(
1454                        &limiter,
1455                        || async move {
1456                            self.chunk_put_to_close_group(content, proof, &peers).await
1457                        },
1458                        classify_error,
1459                    )
1460                    .await
1461                    .map(|_| addr)
1462                    .map_err(|e| (addr, e))
1463                }
1464            }))
1465            .buffer_unordered(store_concurrency);
1466
1467            while let Some(result) = upload_stream.next().await {
1468                match result {
1469                    Ok(addr) => {
1470                        stored_addresses.push(addr);
1471                        total_stored += 1;
1472                        info!("Stored {total_stored}/{total_chunks}");
1473                        if let Some(tx) = progress {
1474                            let _ = tx
1475                                .send(UploadEvent::ChunkStored {
1476                                    stored: total_stored,
1477                                    total: total_chunks,
1478                                })
1479                                .await;
1480                        }
1481                    }
1482                    Err((addr, e)) => {
1483                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
1484                        return Err(Error::PartialUpload {
1485                            stored: stored_addresses,
1486                            stored_count: total_stored,
1487                            failed: vec![(addr, e.to_string())],
1488                            failed_count: 1,
1489                            total_chunks,
1490                            reason: format!("merkle chunk upload failed: {e}"),
1491                        });
1492                    }
1493                }
1494            }
1495
1496            if let Some(tx) = progress {
1497                let _ = tx
1498                    .send(UploadEvent::WaveComplete {
1499                        wave: wave_num,
1500                        total_waves: wave_count,
1501                        stored_so_far: total_stored,
1502                        total: total_chunks,
1503                    })
1504                    .await;
1505            }
1506        }
1507
1508        Ok((
1509            total_stored,
1510            batch_result.storage_cost_atto.clone(),
1511            batch_result.gas_cost_wei,
1512        ))
1513    }
1514
1515    /// Download and decrypt a file from the network, writing it to disk.
1516    ///
1517    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
1518    /// memory at a time, avoiding OOM on large files. Chunks are fetched
1519    /// concurrently within each batch, then decrypted data is written to
1520    /// disk incrementally.
1521    ///
1522    /// Returns the number of bytes written.
1523    ///
1524    /// # Panics
1525    ///
1526    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
1527    /// Will panic if called from a `current_thread` runtime because
1528    /// `streaming_decrypt` takes a synchronous callback that must bridge
1529    /// back to async via `block_in_place`.
1530    ///
1531    /// # Errors
1532    ///
1533    /// Returns an error if any chunk cannot be retrieved, decryption fails,
1534    /// or the file cannot be written.
1535    #[allow(clippy::unused_async)]
1536    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1537        self.file_download_with_progress(data_map, output, None)
1538            .await
1539    }
1540
    /// Download and decrypt a file with progress events.
    ///
    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
    ///
    /// Progress reporting:
    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
    ///    `ChunksFetched` with `total: 0` during resolution)
    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
    /// 3. Fetches data chunks with accurate `fetched/total` progress
    // The allow is justified here: this body contains no `.await` — all async
    // work runs through `handle.block_on` inside `block_in_place` — but the
    // signature stays `async` for API symmetry with `file_download`.
    #[allow(clippy::unused_async)]
    pub async fn file_download_with_progress(
        &self,
        data_map: &DataMap,
        output: &Path,
        progress: Option<mpsc::Sender<DownloadEvent>>,
    ) -> Result<u64> {
        debug!("Downloading file to {}", output.display());

        // Captured so the synchronous fetch callbacks below can re-enter
        // async code via `block_on`.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            // All progress sends in this fn use `try_send` (non-blocking):
            // a full or dropped receiver must never stall the download, so
            // the send result is deliberately ignored.
            if let Some(ref tx) = progress {
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch_limiter = self.controller().fetch.clone();
                // Synchronous callback handed to self_encryption; bridges
                // back to async via `handle.block_on` (hence the enclosing
                // `block_in_place`).
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    let limiter = fetch_limiter.clone();
                    handle.block_on(async {
                        // Build ONE peer pool for this batch via a
                        // single DHT walk; reuse for every chunk.
                        let addrs: Vec<[u8; 32]> = batch_owned.iter().map(|(_, h)| h.0).collect();
                        let pool = self.build_peer_pool_for(&addrs).await.map_err(|e| {
                            self_encryption::Error::Generic(format!(
                                "DataMap resolution: peer pool build failed: {e}"
                            ))
                        })?;
                        let pool_ref = &pool;
                        // Clamp fan-out to batch size — self_encryption
                        // requests small batches (default 10), so a
                        // higher cap from the controller would be slots
                        // we never fill (see PERF-RESULTS.md).
                        let cap = limiter.current().min(batch_owned.len().max(1));
                        let mut stream = futures::stream::iter(batch_owned)
                            .map(|(idx, hash)| {
                                let addr = hash.0;
                                let limiter = limiter.clone();
                                async move {
                                    let result = observe_op(
                                        &limiter,
                                        || async move {
                                            self.chunk_get_with_pool(&addr, pool_ref).await
                                        },
                                        classify_error,
                                    )
                                    .await;
                                    (idx, hash, result)
                                }
                            })
                            .buffer_unordered(cap);
                        let mut results = Vec::new();
                        while let Some((idx, hash, result)) =
                            futures::StreamExt::next(&mut stream).await
                        {
                            // Both a fetch error and a missing chunk are
                            // fatal: the root DataMap needs every piece.
                            let chunk = result
                                .map_err(|e| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap resolution failed: {e}"
                                    ))
                                })?
                                .ok_or_else(|| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap chunk not found: {}",
                                        hex::encode(hash.0)
                                    ))
                                })?;
                            results.push((idx, chunk.content));
                            let fetched =
                                counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                            if let Some(ref tx) = prog {
                                let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                            }
                        }
                        // CRITICAL: self_encryption::get_root_data_map_parallel
                        // pairs the returned Vec POSITIONALLY with the input
                        // hashes via .zip() and discards our idx field. We
                        // used buffer_unordered, so completions arrive in
                        // arbitrary order. Sort by idx to restore the input
                        // order before returning, otherwise chunk bytes get
                        // paired with the wrong hashes and the root data
                        // map is corrupted.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            // Already a root-level map; nothing to resolve.
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();

        let fetch_limiter_outer = self.controller().fetch.clone();
        let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
            let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
            let fetched_ref = fetched_for_closure.clone();
            let progress_ref = progress_for_closure.clone();
            let fetch_limiter = fetch_limiter_outer.clone();

            tokio::task::block_in_place(|| {
                handle.block_on(async {
                    // Per-batch peer pool: one DHT walk shared across
                    // all the chunks in this self_encryption batch.
                    let addrs: Vec<[u8; 32]> = batch_owned.iter().map(|(_, h)| h.0).collect();
                    let pool = self.build_peer_pool_for(&addrs).await.map_err(|e| {
                        self_encryption::Error::Generic(format!(
                            "Streaming decrypt: peer pool build failed: {e}"
                        ))
                    })?;
                    let pool_ref = &pool;
                    // Clamp fan-out to batch size — see PERF-RESULTS.md.
                    let cap = fetch_limiter.current().min(batch_owned.len().max(1));
                    let mut stream = futures::stream::iter(batch_owned)
                        .map(|(idx, hash)| {
                            let addr = hash.0;
                            let limiter = fetch_limiter.clone();
                            async move {
                                let result =
                                    observe_op(
                                        &limiter,
                                        || async move {
                                            self.chunk_get_with_pool(&addr, pool_ref).await
                                        },
                                        classify_error,
                                    )
                                    .await;
                                (idx, hash, result)
                            }
                        })
                        .buffer_unordered(cap);

                    let mut results = Vec::new();
                    while let Some((idx, hash, result)) =
                        futures::StreamExt::next(&mut stream).await
                    {
                        let addr_hex = hex::encode(hash.0);
                        let chunk = result
                            .map_err(|e| {
                                self_encryption::Error::Generic(format!(
                                    "Network fetch failed for {addr_hex}: {e}"
                                ))
                            })?
                            .ok_or_else(|| {
                                self_encryption::Error::Generic(format!(
                                    "Chunk not found: {addr_hex}"
                                ))
                            })?;
                        results.push((idx, chunk.content));
                        let fetched =
                            fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                        info!("Downloaded {fetched}/{total_chunks}");
                        if let Some(ref tx) = progress_ref {
                            let _ = tx.try_send(DownloadEvent::ChunksFetched {
                                fetched,
                                total: total_chunks,
                            });
                        }
                    }
                    // streaming_decrypt itself sorts by key before
                    // zipping, but the same closure is also passed
                    // through get_root_data_map_parallel internally
                    // (see self_encryption::stream_decrypt.rs::new), and
                    // THAT path zips positionally without sorting. Sort
                    // here so both consumers see input order.
                    results.sort_by_key(|(idx, _)| *idx);
                    Ok(results)
                })
            })
        })
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Write decrypted chunks to a temp file, then rename atomically.
        // The temp file lives in the output's parent directory so the final
        // rename stays on the same filesystem; pid + random suffix avoids
        // collisions between concurrent downloads.
        let parent = output.parent().unwrap_or_else(|| Path::new("."));
        let unique: u64 = rand::random();
        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));

        // Immediately-invoked closure so any `?` failure still falls through
        // to the cleanup arms below instead of returning early.
        let write_result = (|| -> Result<u64> {
            let mut file = std::fs::File::create(&tmp_path)?;
            let mut bytes_written = 0u64;
            for chunk_result in stream {
                let chunk_bytes = chunk_result
                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
                file.write_all(&chunk_bytes)?;
                bytes_written += chunk_bytes.len() as u64;
            }
            file.flush()?;
            Ok(bytes_written)
        })();

        match write_result {
            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
                Ok(()) => {
                    info!(
                        "File downloaded: {bytes_written} bytes written to {}",
                        output.display()
                    );
                    Ok(bytes_written)
                }
                Err(rename_err) => {
                    // Rename failed: remove the orphaned temp file (best
                    // effort) and surface the rename error.
                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                        warn!(
                            "Failed to remove temp download file {}: {cleanup_err}",
                            tmp_path.display()
                        );
                    }
                    Err(rename_err.into())
                }
            },
            Err(e) => {
                // Fetch/decrypt/write failed partway: clean up the partial
                // temp file (best effort) and propagate the original error.
                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                    warn!(
                        "Failed to remove temp download file {}: {cleanup_err}",
                        tmp_path.display()
                    );
                }
                Err(e)
            }
        }
    }
1801}
1802
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn disk_space_check_passes_for_small_file() {
        // A tiny (1 KB) spill request must succeed on any real machine.
        check_disk_space_for_spill(1024).unwrap();
    }

    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        // ~9 EB of spill space can never be satisfied by a real filesystem.
        let outcome = check_disk_space_for_spill(u64::MAX / 2);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let first = vec![0xAA; 1024];
        let second = vec![0xBB; 2048];

        spill.push(&first).unwrap();
        spill.push(&second).unwrap();

        // Bookkeeping reflects both pushes.
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        // Each stored chunk reads back byte-identical.
        let chunk_a = spill.read_chunk(&spill.addresses[0]).unwrap();
        assert_eq!(&chunk_a[..], &first[..]);

        let chunk_b = spill.read_chunk(&spill.addresses[1]).unwrap();
        assert_eq!(&chunk_b[..], &second[..]);

        // Splitting addresses into 1-chunk waves yields one wave per chunk.
        assert_eq!(spill.addresses.chunks(1).count(), 2);
    }

    #[test]
    fn chunk_spill_cleanup_on_drop() {
        // Capture the spill directory, letting the spill drop at scope end.
        let dir = {
            let spill = ChunkSpill::new().unwrap();
            assert!(spill.dir.exists());
            spill.dir.clone()
        };
        // The Drop impl must have removed the directory.
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];

        // Pushing identical bytes three times must store them only once.
        for _ in 0..3 {
            spill.push(&repeated).unwrap();
        }

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Distinct content still gets its own entry.
        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
1886
/// Compile-time assertions that Client file method futures are Send.
#[cfg(test)]
mod send_assertions {
    use super::*;

    // Taking `&T` lets us assert Send-ness without consuming the future.
    fn _require_send<T: Send>(_: &T) {}

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let upload = client.file_upload(Path::new("/dev/null"));
        _require_send(&upload);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let upload = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
        _require_send(&upload);
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // `todo!()` stands in for a DataMap we never construct; the fn is
        // never executed, only type-checked.
        let map: DataMap = todo!();
        let download = client.file_download(&map, Path::new("/dev/null"));
        _require_send(&download);
    }
}