Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19    chunk_contents_for_upload_addresses, finalize_merkle_batch, should_use_merkle,
20    MerkleBatchPaymentResult, PaymentMode, PreparedMerkleBatch,
21};
22use crate::data::client::Client;
23use crate::data::error::{Error, Result};
24use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
25use ant_protocol::transport::{MultiAddr, PeerId};
26use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
27use bytes::Bytes;
28use fs2::FileExt;
29use futures::stream::{self, StreamExt};
30use self_encryption::{
31    get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
32    streaming_decrypt_with_batch_size, DataMap,
33};
34use std::collections::{HashMap, HashSet};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::sync::{Arc, Mutex};
38use tokio::runtime::Handle;
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41use xor_name::XorName;
42
43/// Progress events emitted during file upload for UI feedback.
44#[derive(Debug, Clone)]
45pub enum UploadEvent {
46    /// A chunk has been encrypted and spilled to disk.
47    Encrypting { chunks_done: usize },
48    /// File encryption complete.
49    Encrypted { total_chunks: usize },
50    /// Starting quote collection for a wave.
51    QuotingChunks {
52        wave: usize,
53        total_waves: usize,
54        chunks_in_wave: usize,
55    },
56    /// A chunk has been quoted (peer discovery + price received).
57    /// This is the slow phase — each quote involves network round-trips.
58    ChunkQuoted { quoted: usize, total: usize },
59    /// A chunk has been stored on the network.
60    ChunkStored { stored: usize, total: usize },
61    /// A wave has completed.
62    WaveComplete {
63        wave: usize,
64        total_waves: usize,
65        stored_so_far: usize,
66        total: usize,
67    },
68}
69
70/// Progress events emitted during file download for UI feedback.
71#[derive(Debug, Clone)]
72pub enum DownloadEvent {
73    /// Resolving hierarchical DataMap to discover real chunk count.
74    ResolvingDataMap { total_map_chunks: usize },
75    /// A DataMap chunk has been fetched during resolution.
76    MapChunkFetched { fetched: usize },
77    /// DataMap resolved — total data chunk count now known.
78    DataMapResolved { total_chunks: usize },
79    /// Data chunks are being fetched from the network.
80    ChunksFetched { fetched: usize, total: usize },
81}
82
83/// One entry in the per-chunk quote list returned by
84/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
85/// signed quote it returned, and the payment amount it is demanding.
86type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
87
88/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
89const UPLOAD_WAVE_SIZE: usize = 64;
90
91/// Stream decrypt batches should be larger than fetch fan-out so
92/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
93/// ones complete, instead of stopping at each self-encryption batch boundary.
94const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;
95
96/// Use at most this fraction of currently usable RAM for one decrypt batch.
97const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;
98
99/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
100/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
101/// payload bytes alone.
102const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;
103
104/// Maximum number of distinct chunk addresses to sample when probing for a
105/// representative quote in [`Client::estimate_upload_cost`].
106///
107/// Bounded small so we never spend more than a couple of round-trips on the
108/// `AlreadyStored` retry path, which only matters when many leading chunks
109/// of a file already live on the network.
110const ESTIMATE_SAMPLE_CAP: usize = 5;
111
112/// Gas used by one `pay_for_quotes` transaction that packs up to
113/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
114///
115/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
116/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
117/// the base tx overhead. On Arbitrum that is roughly
118/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
119/// conservative per-wave upper bound.
120const GAS_PER_WAVE_TX: u128 = 1_500_000;
121
122/// Gas used by one merkle batch payment transaction.
123///
124/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
125/// and posts a pool commitment, so budget higher than a plain transfer.
126const GAS_PER_MERKLE_TX: u128 = 500_000;
127
128/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
129/// figure when no live gas oracle is consulted.
130///
131/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
132/// that as the default so the CLI prints a sensible order-of-magnitude
133/// number. Users should treat the reported gas cost as an estimate, not a
134/// commitment — real gas is bid at submission time.
135const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;
136
137/// Extra headroom percentage for disk space check.
138///
139/// Encrypted chunks are slightly larger than the source data due to padding
140/// and self-encryption overhead. We require file_size + 10% free space in
141/// the temp directory to account for this.
142const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
143
144/// Temporary on-disk buffer for encrypted chunks.
145///
146/// During file encryption, chunks are written to a temp directory so that
147/// only their 32-byte addresses stay in memory. At upload time chunks are
148/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
149/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
150///
151/// This is a small TOCTOU guard covering the sub-millisecond window inside
152/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
153/// dir is older than this and its lockfile is releasable, the owning process
154/// is gone and the dir is safe to reap — regardless of how old it is.
155///
156/// The previous policy waited 24 h before reaping any orphan, which meant
157/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
158/// spill dir until the next day's upload — and on a host being restart-looped
159/// by systemd, orphans could fill the disk well within that window.
160const SPILL_STALE_GRACE_SECS: u64 = 30;
161
162/// Prefix for spill directory names to distinguish from user files.
163const SPILL_DIR_PREFIX: &str = "spill_";
164
165/// Lockfile name inside each spill dir to signal active use.
166const SPILL_LOCK_NAME: &str = ".lock";
167
168struct ChunkSpill {
169    /// Directory holding spilled chunk files (named by hex address).
170    dir: PathBuf,
171    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
172    _lock: std::fs::File,
173    /// Deduplicated list of chunk addresses.
174    addresses: Vec<[u8; 32]>,
175    /// Tracks seen addresses for deduplication.
176    seen: HashSet<[u8; 32]>,
177    /// Byte size per spilled chunk address.
178    sizes: HashMap<[u8; 32], u64>,
179    /// Running total of unique chunk byte sizes (for average-size calculation).
180    total_bytes: u64,
181}
182
183impl ChunkSpill {
184    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
185    fn spill_root() -> Result<PathBuf> {
186        use crate::config;
187        let root = config::data_dir()
188            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
189            .join("spill");
190        Ok(root)
191    }
192
193    /// Create a new spill directory under `<data_dir>/spill/`.
194    ///
195    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
196    /// identified by prefix and cleaned up by age. A lockfile inside the
197    /// dir prevents concurrent cleanup from deleting an active spill.
198    fn new() -> Result<Self> {
199        let root = Self::spill_root()?;
200        std::fs::create_dir_all(&root)?;
201
202        // Clean up stale spill dirs from previous crashed runs.
203        Self::cleanup_stale(&root);
204
205        let now = std::time::SystemTime::now()
206            .duration_since(std::time::UNIX_EPOCH)
207            .unwrap_or_default()
208            .as_secs();
209        let unique: u64 = rand::random();
210        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
211        std::fs::create_dir(&dir)?;
212
213        // Create and hold a lockfile for the lifetime of this spill.
214        // cleanup_stale() will skip dirs with locked files.
215        let lock_path = dir.join(SPILL_LOCK_NAME);
216        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
217            Error::Io(std::io::Error::new(
218                e.kind(),
219                format!("failed to create spill lockfile: {e}"),
220            ))
221        })?;
222        lock_file.try_lock_exclusive().map_err(|e| {
223            Error::Io(std::io::Error::new(
224                e.kind(),
225                format!("failed to lock spill lockfile: {e}"),
226            ))
227        })?;
228
229        Ok(Self {
230            dir,
231            _lock: lock_file,
232            addresses: Vec::new(),
233            seen: HashSet::new(),
234            sizes: HashMap::new(),
235            total_bytes: 0,
236        })
237    }
238
239    /// Clean up stale spill directories. Best-effort, errors are logged.
240    ///
241    /// A spill dir is reaped when:
242    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
243    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
244    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
245    /// 4. Its lockfile is releasable — i.e. no live process holds it
246    ///
247    /// The lockfile is the primary correctness gate: a releasable lock means
248    /// the owning `ChunkSpill` has been dropped or the process is gone, so
249    /// the dir is fair game. The grace period covers only the brief window
250    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
251    ///
252    /// Safe to call concurrently from multiple processes.
253    fn cleanup_stale(root: &Path) {
254        let now = std::time::SystemTime::now()
255            .duration_since(std::time::UNIX_EPOCH)
256            .unwrap_or_default()
257            .as_secs();
258
259        if now == 0 {
260            // Clock is broken (before Unix epoch). Skip cleanup to avoid
261            // misidentifying dirs as stale.
262            warn!("System clock before Unix epoch, skipping spill cleanup");
263            return;
264        }
265
266        let entries = match std::fs::read_dir(root) {
267            Ok(entries) => entries,
268            Err(_) => return,
269        };
270
271        for entry in entries.flatten() {
272            let name = entry.file_name();
273            let name_str = name.to_string_lossy();
274
275            // Only process dirs with our prefix.
276            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
277                Some(s) => s,
278                None => continue,
279            };
280
281            // Parse timestamp: "spill_<timestamp>_<random>"
282            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
283                Some(ts) => ts,
284                None => continue,
285            };
286
287            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
288                continue;
289            }
290
291            // Safety: only delete actual directories, not symlinks.
292            let file_type = match entry.file_type() {
293                Ok(ft) => ft,
294                Err(_) => continue,
295            };
296            if !file_type.is_dir() {
297                continue;
298            }
299
300            let path = entry.path();
301
302            // Check lockfile: if locked, the dir is in active use -- skip it.
303            let lock_path = path.join(SPILL_LOCK_NAME);
304            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
305                use fs2::FileExt;
306                if lock_file.try_lock_exclusive().is_err() {
307                    // Lock held by another process -- dir is active.
308                    debug!("Skipping active spill dir: {}", path.display());
309                    continue;
310                }
311                // We acquired the lock, so no one else holds it.
312                // Drop it before deleting.
313                drop(lock_file);
314            }
315
316            info!("Cleaning up stale spill dir: {}", path.display());
317            if let Err(e) = std::fs::remove_dir_all(&path) {
318                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
319            }
320        }
321    }
322
323    /// Run stale spill cleanup. Call at client startup or periodically.
324    #[allow(dead_code)]
325    pub(crate) fn run_cleanup() {
326        if let Ok(root) = Self::spill_root() {
327            Self::cleanup_stale(&root);
328        }
329    }
330
331    /// Write one encrypted chunk to disk and record its address.
332    ///
333    /// Deduplicates by content address: if the same chunk was already
334    /// spilled, the write and accounting are skipped. This prevents
335    /// double-uploads and inflated quoting metrics.
336    fn push(&mut self, content: &[u8]) -> Result<()> {
337        let address = compute_address(content);
338        if !self.seen.insert(address) {
339            return Ok(());
340        }
341        let path = self.dir.join(hex::encode(address));
342        std::fs::write(&path, content)?;
343        let content_len = content.len() as u64;
344        self.sizes.insert(address, content_len);
345        self.total_bytes += content_len;
346        self.addresses.push(address);
347        Ok(())
348    }
349
350    /// Number of chunks stored.
351    fn len(&self) -> usize {
352        self.addresses.len()
353    }
354
355    /// Total bytes of all spilled chunks.
356    fn total_bytes(&self) -> u64 {
357        self.total_bytes
358    }
359
360    /// Address and byte-size pairs for all spilled chunks.
361    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
362        self.addresses
363            .iter()
364            .map(|address| {
365                self.sizes
366                    .get(address)
367                    .copied()
368                    .map(|size| (*address, size))
369                    .ok_or_else(|| {
370                        Error::Storage(format!(
371                            "missing size for spilled chunk {}",
372                            hex::encode(address)
373                        ))
374                    })
375            })
376            .collect()
377    }
378
379    /// Read a single chunk back from disk by address.
380    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
381        let path = self.dir.join(hex::encode(address));
382        let data = std::fs::read(&path).map_err(|e| {
383            Error::Io(std::io::Error::new(
384                e.kind(),
385                format!("reading spilled chunk {}: {e}", hex::encode(address)),
386            ))
387        })?;
388        Ok(Bytes::from(data))
389    }
390
391    /// Read a wave of chunks from disk.
392    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
393        let mut out = Vec::with_capacity(wave_addrs.len());
394        for addr in wave_addrs {
395            let content = self.read_chunk(addr)?;
396            out.push((content, *addr));
397        }
398        Ok(out)
399    }
400
401    /// Clean up the spill directory.
402    fn cleanup(&self) {
403        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
404            warn!(
405                "Failed to clean up chunk spill dir {}: {e}",
406                self.dir.display()
407            );
408        }
409    }
410}
411
412impl Drop for ChunkSpill {
413    fn drop(&mut self) {
414        self.cleanup();
415    }
416}
417
418fn cached_merkle_covers_addresses(
419    cached: &MerkleBatchPaymentResult,
420    addresses: &[[u8; 32]],
421) -> bool {
422    addresses
423        .iter()
424        .all(|addr| cached.proofs.contains_key(addr))
425}
426
427/// Check that the spill directory has enough free space for the spilled chunks.
428///
429/// `file_size` is the source file's byte count. We require
430/// `file_size + 10%` free space to account for self-encryption overhead.
431fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
432    let spill_root = ChunkSpill::spill_root()?;
433
434    // Ensure the root exists so fs2 can query it.
435    std::fs::create_dir_all(&spill_root)?;
436
437    let available = fs2::available_space(&spill_root).map_err(|e| {
438        Error::Io(std::io::Error::new(
439            e.kind(),
440            format!(
441                "failed to query disk space on {}: {e}",
442                spill_root.display()
443            ),
444        ))
445    })?;
446
447    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
448    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
449    let required = file_size.saturating_add(headroom);
450
451    if available < required {
452        let avail_mb = available / (1024 * 1024);
453        let req_mb = required / (1024 * 1024);
454        return Err(Error::InsufficientDiskSpace(format!(
455            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
456            spill_root.display()
457        )));
458    }
459
460    debug!(
461        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
462        spill_root.display()
463    );
464    Ok(())
465}
466
467fn usable_memory_bytes() -> Option<u64> {
468    let mut system = sysinfo::System::new();
469    system.refresh_memory();
470
471    let available_memory = system.available_memory();
472    let free_memory = system.free_memory();
473    let used_memory = system.used_memory();
474    let total_memory = system.total_memory();
475    let unused_memory = total_memory.saturating_sub(used_memory);
476
477    let mut usable = [available_memory, free_memory, unused_memory]
478        .into_iter()
479        .filter(|bytes| *bytes > 0)
480        .max();
481
482    let cgroup_free_memory = system
483        .cgroup_limits()
484        .filter(|limits| limits.total_memory > 0)
485        .map(|limits| limits.free_memory);
486    if let Some(cgroup_free_memory) = cgroup_free_memory {
487        usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
488    }
489
490    debug!(
491        available_memory,
492        free_memory,
493        used_memory,
494        total_memory,
495        cgroup_free_memory,
496        usable_memory = ?usable,
497        "Detected usable memory for stream decrypt batch sizing"
498    );
499
500    usable
501}
502
503fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
504    let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
505    let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
506        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
507        .max(1);
508    let cap = (budget / estimated_bytes_per_chunk).max(1);
509
510    usize::try_from(cap).unwrap_or(usize::MAX)
511}
512
513fn adaptive_stream_decrypt_batch_size(
514    total_chunks: usize,
515    fetch_cap: usize,
516    configured_batch_floor: usize,
517    usable_memory_bytes: Option<u64>,
518) -> usize {
519    let fetch_target = fetch_cap
520        .max(1)
521        .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
522    let requested = match usable_memory_bytes {
523        Some(bytes) => {
524            let memory_cap = stream_decrypt_batch_memory_cap(bytes);
525            configured_batch_floor
526                .max(fetch_target)
527                .max(1)
528                .min(memory_cap)
529        }
530        None => configured_batch_floor.max(1),
531    };
532
533    requested.min(total_chunks.max(1)).max(1)
534}
535
536/// Whether the data map is published to the network for address-based retrieval.
537///
538/// A private upload stores only the data chunks and returns the `DataMap` to
539/// the caller — only someone holding that `DataMap` can reconstruct the file.
540/// A public upload additionally stores the serialized `DataMap` as a chunk on
541/// the network, yielding a single chunk address that anyone can use to
542/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
543#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
544pub enum Visibility {
545    /// Keep the data map local; only the holder can retrieve the file.
546    #[default]
547    Private,
548    /// Publish the data map as a network chunk so anyone with the returned
549    /// address can retrieve and decrypt the file.
550    Public,
551}
552
553/// Estimated cost of uploading a file, returned by
554/// [`Client::estimate_upload_cost`].
555#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
556pub struct UploadCostEstimate {
557    /// Original file size in bytes.
558    pub file_size: u64,
559    /// Number of chunks the file would be split into (data chunks only,
560    /// does not include the DataMap chunk added during public uploads).
561    pub chunk_count: usize,
562    /// Estimated total storage cost in atto (token smallest unit).
563    pub storage_cost_atto: String,
564    /// Estimated gas cost in wei as a string. This is a rough heuristic
565    /// based on chunk count and payment mode, NOT a live gas price query.
566    pub estimated_gas_cost_wei: String,
567    /// Payment mode that would be used.
568    pub payment_mode: PaymentMode,
569}
570
571/// Result of a file upload: the `DataMap` needed to retrieve the file.
572///
573/// Marked `#[non_exhaustive]` so adding a new field in future is not a
574/// breaking change for downstream consumers that construct or pattern-match
575/// on this struct.
576#[derive(Debug, Clone)]
577#[non_exhaustive]
578pub struct FileUploadResult {
579    /// The data map containing chunk metadata for reconstruction.
580    pub data_map: DataMap,
581    /// Number of chunks stored on the network.
582    pub chunks_stored: usize,
583    /// Number of chunks that failed to store. Always 0 for a successful
584    /// upload — partial-failure information is conveyed via
585    /// [`crate::data::Error::PartialUpload`] instead.
586    pub chunks_failed: usize,
587    /// Total number of chunks in the upload, including chunks that were
588    /// already stored and skipped. On full success this equals `chunks_stored`.
589    pub total_chunks: usize,
590    /// Which payment mode was actually used (not just requested).
591    pub payment_mode_used: PaymentMode,
592    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
593    pub storage_cost_atto: String,
594    /// Total gas cost in wei. 0 if no on-chain transactions were made.
595    pub gas_cost_wei: u128,
596    /// Chunk address of the serialized `DataMap`, set only for
597    /// [`Visibility::Public`] uploads. **`Some` means this address is
598    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
599    /// necessarily that *this* upload paid to store it — if the serialized
600    /// `DataMap` hashed to a chunk that was already on the network (same
601    /// file uploaded before; deterministic via self-encryption), the address
602    /// is still returned but no storage payment was made for it.
603    pub data_map_address: Option<[u8; 32]>,
604    /// Sum of chunk-store RPC attempts across the upload
605    /// (`>= chunks_stored` on full success; more if any chunk retried).
606    /// `0` for paths that don't run the wave store loop.
607    pub chunk_attempts_total: usize,
608    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
609    /// success, empty for paths that don't run the wave store loop).
610    pub store_durations_ms: Vec<u64>,
611    /// Count of stored chunks that succeeded on each retry round
612    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
613    /// paths that don't run the wave store loop.
614    pub retries_histogram: [usize; 4],
615}
616
617/// Payment information for external signing — either wave-batch or merkle.
618#[derive(Debug)]
619pub enum ExternalPaymentInfo {
620    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
621    WaveBatch {
622        /// Chunks ready for payment (needed for finalize).
623        prepared_chunks: Vec<PreparedChunk>,
624        /// Payment intent for external signing.
625        payment_intent: PaymentIntent,
626    },
627    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
628    Merkle {
629        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
630        prepared_batch: PreparedMerkleBatch,
631        /// Raw chunk contents that still need upload after the preflight check.
632        chunk_contents: Vec<Bytes>,
633        /// Chunk addresses that still need upload after the preflight check.
634        chunk_addresses: Vec<[u8; 32]>,
635    },
636}
637
638/// Prepared upload ready for external payment.
639///
640/// Contains everything needed to construct the on-chain payment transaction
641/// externally (e.g. via WalletConnect in a desktop app) and then finalize
642/// the upload without a Rust-side wallet.
643///
644/// Note: This struct stays in Rust memory — only the public fields of
645/// `payment_info` are sent to the frontend. `PreparedChunk` contains
646/// non-serializable network types, so the full struct cannot derive `Serialize`.
647///
648/// Marked `#[non_exhaustive]` so adding a new field in future is not a
649/// breaking change for downstream consumers.
650#[derive(Debug)]
651#[non_exhaustive]
652pub struct PreparedUpload {
653    /// The data map for later retrieval.
654    pub data_map: DataMap,
655    /// Payment information for chunks that still need payment after the
656    /// already-stored preflight. This may be wave-batch even when the original
657    /// chunk count was merkle-eligible if the remaining count is below the
658    /// merkle threshold.
659    pub payment_info: ExternalPaymentInfo,
660    /// Chunk address of the serialized `DataMap` when this upload was
661    /// prepared with [`Visibility::Public`]. `Some` means the address is
662    /// retrievable on the network after finalization — either because this
663    /// upload paid to store the chunk in `payment_info`, or because the
664    /// chunk was already on the network (deterministic self-encryption).
665    /// Carried through to [`FileUploadResult::data_map_address`].
666    pub data_map_address: Option<[u8; 32]>,
667    /// Chunk addresses already present on the network when this upload was
668    /// prepared. These do not require payment or PUT during finalization.
669    pub already_stored_addresses: Vec<[u8; 32]>,
670    /// Total chunk count for the upload, including already-stored chunks.
671    pub total_chunks: usize,
672}
673
674/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
675type EncryptionChannels = (
676    tokio::sync::mpsc::Receiver<Bytes>,
677    tokio::sync::oneshot::Receiver<DataMap>,
678    tokio::task::JoinHandle<Result<()>>,
679);
680
681/// Spawn a blocking task that streams file encryption through a channel.
682fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
683    let metadata = std::fs::metadata(&path)?;
684    let data_size = usize::try_from(metadata.len())
685        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
686
687    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
688    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
689
690    let handle = tokio::task::spawn_blocking(move || {
691        let file = std::fs::File::open(&path)?;
692        let mut reader = std::io::BufReader::new(file);
693
694        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
695        let read_error_clone = Arc::clone(&read_error);
696
697        let data_iter = std::iter::from_fn(move || {
698            let mut buffer = vec![0u8; 8192];
699            match std::io::Read::read(&mut reader, &mut buffer) {
700                Ok(0) => None,
701                Ok(n) => {
702                    buffer.truncate(n);
703                    Some(Bytes::from(buffer))
704                }
705                Err(e) => {
706                    let mut guard = read_error_clone
707                        .lock()
708                        .unwrap_or_else(|poisoned| poisoned.into_inner());
709                    *guard = Some(e);
710                    None
711                }
712            }
713        });
714
715        let mut stream = stream_encrypt(data_size, data_iter)
716            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
717
718        for chunk_result in stream.chunks() {
719            // Check for captured read errors immediately after each chunk.
720            // stream_encrypt sees None (EOF) when a read fails, so it stops
721            // producing chunks. We must detect this before sending the
722            // partial results to avoid uploading a truncated DataMap.
723            {
724                let guard = read_error
725                    .lock()
726                    .unwrap_or_else(|poisoned| poisoned.into_inner());
727                if let Some(ref e) = *guard {
728                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
729                }
730            }
731
732            let (_hash, content) = chunk_result
733                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
734            if chunk_tx.blocking_send(content).is_err() {
735                return Err(Error::Encryption("upload receiver dropped".to_string()));
736            }
737        }
738
739        // Final check: read error after last chunk (stream saw EOF).
740        {
741            let guard = read_error
742                .lock()
743                .unwrap_or_else(|poisoned| poisoned.into_inner());
744            if let Some(ref e) = *guard {
745                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
746            }
747        }
748
749        let datamap = stream
750            .into_datamap()
751            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
752        if datamap_tx.send(datamap).is_err() {
753            warn!("DataMap receiver dropped — upload may have been cancelled");
754        }
755        Ok(())
756    });
757
758    Ok((chunk_rx, datamap_rx, handle))
759}
760
761impl Client {
762    /// Upload a file to the network using streaming self-encryption.
763    ///
764    /// Automatically selects merkle batch payment for files that produce
765    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
766    /// directory so peak memory stays at ~256 MB regardless of file size.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if the file cannot be read, encryption fails,
771    /// or any chunk cannot be stored.
772    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
773        self.file_upload_with_mode(path, PaymentMode::Auto).await
774    }
775
776    /// Estimate the cost of uploading a file without actually uploading.
777    ///
778    /// Encrypts the file to determine chunk count and sizes, then requests
779    /// a single quote from the network for a representative chunk. The
780    /// per-chunk price is extrapolated to the total chunk count.
781    ///
782    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
783    /// chunks are cleaned up automatically when the function returns.
784    ///
785    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
786    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
787    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
788    /// varies with network conditions.
789    ///
790    /// If the first sampled chunk is already stored on the network, the
791    /// function retries with subsequent chunk addresses (up to
792    /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
793    /// a [`Error::CostEstimationInconclusive`] is returned so callers can
794    /// decide how to react rather than trust a bogus "free" estimate. Only
795    /// when every address in the file is stored do we return a zero-cost
796    /// estimate.
797    ///
798    /// # Errors
799    ///
800    /// Returns an error if the file cannot be read, encryption fails,
801    /// the network cannot provide a quote, or every sampled chunk is
802    /// already stored ([`Error::CostEstimationInconclusive`]).
803    pub async fn estimate_upload_cost(
804        &self,
805        path: &Path,
806        mode: PaymentMode,
807        progress: Option<mpsc::Sender<UploadEvent>>,
808    ) -> Result<UploadCostEstimate> {
809        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
810
811        if file_size < 3 {
812            return Err(Error::InvalidData(
813                "File too small: self-encryption requires at least 3 bytes".into(),
814            ));
815        }
816
817        check_disk_space_for_spill(file_size)?;
818
819        info!(
820            "Estimating upload cost for {} ({file_size} bytes)",
821            path.display()
822        );
823
824        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
825        let chunk_count = spill.len();
826
827        if let Some(ref tx) = progress {
828            let _ = tx
829                .send(UploadEvent::Encrypted {
830                    total_chunks: chunk_count,
831                })
832                .await;
833        }
834
835        info!("Encrypted into {chunk_count} chunks, requesting quote");
836
837        // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
838        // AlreadyStored result says nothing about the rest of the file — the
839        // first chunk is often a DataMap-adjacent chunk that collides with
840        // prior uploads even when 99% of the file is new. Only treat the
841        // whole file as "fully stored" when every sample comes back stored.
842        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
843        let mut sampled = 0usize;
844        let mut all_already_stored = true;
845        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
846
847        for addr in spill.addresses.iter().take(sample_limit) {
848            sampled += 1;
849            let chunk_bytes = spill.read_chunk(addr)?;
850            let data_size = u64::try_from(chunk_bytes.len())
851                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
852            match self
853                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
854                .await
855            {
856                Ok(q) => {
857                    quotes_opt = Some(q);
858                    all_already_stored = false;
859                    break;
860                }
861                Err(Error::AlreadyStored) => {
862                    debug!(
863                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
864                        hex::encode(addr)
865                    );
866                    continue;
867                }
868                Err(e) => return Err(e),
869            }
870        }
871
872        let uses_merkle = should_use_merkle(chunk_count, mode);
873
874        let quotes = match quotes_opt {
875            Some(q) => q,
876            None if all_already_stored && sampled == chunk_count => {
877                // Every address in the file was sampled and every one is
878                // already on the network — returning a zero-cost estimate is
879                // accurate in this case.
880                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
881                return Ok(UploadCostEstimate {
882                    file_size,
883                    chunk_count,
884                    storage_cost_atto: "0".into(),
885                    estimated_gas_cost_wei: "0".into(),
886                    payment_mode: if uses_merkle {
887                        PaymentMode::Merkle
888                    } else {
889                        PaymentMode::Single
890                    },
891                });
892            }
893            None => {
894                return Err(Error::CostEstimationInconclusive(format!(
895                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
896                     one reported AlreadyStored; cannot infer a representative price \
897                     for the remaining chunks"
898                )));
899            }
900        };
901
902        // Use the median price × 3 (matches SingleNodePayment::from_quotes
903        // which pays 3x the median to incentivize reliable storage).
904        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
905        prices.sort();
906        let median_price = prices
907            .get(prices.len() / 2)
908            .copied()
909            .unwrap_or(Amount::ZERO);
910        let per_chunk_cost = median_price * Amount::from(3u64);
911
912        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
913        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
914
915        // Estimate gas cost from realistic per-transaction budgets rather
916        // than a flat per-chunk or per-wave number.
917        //
918        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
919        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
920        //   The dominant cost is one SSTORE per entry plus base tx overhead,
921        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
922        //   on a full wave and multiply by the number of waves. The previous
923        //   per-wave figure of 150k was closer to a single-entry transfer
924        //   and understated cost by 5–10x for full waves.
925        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
926        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
927        //
928        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
929        // Arbitrum baseline). Treat the result as advisory, not a commitment.
930        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
931        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
932        let estimated_gas: u128 = if uses_merkle {
933            merkle_batches
934                .saturating_mul(GAS_PER_MERKLE_TX)
935                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
936        } else {
937            waves
938                .saturating_mul(GAS_PER_WAVE_TX)
939                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
940        };
941
942        info!(
943            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
944        );
945
946        Ok(UploadCostEstimate {
947            file_size,
948            chunk_count,
949            storage_cost_atto: total_storage.to_string(),
950            estimated_gas_cost_wei: estimated_gas.to_string(),
951            payment_mode: if uses_merkle {
952                PaymentMode::Merkle
953            } else {
954                PaymentMode::Single
955            },
956        })
957    }
958
959    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
960    ///
961    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
962    /// [`Visibility::Private`] — see that method for details.
963    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
964        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
965            .await
966    }
967
968    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
969    ///
970    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
971    /// `progress: None` — see that method for details.
972    pub async fn file_prepare_upload_with_visibility(
973        &self,
974        path: &Path,
975        visibility: Visibility,
976    ) -> Result<PreparedUpload> {
977        self.file_prepare_upload_with_progress(path, visibility, None)
978            .await
979    }
980
981    /// Phase 1 of external-signer upload with progress events.
982    ///
983    /// Requires an EVM network (for contract price queries) but NOT a wallet.
984    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
985    /// and a [`PaymentIntent`] that the external signer uses to construct
986    /// and submit the on-chain payment transaction.
987    ///
988    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
989    /// is bundled into the payment batch as an additional chunk and its
990    /// address is recorded on the returned [`PreparedUpload`]. After
991    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
992    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
993    /// can share a single address from which anyone can retrieve the file.
994    ///
995    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
996    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
997    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
998    /// emitted later by [`Client::finalize_upload_with_progress`] /
999    /// [`Client::finalize_upload_merkle_with_progress`].
1000    ///
1001    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
1002    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
1003    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
1004    /// inherent to the two-phase external-signer protocol — the chunks must
1005    /// stay in memory until [`Client::finalize_upload`] stores them. For very
1006    /// large files, prefer [`Client::file_upload`] which streams directly.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if there is insufficient disk space, the file cannot
1011    /// be read, encryption fails, or quote collection fails.
1012    pub async fn file_prepare_upload_with_progress(
1013        &self,
1014        path: &Path,
1015        visibility: Visibility,
1016        progress: Option<mpsc::Sender<UploadEvent>>,
1017    ) -> Result<PreparedUpload> {
1018        debug!(
1019            "Preparing file upload for external signing (visibility={visibility:?}): {}",
1020            path.display()
1021        );
1022
1023        let file_size = std::fs::metadata(path)?.len();
1024        check_disk_space_for_spill(file_size)?;
1025
1026        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1027
1028        info!(
1029            "Encrypted {} into {} chunks for external signing (spilled to disk)",
1030            path.display(),
1031            spill.len()
1032        );
1033
1034        // Read each chunk from disk and collect quotes concurrently.
1035        // Note: all PreparedChunks accumulate in memory because the external-signer
1036        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
1037        let mut chunk_data: Vec<Bytes> = spill
1038            .addresses
1039            .iter()
1040            .map(|addr| spill.read_chunk(addr))
1041            .collect::<std::result::Result<Vec<_>, _>>()?;
1042
1043        // For public uploads, bundle the serialized DataMap as an extra chunk
1044        // in the same payment batch. This lets the external signer pay for
1045        // the data chunks and the DataMap chunk in one flow, and lets the
1046        // finalize step return the DataMap's chunk address as the shareable
1047        // retrieval address.
1048        let data_map_address = match visibility {
1049            Visibility::Private => None,
1050            Visibility::Public => {
1051                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1052                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1053                })?;
1054                let bytes = Bytes::from(serialized);
1055                let address = compute_address(&bytes);
1056                info!(
1057                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
1058                    bytes.len(),
1059                    hex::encode(address)
1060                );
1061                chunk_data.push(bytes);
1062                Some(address)
1063            }
1064        };
1065
1066        let chunk_count = chunk_data.len();
1067
1068        if let Some(ref tx) = progress {
1069            let _ = tx
1070                .send(UploadEvent::Encrypted {
1071                    total_chunks: chunk_count,
1072                })
1073                .await;
1074        }
1075
1076        let (payment_info, already_stored_addresses) = if should_use_merkle(
1077            chunk_count,
1078            PaymentMode::Auto,
1079        ) {
1080            // Merkle path: build tree, collect candidate pools, return for external payment.
1081            info!("Using merkle batch preparation for {chunk_count} file chunks");
1082
1083            let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
1084                .iter()
1085                .map(|chunk| {
1086                    let size = u64::try_from(chunk.len())
1087                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1088                    Ok((compute_address(chunk), size))
1089                })
1090                .collect::<Result<Vec<_>>>()?;
1091
1092            let merkle_plan = self
1093                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
1094                .await?;
1095
1096            if merkle_plan.to_upload.is_empty() {
1097                info!("All {chunk_count} file chunks already stored; no external payment needed");
1098                (
1099                    ExternalPaymentInfo::WaveBatch {
1100                        prepared_chunks: Vec::new(),
1101                        payment_intent: PaymentIntent::from_prepared_chunks(&[]),
1102                    },
1103                    merkle_plan.already_stored,
1104                )
1105            } else {
1106                let chunk_data =
1107                    chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
1108
1109                if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
1110                    info!(
1111                        "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
1112                        merkle_plan.to_upload.len()
1113                    );
1114                    let (payment_info, mut wave_already_stored) = self
1115                        .prepare_wave_batch_external_chunks(
1116                            chunk_data,
1117                            progress.as_ref(),
1118                            chunk_count,
1119                        )
1120                        .await?;
1121                    let mut already_stored = merkle_plan.already_stored;
1122                    already_stored.append(&mut wave_already_stored);
1123                    (payment_info, already_stored)
1124                } else {
1125                    match self
1126                        .prepare_merkle_batch_external(
1127                            &merkle_plan.to_upload,
1128                            DATA_TYPE_CHUNK,
1129                            merkle_plan.to_upload_avg_size(),
1130                        )
1131                        .await
1132                    {
1133                        Ok(prepared_batch) => {
1134                            info!(
1135                                "File prepared for external merkle signing: {} chunks, depth={} ({})",
1136                                merkle_plan.to_upload.len(),
1137                                prepared_batch.depth,
1138                                path.display()
1139                            );
1140
1141                            (
1142                                ExternalPaymentInfo::Merkle {
1143                                    prepared_batch,
1144                                    chunk_contents: chunk_data,
1145                                    chunk_addresses: merkle_plan.to_upload,
1146                                },
1147                                merkle_plan.already_stored,
1148                            )
1149                        }
1150                        Err(Error::InsufficientPeers(ref msg)) => {
1151                            info!(
1152                                "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
1153                            );
1154                            let (payment_info, mut wave_already_stored) = self
1155                                .prepare_wave_batch_external_chunks(
1156                                    chunk_data,
1157                                    progress.as_ref(),
1158                                    chunk_count,
1159                                )
1160                                .await?;
1161                            let mut already_stored = merkle_plan.already_stored;
1162                            already_stored.append(&mut wave_already_stored);
1163                            (payment_info, already_stored)
1164                        }
1165                        Err(e) => return Err(e),
1166                    }
1167                }
1168            }
1169        } else {
1170            self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
1171                .await?
1172        };
1173
1174        // Surface the "DataMap chunk was already on the network" case
1175        // so debugging "why is data_map_address set but no storage cost
1176        // appears for it?" doesn't require reading the source. See the
1177        // `data_map_address` doc comment for why this is still a valid
1178        // `Some(addr)` outcome.
1179        if let Some(addr) = data_map_address {
1180            let data_map_needs_payment = match &payment_info {
1181                ExternalPaymentInfo::WaveBatch {
1182                    prepared_chunks, ..
1183                } => prepared_chunks.iter().any(|c| c.address == addr),
1184                ExternalPaymentInfo::Merkle {
1185                    chunk_addresses, ..
1186                } => chunk_addresses.contains(&addr),
1187            };
1188            if !data_map_needs_payment {
1189                info!(
1190                    "Public upload: DataMap chunk {} was already stored \
1191                     on the network — address is retrievable without a \
1192                     new payment",
1193                    hex::encode(addr)
1194                );
1195            }
1196        }
1197
1198        Ok(PreparedUpload {
1199            data_map,
1200            payment_info,
1201            data_map_address,
1202            already_stored_addresses,
1203            total_chunks: chunk_count,
1204        })
1205    }
1206
1207    async fn prepare_wave_batch_external_chunks(
1208        &self,
1209        chunk_data: Vec<Bytes>,
1210        progress: Option<&mpsc::Sender<UploadEvent>>,
1211        progress_total: usize,
1212    ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1213        let chunk_count = chunk_data.len();
1214        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1215            .into_iter()
1216            .map(|content| {
1217                let address = compute_address(&content);
1218                (content, address)
1219            })
1220            .collect();
1221
1222        // Wave-batch path: collect quotes per chunk concurrently, emitting
1223        // a `ChunkQuoted` event after each completion so callers can drive
1224        // a progress bar through the slow quote phase.
1225        let quote_limiter = self.controller().quote.clone();
1226        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1227        let mut quote_stream = stream::iter(chunks_with_addr)
1228            .map(|(content, address)| {
1229                let limiter = quote_limiter.clone();
1230                async move {
1231                    let result = observe_op(
1232                        &limiter,
1233                        || async move { self.prepare_chunk_payment(content).await },
1234                        classify_error,
1235                    )
1236                    .await;
1237                    (address, result)
1238                }
1239            })
1240            .buffer_unordered(quote_concurrency);
1241
1242        let mut prepared_chunks = Vec::with_capacity(chunk_count);
1243        let mut already_stored = Vec::new();
1244        let mut quoted = 0usize;
1245        while let Some((address, result)) = quote_stream.next().await {
1246            match result? {
1247                Some(prepared) => prepared_chunks.push(prepared),
1248                None => already_stored.push(address),
1249            }
1250            quoted += 1;
1251            if let Some(tx) = progress {
1252                let _ = tx.try_send(UploadEvent::ChunkQuoted {
1253                    quoted,
1254                    total: progress_total,
1255                });
1256            }
1257        }
1258
1259        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1260        info!(
1261            "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1262            prepared_chunks.len(),
1263            already_stored.len(),
1264            payment_intent.total_amount,
1265        );
1266
1267        Ok((
1268            ExternalPaymentInfo::WaveBatch {
1269                prepared_chunks,
1270                payment_intent,
1271            },
1272            already_stored,
1273        ))
1274    }
1275
1276    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1277    ///
1278    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1279    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1280    /// payment. Builds payment proofs and stores chunks on the network.
1281    ///
1282    /// # Errors
1283    ///
1284    /// Returns an error if the prepared upload used merkle payment (use
1285    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1286    /// or any chunk cannot be stored.
1287    pub async fn finalize_upload(
1288        &self,
1289        prepared: PreparedUpload,
1290        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1291    ) -> Result<FileUploadResult> {
1292        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1293            .await
1294    }
1295
1296    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1297    ///
1298    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1299    /// on the provided channel as each chunk is successfully stored.
1300    ///
1301    /// # Errors
1302    ///
1303    /// Same as [`Client::finalize_upload`].
1304    pub async fn finalize_upload_with_progress(
1305        &self,
1306        prepared: PreparedUpload,
1307        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1308        progress: Option<mpsc::Sender<UploadEvent>>,
1309    ) -> Result<FileUploadResult> {
1310        let data_map_address = prepared.data_map_address;
1311        let already_stored_addresses = prepared.already_stored_addresses;
1312        let already_stored_count = already_stored_addresses.len();
1313        let total_chunks = prepared.total_chunks;
1314        match prepared.payment_info {
1315            ExternalPaymentInfo::WaveBatch {
1316                prepared_chunks,
1317                payment_intent: _,
1318            } => {
1319                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1320                let wave_result = self
1321                    .store_paid_chunks_with_events(
1322                        paid_chunks,
1323                        progress.as_ref(),
1324                        already_stored_count,
1325                        total_chunks,
1326                    )
1327                    .await;
1328                if !wave_result.failed.is_empty() {
1329                    let failed_count = wave_result.failed.len();
1330                    let stored_count = already_stored_count + wave_result.stored.len();
1331                    let mut stored = already_stored_addresses;
1332                    stored.extend(wave_result.stored);
1333                    return Err(Error::PartialUpload {
1334                        stored,
1335                        stored_count,
1336                        failed: wave_result.failed,
1337                        failed_count,
1338                        total_chunks,
1339                        reason: "finalize_upload: chunk storage failed after retries".into(),
1340                    });
1341                }
1342                let chunks_stored = already_stored_count + wave_result.stored.len();
1343
1344                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1345
1346                let mut stats = WaveAggregateStats::default();
1347                stats.absorb(&wave_result);
1348
1349                Ok(FileUploadResult {
1350                    data_map: prepared.data_map,
1351                    chunks_stored,
1352                    chunks_failed: 0,
1353                    total_chunks,
1354                    payment_mode_used: PaymentMode::Single,
1355                    storage_cost_atto: "0".into(),
1356                    gas_cost_wei: 0,
1357                    data_map_address,
1358                    chunk_attempts_total: stats.chunk_attempts_total,
1359                    store_durations_ms: stats.store_durations_ms,
1360                    retries_histogram: stats.retries_histogram,
1361                })
1362            }
1363            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1364                "Cannot finalize merkle upload with wave-batch tx hashes. \
1365                 Use finalize_upload_merkle() instead."
1366                    .to_string(),
1367            )),
1368        }
1369    }
1370
1371    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1372    ///
1373    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1374    /// returned by the on-chain merkle payment transaction. Generates proofs and
1375    /// stores chunks on the network.
1376    ///
1377    /// # Errors
1378    ///
1379    /// Returns an error if the prepared upload used wave-batch payment (use
1380    /// [`Client::finalize_upload`] instead), proof generation fails,
1381    /// or any chunk cannot be stored.
1382    pub async fn finalize_upload_merkle(
1383        &self,
1384        prepared: PreparedUpload,
1385        winner_pool_hash: [u8; 32],
1386    ) -> Result<FileUploadResult> {
1387        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1388            .await
1389    }
1390
1391    /// Phase 2 of external-signer upload (merkle) with progress events.
1392    ///
1393    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1394    /// on the provided channel as each chunk is successfully stored.
1395    ///
1396    /// # Errors
1397    ///
1398    /// Same as [`Client::finalize_upload_merkle`].
1399    pub async fn finalize_upload_merkle_with_progress(
1400        &self,
1401        prepared: PreparedUpload,
1402        winner_pool_hash: [u8; 32],
1403        progress: Option<mpsc::Sender<UploadEvent>>,
1404    ) -> Result<FileUploadResult> {
1405        let data_map_address = prepared.data_map_address;
1406        let already_stored_count = prepared.already_stored_addresses.len();
1407        let total_chunks = prepared.total_chunks;
1408        match prepared.payment_info {
1409            ExternalPaymentInfo::Merkle {
1410                prepared_batch,
1411                chunk_contents,
1412                chunk_addresses,
1413            } => {
1414                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1415                let outcome = self
1416                    .merkle_upload_chunks(
1417                        chunk_contents,
1418                        chunk_addresses,
1419                        &batch_result,
1420                        progress.as_ref(),
1421                        already_stored_count,
1422                        total_chunks,
1423                    )
1424                    .await?;
1425
1426                info!(
1427                    "External-signer merkle upload finalized: {} chunks stored, {} failed",
1428                    outcome.stored, outcome.failed
1429                );
1430
1431                Ok(FileUploadResult {
1432                    data_map: prepared.data_map,
1433                    chunks_stored: outcome.stored,
1434                    chunks_failed: outcome.failed,
1435                    total_chunks,
1436                    payment_mode_used: PaymentMode::Merkle,
1437                    storage_cost_atto: "0".into(),
1438                    gas_cost_wei: 0,
1439                    data_map_address,
1440                    chunk_attempts_total: outcome.stats.chunk_attempts_total,
1441                    store_durations_ms: outcome.stats.store_durations_ms,
1442                    retries_histogram: outcome.stats.retries_histogram,
1443                })
1444            }
1445            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1446                "Cannot finalize wave-batch upload with merkle winner hash. \
1447                 Use finalize_upload() instead."
1448                    .to_string(),
1449            )),
1450        }
1451    }
1452
1453    /// Upload a file with a specific payment mode.
1454    ///
1455    /// Before encryption, checks that the temp directory has enough free
1456    /// disk space for the spilled chunks (~1.1× source file size).
1457    ///
1458    /// Encrypted chunks are spilled to a temp directory during encryption
1459    /// so that only their 32-byte addresses stay in memory. At upload time,
1460    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1461    ///
1462    /// # Errors
1463    ///
1464    /// Returns an error if there is insufficient disk space, the file cannot
1465    /// be read, encryption fails, or any chunk cannot be stored.
1466    #[allow(clippy::too_many_lines)]
1467    pub async fn file_upload_with_mode(
1468        &self,
1469        path: &Path,
1470        mode: PaymentMode,
1471    ) -> Result<FileUploadResult> {
1472        self.file_upload_with_progress(path, mode, None).await
1473    }
1474
1475    /// Upload a file with progress events sent to the given channel.
1476    ///
1477    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1478    /// provided channel for UI progress feedback.
1479    #[allow(clippy::too_many_lines)]
1480    pub async fn file_upload_with_progress(
1481        &self,
1482        path: &Path,
1483        mode: PaymentMode,
1484        progress: Option<mpsc::Sender<UploadEvent>>,
1485    ) -> Result<FileUploadResult> {
1486        debug!(
1487            "Streaming file upload with mode {mode:?}: {}",
1488            path.display()
1489        );
1490
1491        // Pre-flight: verify enough temp disk space for the chunk spill.
1492        let file_size = std::fs::metadata(path)?.len();
1493        check_disk_space_for_spill(file_size)?;
1494
1495        // Phase 1: Encrypt file and spill chunks to temp directory.
1496        // Only 32-byte addresses stay in memory — chunk data lives on disk.
1497        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1498
1499        let chunk_count = spill.len();
1500        info!(
1501            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
1502            path.display()
1503        );
1504        if let Some(ref tx) = progress {
1505            let _ = tx
1506                .send(UploadEvent::Encrypted {
1507                    total_chunks: chunk_count,
1508                })
1509                .await;
1510        }
1511
1512        // Phase 2: Decide payment mode and upload in waves from disk.
1513        //
1514        // For the merkle path, attempt to resume from a cached
1515        // receipt before paying again. The cache is keyed by the
1516        // CANONICAL source path so `./foo`, `/abs/foo`, and any
1517        // symlink alias all resolve to the same cache entry — a
1518        // crash-and-retry from a different cwd or via a different
1519        // alias still hits the receipt. Canonicalize may fail (the
1520        // file could have been moved between phase 1 and here); we
1521        // fall back to the display string in that case, which
1522        // preserves pre-fix behaviour rather than dropping cache
1523        // resume entirely.
1524        let file_path_key = std::fs::canonicalize(path)
1525            .map(|p| p.display().to_string())
1526            .unwrap_or_else(|_| path.display().to_string());
1527        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
1528            .should_use_merkle(chunk_count, mode)
1529        {
1530            info!("Using merkle batch payment for {chunk_count} file chunks");
1531
1532            let cached_merkle =
1533                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
1534                    .map(|(_cache_path, cached)| cached);
1535
1536            let merkle_plan = match self
1537                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
1538                .await
1539            {
1540                Ok(plan) => plan,
1541                Err(e) => {
1542                    if let Some(cached) = cached_merkle
1543                        .as_ref()
1544                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
1545                    {
1546                        info!(
1547                            "Merkle preflight failed ({e}); \
1548                             resuming with cached merkle proofs"
1549                        );
1550                        let (stored, sc, gc, stats) = self
1551                            .upload_waves_merkle(
1552                                &spill,
1553                                &spill.addresses,
1554                                cached,
1555                                &[],
1556                                progress.as_ref(),
1557                            )
1558                            .await?;
1559                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1560                        return Ok(FileUploadResult {
1561                            data_map,
1562                            chunks_stored: stored,
1563                            chunks_failed: 0,
1564                            total_chunks: chunk_count,
1565                            payment_mode_used: PaymentMode::Merkle,
1566                            storage_cost_atto: sc,
1567                            gas_cost_wei: gc,
1568                            data_map_address: None,
1569                            chunk_attempts_total: stats.chunk_attempts_total,
1570                            store_durations_ms: stats.store_durations_ms,
1571                            retries_histogram: stats.retries_histogram,
1572                        });
1573                    }
1574                    match &e {
1575                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
1576                            info!(
1577                                "Merkle preflight needs more peers ({msg}), \
1578                                 falling back to wave-batch"
1579                            );
1580                            let (stored, sc, gc, fb_stats) = self
1581                                .upload_waves_single(
1582                                    &spill,
1583                                    progress.as_ref(),
1584                                    Some(&file_path_key),
1585                                )
1586                                .await?;
1587                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1588                            return Ok(FileUploadResult {
1589                                data_map,
1590                                chunks_stored: stored,
1591                                chunks_failed: 0,
1592                                total_chunks: chunk_count,
1593                                payment_mode_used: PaymentMode::Single,
1594                                storage_cost_atto: sc,
1595                                gas_cost_wei: gc,
1596                                data_map_address: None,
1597                                chunk_attempts_total: fb_stats.chunk_attempts_total,
1598                                store_durations_ms: fb_stats.store_durations_ms,
1599                                retries_histogram: fb_stats.retries_histogram,
1600                            });
1601                        }
1602                        _ => return Err(e),
1603                    }
1604                }
1605            };
1606
1607            if merkle_plan.to_upload.is_empty() {
1608                info!("All {chunk_count} merkle chunks already stored; skipping payment");
1609                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1610                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1611                (
1612                    chunk_count,
1613                    PaymentMode::Merkle,
1614                    "0".to_string(),
1615                    0,
1616                    WaveAggregateStats::default(),
1617                )
1618            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
1619                let remaining_chunks = merkle_plan.to_upload.len();
1620                if let Some(cached) = cached_merkle
1621                    .as_ref()
1622                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
1623                {
1624                    info!(
1625                        "{remaining_chunks} chunks remain below merkle threshold; \
1626                         reusing cached merkle proofs"
1627                    );
1628                    let (stored, sc, gc, stats) = self
1629                        .upload_waves_merkle(
1630                            &spill,
1631                            &merkle_plan.to_upload,
1632                            cached,
1633                            &merkle_plan.already_stored,
1634                            progress.as_ref(),
1635                        )
1636                        .await?;
1637                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1638                    (stored, PaymentMode::Merkle, sc, gc, stats)
1639                } else {
1640                    if cached_merkle.is_some() {
1641                        info!(
1642                            "{remaining_chunks} chunks remain below merkle threshold, \
1643                             and the cached merkle receipt does not cover them. \
1644                             Discarding cache and using single-node payment."
1645                        );
1646                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1647                    } else {
1648                        info!(
1649                            "{remaining_chunks} chunks need upload after merkle preflight; \
1650                             using single-node payment"
1651                        );
1652                    }
1653                    let (stored, sc, gc, stats) = self
1654                        .upload_spill_addresses_single(
1655                            &spill,
1656                            &merkle_plan.to_upload,
1657                            progress.as_ref(),
1658                            merkle_plan.already_stored.len(),
1659                            chunk_count,
1660                            Some(&file_path_key),
1661                        )
1662                        .await?;
1663                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1664                    (stored, PaymentMode::Single, sc, gc, stats)
1665                }
1666            } else {
1667                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
1668                    // Validate the cache against the chunks that still need
1669                    // storage. Extra proofs are harmless: a previous attempt
1670                    // may have paid for chunks that are now already stored.
1671                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
1672                        info!(
1673                            "Skipping merkle payment phase; resuming with \
1674                             cached proofs for {} remaining chunks",
1675                            merkle_plan.to_upload.len()
1676                        );
1677                        Ok(cached.clone())
1678                    } else {
1679                        info!(
1680                            "Cached merkle receipt does not cover the current \
1681                             remaining chunks (cached={}, remaining={}). \
1682                             Discarding cache and paying fresh.",
1683                            cached.proofs.len(),
1684                            merkle_plan.to_upload.len()
1685                        );
1686                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1687                        self.pay_for_merkle_batch(
1688                            &merkle_plan.to_upload,
1689                            DATA_TYPE_CHUNK,
1690                            merkle_plan.to_upload_avg_size(),
1691                        )
1692                        .await
1693                        .inspect(|result| {
1694                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
1695                        })
1696                    }
1697                } else {
1698                    self.pay_for_merkle_batch(
1699                        &merkle_plan.to_upload,
1700                        DATA_TYPE_CHUNK,
1701                        merkle_plan.to_upload_avg_size(),
1702                    )
1703                    .await
1704                    .inspect(|result| {
1705                        // Save BEFORE the store phase so a crash
1706                        // mid-upload leaves a resumable receipt.
1707                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
1708                    })
1709                };
1710
1711                let batch_result = match batch_result {
1712                    Ok(result) => result,
1713                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
1714                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
1715                        let (stored, sc, gc, fb_stats) = self
1716                            .upload_spill_addresses_single(
1717                                &spill,
1718                                &merkle_plan.to_upload,
1719                                progress.as_ref(),
1720                                merkle_plan.already_stored.len(),
1721                                chunk_count,
1722                                Some(&file_path_key),
1723                            )
1724                            .await?;
1725                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1726                        return Ok(FileUploadResult {
1727                            data_map,
1728                            chunks_stored: stored,
1729                            chunks_failed: 0,
1730                            total_chunks: chunk_count,
1731                            payment_mode_used: PaymentMode::Single,
1732                            storage_cost_atto: sc,
1733                            gas_cost_wei: gc,
1734                            data_map_address: None,
1735                            chunk_attempts_total: fb_stats.chunk_attempts_total,
1736                            store_durations_ms: fb_stats.store_durations_ms,
1737                            retries_histogram: fb_stats.retries_histogram,
1738                        });
1739                    }
1740                    Err(e) => return Err(e),
1741                };
1742
1743                let (stored, sc, gc, stats) = self
1744                    .upload_waves_merkle(
1745                        &spill,
1746                        &merkle_plan.to_upload,
1747                        &batch_result,
1748                        &merkle_plan.already_stored,
1749                        progress.as_ref(),
1750                    )
1751                    .await?;
1752                // Upload succeeded end-to-end; the cached receipt is
1753                // no longer needed.
1754                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1755                (stored, PaymentMode::Merkle, sc, gc, stats)
1756            }
1757        } else {
1758            let (stored, sc, gc, stats) = self
1759                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
1760                .await?;
1761            // Full file success: drop any cached single-node receipt.
1762            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1763            (stored, PaymentMode::Single, sc, gc, stats)
1764        };
1765
1766        info!(
1767            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
1768            path.display()
1769        );
1770
1771        Ok(FileUploadResult {
1772            data_map,
1773            chunks_stored,
1774            chunks_failed: 0,
1775            total_chunks: chunk_count,
1776            payment_mode_used: actual_mode,
1777            storage_cost_atto,
1778            gas_cost_wei,
1779            data_map_address: None,
1780            chunk_attempts_total: stats.chunk_attempts_total,
1781            store_durations_ms: stats.store_durations_ms,
1782            retries_histogram: stats.retries_histogram,
1783        })
1784    }
1785
1786    /// Encrypt a file and spill chunks to a temp directory.
1787    ///
1788    /// Logs progress every 100 chunks so users get feedback during
1789    /// multi-GB encryptions.
1790    ///
1791    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1792    async fn encrypt_file_to_spill(
1793        &self,
1794        path: &Path,
1795        progress: Option<&mpsc::Sender<UploadEvent>>,
1796    ) -> Result<(ChunkSpill, DataMap)> {
1797        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1798
1799        let mut spill = ChunkSpill::new()?;
1800        while let Some(content) = chunk_rx.recv().await {
1801            spill.push(&content)?;
1802            let chunks_done = spill.len();
1803            if let Some(tx) = progress {
1804                if chunks_done.is_multiple_of(10) {
1805                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1806                }
1807            }
1808            if chunks_done % 100 == 0 {
1809                let mb = spill.total_bytes() / (1024 * 1024);
1810                info!(
1811                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1812                    path.display()
1813                );
1814            }
1815        }
1816
1817        // Await encryption completion to catch errors before paying.
1818        handle
1819            .await
1820            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1821            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1822
1823        let data_map = datamap_rx
1824            .await
1825            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1826
1827        Ok((spill, data_map))
1828    }
1829
1830    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1831    ///
1832    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1833    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1834    ///
1835    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1836    async fn upload_waves_single(
1837        &self,
1838        spill: &ChunkSpill,
1839        progress: Option<&mpsc::Sender<UploadEvent>>,
1840        resume_key: Option<&str>,
1841    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1842        self.upload_spill_addresses_single(
1843            spill,
1844            &spill.addresses,
1845            progress,
1846            0,
1847            spill.len(),
1848            resume_key,
1849        )
1850        .await
1851    }
1852
1853    async fn upload_spill_addresses_single(
1854        &self,
1855        spill: &ChunkSpill,
1856        addresses: &[[u8; 32]],
1857        progress: Option<&mpsc::Sender<UploadEvent>>,
1858        stored_offset: usize,
1859        total_chunks: usize,
1860        resume_key: Option<&str>,
1861    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1862        let mut total_stored = stored_offset;
1863        let mut total_storage = Amount::ZERO;
1864        let mut total_gas: u128 = 0;
1865        let mut agg_stats = WaveAggregateStats::default();
1866        let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
1867        let wave_count = waves.len();
1868
1869        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1870            let wave_num = wave_idx + 1;
1871            let wave_data: Vec<Bytes> = wave_addrs
1872                .iter()
1873                .map(|addr| spill.read_chunk(addr))
1874                .collect::<Result<Vec<_>>>()?;
1875
1876            info!(
1877                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1878                wave_data.len()
1879            );
1880            if let Some(tx) = progress {
1881                let _ = tx
1882                    .send(UploadEvent::QuotingChunks {
1883                        wave: wave_num,
1884                        total_waves: wave_count,
1885                        chunks_in_wave: wave_data.len(),
1886                    })
1887                    .await;
1888            }
1889            let (addresses, wave_storage, wave_gas, wave_stats) = self
1890                .batch_upload_chunks_with_events(
1891                    wave_data,
1892                    progress,
1893                    total_stored,
1894                    total_chunks,
1895                    resume_key,
1896                )
1897                .await?;
1898            total_stored += addresses.len();
1899            if let Ok(cost) = wave_storage.parse::<Amount>() {
1900                total_storage += cost;
1901            }
1902            total_gas = total_gas.saturating_add(wave_gas);
1903            // Merge per-call stats (each call already aggregates across the
1904            // waves it ran internally, so a simple sum/extend is correct).
1905            agg_stats.chunk_attempts_total = agg_stats
1906                .chunk_attempts_total
1907                .saturating_add(wave_stats.chunk_attempts_total);
1908            agg_stats
1909                .store_durations_ms
1910                .extend(wave_stats.store_durations_ms);
1911            for (slot, count) in agg_stats
1912                .retries_histogram
1913                .iter_mut()
1914                .zip(wave_stats.retries_histogram.iter())
1915            {
1916                *slot = slot.saturating_add(*count);
1917            }
1918            if let Some(tx) = progress {
1919                let _ = tx
1920                    .send(UploadEvent::WaveComplete {
1921                        wave: wave_num,
1922                        total_waves: wave_count,
1923                        stored_so_far: total_stored,
1924                        total: total_chunks,
1925                    })
1926                    .await;
1927            }
1928        }
1929
1930        Ok((
1931            total_stored,
1932            total_storage.to_string(),
1933            total_gas,
1934            agg_stats,
1935        ))
1936    }
1937
1938    /// Upload chunks from a spill using pre-computed merkle proofs.
1939    ///
1940    /// Reads one wave at a time from disk, pairs each chunk with its proof,
1941    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1942    ///
1943    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1944    /// Costs come from the `batch_result` which was populated during payment.
1945    async fn upload_waves_merkle(
1946        &self,
1947        spill: &ChunkSpill,
1948        addresses: &[[u8; 32]],
1949        batch_result: &MerkleBatchPaymentResult,
1950        already_stored_addresses: &[[u8; 32]],
1951        progress: Option<&mpsc::Sender<UploadEvent>>,
1952    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1953        let mut total_stored = already_stored_addresses.len();
1954        let total_chunks = total_stored + addresses.len();
1955        let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
1956        let wave_count = waves.len();
1957        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
1958        let mut agg_stats = WaveAggregateStats::default();
1959
1960        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1961            let wave_num = wave_idx + 1;
1962            let wave = spill.read_wave(wave_addrs)?;
1963
1964            info!(
1965                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
1966                wave.len()
1967            );
1968
1969            let store_limiter = self.controller().store.clone();
1970            // Clamp fan-out to wave size — partial last wave should
1971            // not pay for extra slots (see PERF-RESULTS.md).
1972            let store_concurrency = store_limiter.current().min(wave.len().max(1));
1973            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
1974                let proof_bytes = batch_result.proofs.get(&addr).cloned();
1975                let limiter = store_limiter.clone();
1976                async move {
1977                    let started = std::time::Instant::now();
1978                    let proof = proof_bytes.ok_or_else(|| {
1979                        (
1980                            addr,
1981                            Error::Payment(format!(
1982                                "Missing merkle proof for chunk {}",
1983                                hex::encode(addr)
1984                            )),
1985                            started,
1986                        )
1987                    })?;
1988                    let peers = self
1989                        .close_group_peers(&addr)
1990                        .await
1991                        .map_err(|e| (addr, e, started))?;
1992                    observe_op(
1993                        &limiter,
1994                        || async move {
1995                            self.chunk_put_to_close_group(content, proof, &peers).await
1996                        },
1997                        classify_error,
1998                    )
1999                    .await
2000                    .map(|_| (addr, started))
2001                    .map_err(|e| (addr, e, started))
2002                }
2003            }))
2004            .buffer_unordered(store_concurrency);
2005
2006            while let Some(result) = upload_stream.next().await {
2007                match result {
2008                    Ok((addr, started)) => {
2009                        let duration_ms =
2010                            u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
2011                        agg_stats.store_durations_ms.push(duration_ms);
2012                        agg_stats.chunk_attempts_total =
2013                            agg_stats.chunk_attempts_total.saturating_add(1);
2014                        agg_stats.retries_histogram[0] =
2015                            agg_stats.retries_histogram[0].saturating_add(1);
2016                        stored_addresses.push(addr);
2017                        total_stored += 1;
2018                        info!("Stored {total_stored}/{total_chunks}");
2019                        if let Some(tx) = progress {
2020                            let _ = tx
2021                                .send(UploadEvent::ChunkStored {
2022                                    stored: total_stored,
2023                                    total: total_chunks,
2024                                })
2025                                .await;
2026                        }
2027                    }
2028                    Err((addr, e, _started)) => {
2029                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
2030                        return Err(Error::PartialUpload {
2031                            stored: stored_addresses,
2032                            stored_count: total_stored,
2033                            failed: vec![(addr, e.to_string())],
2034                            failed_count: 1,
2035                            total_chunks,
2036                            reason: format!("merkle chunk upload failed: {e}"),
2037                        });
2038                    }
2039                }
2040            }
2041
2042            if let Some(tx) = progress {
2043                let _ = tx
2044                    .send(UploadEvent::WaveComplete {
2045                        wave: wave_num,
2046                        total_waves: wave_count,
2047                        stored_so_far: total_stored,
2048                        total: total_chunks,
2049                    })
2050                    .await;
2051            }
2052        }
2053
2054        Ok((
2055            total_stored,
2056            batch_result.storage_cost_atto.clone(),
2057            batch_result.gas_cost_wei,
2058            agg_stats,
2059        ))
2060    }
2061
2062    /// Download and decrypt a file from the network, writing it to disk.
2063    ///
2064    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2065    /// memory at a time, avoiding OOM on large files. Chunks are fetched
2066    /// concurrently within each batch, then decrypted data is written to
2067    /// disk incrementally.
2068    ///
2069    /// Returns the number of bytes written.
2070    ///
2071    /// # Panics
2072    ///
2073    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2074    /// Will panic if called from a `current_thread` runtime because
2075    /// `streaming_decrypt` takes a synchronous callback that must bridge
2076    /// back to async via `block_in_place`.
2077    ///
2078    /// # Errors
2079    ///
2080    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2081    /// or the file cannot be written.
2082    #[allow(clippy::unused_async)]
2083    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2084        self.file_download_with_progress(data_map, output, None)
2085            .await
2086    }
2087
2088    /// Download and decrypt a file with progress events.
2089    ///
2090    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
2091    ///
2092    /// Progress reporting:
2093    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
2094    ///    `ChunksFetched` with `total: 0` during resolution)
2095    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
2096    /// 3. Fetches data chunks with accurate `fetched/total` progress
2097    #[allow(clippy::unused_async)]
2098    pub async fn file_download_with_progress(
2099        &self,
2100        data_map: &DataMap,
2101        output: &Path,
2102        progress: Option<mpsc::Sender<DownloadEvent>>,
2103    ) -> Result<u64> {
2104        debug!("Downloading file to {}", output.display());
2105
2106        let handle = Handle::current();
2107
2108        // Phase 1: Resolve hierarchical DataMap to root level.
2109        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
2110        let root_map = if data_map.is_child() {
2111            let dm_chunks = data_map.len();
2112            if let Some(ref tx) = progress {
2113                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
2114                    total_map_chunks: dm_chunks,
2115                });
2116            }
2117
2118            let resolve_progress = progress.clone();
2119            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2120
2121            let resolved = tokio::task::block_in_place(|| {
2122                let counter_ref = resolve_counter.clone();
2123                let progress_ref = resolve_progress.clone();
2124                let fetch_limiter = self.controller().fetch.clone();
2125                let fetch = |batch: &[(usize, XorName)]| {
2126                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2127                    let counter = counter_ref.clone();
2128                    let prog = progress_ref.clone();
2129                    let limiter = fetch_limiter.clone();
2130                    handle.block_on(async {
2131                        // Use rebucketed_unordered so the in-flight cap
2132                        // is re-read from the limiter as each slot frees.
2133                        // `buffer_unordered` snapshots the cap once at
2134                        // pipeline build, which means observe_op
2135                        // signals from inside chunk_get cannot reduce
2136                        // concurrency on the current batch — exactly
2137                        // the case where load-shedding is needed.
2138                        let mut results = rebucketed_unordered(
2139                            &limiter,
2140                            batch_owned,
2141                            |(idx, hash): (usize, XorName)| {
2142                                let counter = counter.clone();
2143                                let prog = prog.clone();
2144                                async move {
2145                                    let addr = hash.0;
2146                                    // chunk_get_observed feeds the
2147                                    // adaptive fetch limiter once per
2148                                    // call via chunk_get_outcome
2149                                    // (Ok(None) -> Timeout is the
2150                                    // load-shedding signal for
2151                                    // sustained close-group exhaustion).
2152                                    let chunk = self
2153                                        .chunk_get_observed(&addr)
2154                                        .await
2155                                        .map_err(|e| {
2156                                            self_encryption::Error::Generic(format!(
2157                                                "DataMap resolution failed: {e}"
2158                                            ))
2159                                        })?
2160                                        .ok_or_else(|| {
2161                                            self_encryption::Error::Generic(format!(
2162                                                "DataMap chunk not found: {}",
2163                                                hex::encode(addr)
2164                                            ))
2165                                        })?;
2166                                    let fetched = counter
2167                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
2168                                        + 1;
2169                                    if let Some(ref tx) = prog {
2170                                        let _ =
2171                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
2172                                    }
2173                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
2174                                }
2175                            },
2176                        )
2177                        .await?;
2178                        // CRITICAL: self_encryption::get_root_data_map_parallel
2179                        // pairs the returned Vec POSITIONALLY with the input
2180                        // hashes via .zip() and discards our idx field.
2181                        // rebucketed_unordered preserves first-completion
2182                        // order, so sort by idx to restore input order
2183                        // before returning.
2184                        results.sort_by_key(|(idx, _)| *idx);
2185                        Ok(results)
2186                    })
2187                };
2188                get_root_data_map_parallel(data_map.clone(), &fetch)
2189            })
2190            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
2191
2192            info!(
2193                "Resolved hierarchical DataMap: {} data chunks",
2194                resolved.len()
2195            );
2196            resolved
2197        } else {
2198            data_map.clone()
2199        };
2200
2201        // Phase 2: Now we know the real chunk count.
2202        let total_chunks = root_map.len();
2203        if let Some(ref tx) = progress {
2204            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
2205        }
2206
2207        // Phase 3: Fetch and decrypt data chunks with accurate progress.
2208        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2209        let fetched_for_closure = fetched_counter.clone();
2210        let progress_for_closure = progress.clone();
2211
2212        let fetch_limiter_outer = self.controller().fetch.clone();
2213        let usable_memory = usable_memory_bytes();
2214        let configured_batch_floor = stream_decrypt_batch_size();
2215        let fetch_cap = fetch_limiter_outer.current();
2216        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
2217            total_chunks,
2218            fetch_cap,
2219            configured_batch_floor,
2220            usable_memory,
2221        );
2222        info!(
2223            total_chunks,
2224            fetch_cap,
2225            configured_batch_floor,
2226            ?usable_memory,
2227            decrypt_batch_size,
2228            "Selected adaptive stream decrypt batch size"
2229        );
2230
2231        let stream = streaming_decrypt_with_batch_size(
2232            &root_map,
2233            |batch: &[(usize, XorName)]| {
2234                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2235                let fetched_ref = fetched_for_closure.clone();
2236                let progress_ref = progress_for_closure.clone();
2237                let fetch_limiter = fetch_limiter_outer.clone();
2238
2239                tokio::task::block_in_place(|| {
2240                    handle.block_on(async {
2241                        // First pass: try every chunk in the batch via
2242                        // chunk_get_observed (which already does its own
2243                        // first-attempt + retry sweep). A chunk that
2244                        // returns Ok(None) here is NOT a fatal failure
2245                        // — it's a candidate for a deferred retry below.
2246                        // We carry the chunk's XorName through so the
2247                        // retry pass can re-fetch by address.
2248                        //
2249                        // The closure ONLY returns Err on a true
2250                        // protocol/network error from chunk_get (the
2251                        // Err variant). Ok(None) is encoded as
2252                        // `Err(addr)` in the inner Result so the outer
2253                        // rebucketed pass doesn't early-abort on it.
2254                        type BatchEntry =
2255                            (usize, std::result::Result<bytes::Bytes, XorName>);
2256                        let raw: Vec<BatchEntry> = rebucketed_unordered(
2257                            &fetch_limiter,
2258                            batch_owned,
2259                            |(idx, hash): (usize, XorName)| {
2260                                let fetched_ref = fetched_ref.clone();
2261                                let progress_ref = progress_ref.clone();
2262                                async move {
2263                                    let addr = hash.0;
2264                                    let addr_hex = hex::encode(addr);
2265                                    match self.chunk_get_observed(&addr).await {
2266                                        Ok(Some(chunk)) => {
2267                                            let fetched = fetched_ref.fetch_add(
2268                                                1,
2269                                                std::sync::atomic::Ordering::Relaxed,
2270                                            ) + 1;
2271                                            info!("Downloaded {fetched}/{total_chunks}");
2272                                            if let Some(ref tx) = progress_ref {
2273                                                let _ = tx.try_send(
2274                                                    DownloadEvent::ChunksFetched {
2275                                                        fetched,
2276                                                        total: total_chunks,
2277                                                    },
2278                                                );
2279                                            }
2280                                            Ok::<BatchEntry, self_encryption::Error>((
2281                                                idx,
2282                                                Ok(chunk.content),
2283                                            ))
2284                                        }
2285                                        // chunk_get returned Ok(None): defer
2286                                        // this chunk for a later retry rather
2287                                        // than aborting the whole batch.
2288                                        Ok(None) => Ok((idx, Err(hash))),
2289                                        // A transient error for one chunk
2290                                        // (e.g. its close-group DHT walk
2291                                        // erroring on this pass) must not
2292                                        // abort a multi-hundred-chunk
2293                                        // download. Defer it to the retry
2294                                        // rounds, same as Ok(None); only a
2295                                        // chunk that survives all deferred
2296                                        // rounds is fatal.
2297                                        Err(e) => {
2298                                            info!(
2299                                                "First-pass fetch error for {addr_hex}: {e}; deferring"
2300                                            );
2301                                            Ok((idx, Err(hash)))
2302                                        }
2303                                    }
2304                                }
2305                            },
2306                        )
2307                        .await?;
2308
2309                        // Partition: things we already have vs the
2310                        // deferred set we need to retry.
2311                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
2312                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
2313                        for (idx, inner) in raw {
2314                            match inner {
2315                                Ok(bytes) => results.push((idx, bytes)),
2316                                Err(hash) => deferred.push((idx, hash)),
2317                            }
2318                        }
2319
2320                        // Deferred retry pass: retry the deferred chunks
2321                        // in CONCURRENT rounds (reusing the fetch
2322                        // limiter's cap), not serially. The first round
2323                        // fires immediately — most deferrals on a
2324                        // healthy-but-lossy link are peer-side noise
2325                        // that clears in well under a second, and
2326                        // serializing them behind mandatory multi-second
2327                        // sleeps was the single biggest throughput sink
2328                        // on such links (a batch deferring ~20 chunks
2329                        // burned minutes of near-zero throughput even
2330                        // though every chunk succeeded on its first
2331                        // retry). Only chunks that survive a round get a
2332                        // longer back-off before the next, so genuine
2333                        // saturation still gets time to settle.
2334                        if !deferred.is_empty() {
2335                            // Round delays in seconds. Round 0 is
2336                            // immediate; later rounds back off to ride
2337                            // out sustained saturation.
2338                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
2339                            info!(
2340                                "Deferring {} chunk(s) for concurrent retry after batch settles",
2341                                deferred.len()
2342                            );
2343                            let mut remaining = deferred;
2344                            for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
2345                                .iter()
2346                                .enumerate()
2347                            {
2348                                if remaining.is_empty() {
2349                                    break;
2350                                }
2351                                if delay_secs > 0 {
2352                                    tokio::time::sleep(std::time::Duration::from_secs(
2353                                        delay_secs,
2354                                    ))
2355                                    .await;
2356                                }
2357                                info!(
2358                                    "Deferred retry round {}/{}: {} chunk(s)",
2359                                    round + 1,
2360                                    DEFERRED_ROUND_DELAYS_SECS.len(),
2361                                    remaining.len(),
2362                                );
2363                                let round_input = std::mem::take(&mut remaining);
2364                                let round_results: Vec<BatchEntry> = rebucketed_unordered(
2365                                    &fetch_limiter,
2366                                    round_input,
2367                                    |(idx, hash): (usize, XorName)| {
2368                                        let fetched_ref = fetched_ref.clone();
2369                                        let progress_ref = progress_ref.clone();
2370                                        async move {
2371                                            let addr = hash.0;
2372                                            // Both Ok(None) and a transient
2373                                            // Err re-defer the chunk to the
2374                                            // next round rather than
2375                                            // aborting; only the final
2376                                            // round's leftovers are fatal.
2377                                            match self.chunk_get_observed(&addr).await {
2378                                                Ok(Some(chunk)) => {
2379                                                    let fetched = fetched_ref.fetch_add(
2380                                                        1,
2381                                                        std::sync::atomic::Ordering::Relaxed,
2382                                                    ) + 1;
2383                                                    info!(
2384                                                        "Downloaded {fetched}/{total_chunks} (deferred retry)"
2385                                                    );
2386                                                    if let Some(ref tx) = progress_ref {
2387                                                        let _ = tx.try_send(
2388                                                            DownloadEvent::ChunksFetched {
2389                                                                fetched,
2390                                                                total: total_chunks,
2391                                                            },
2392                                                        );
2393                                                    }
2394                                                    Ok::<BatchEntry, self_encryption::Error>((
2395                                                        idx,
2396                                                        Ok(chunk.content),
2397                                                    ))
2398                                                }
2399                                                Ok(None) => Ok((idx, Err(hash))),
2400                                                Err(e) => {
2401                                                    info!(
2402                                                        "Deferred retry for {} hit transient error: {e}; re-deferring",
2403                                                        hex::encode(addr)
2404                                                    );
2405                                                    Ok((idx, Err(hash)))
2406                                                }
2407                                            }
2408                                        }
2409                                    },
2410                                )
2411                                .await?;
2412                                for (idx, inner) in round_results {
2413                                    match inner {
2414                                        Ok(bytes) => results.push((idx, bytes)),
2415                                        Err(hash) => remaining.push((idx, hash)),
2416                                    }
2417                                }
2418                            }
2419                            if let Some((_, hash)) = remaining.first() {
2420                                return Err(self_encryption::Error::Generic(format!(
2421                                    "Chunk not found after {} deferred retry rounds: {}",
2422                                    DEFERRED_ROUND_DELAYS_SECS.len(),
2423                                    hex::encode(hash.0),
2424                                )));
2425                            }
2426                        }
2427
2428                        // streaming_decrypt itself sort_by_keys before
2429                        // zipping, but the same closure is also passed
2430                        // through get_root_data_map_parallel internally
2431                        // (see self_encryption::stream_decrypt.rs::new), and
2432                        // THAT path zips positionally without sorting. Sort
2433                        // here so both consumers see input order.
2434                        results.sort_by_key(|(idx, _)| *idx);
2435                        Ok(results)
2436                    })
2437                })
2438            },
2439            decrypt_batch_size,
2440        )
2441        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
2442
2443        // Write decrypted chunks to a temp file, then rename atomically.
2444        let parent = output.parent().unwrap_or_else(|| Path::new("."));
2445        let unique: u64 = rand::random();
2446        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
2447
2448        let write_result = (|| -> Result<u64> {
2449            let mut file = std::fs::File::create(&tmp_path)?;
2450            let mut bytes_written = 0u64;
2451            for chunk_result in stream {
2452                let chunk_bytes = chunk_result
2453                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
2454                file.write_all(&chunk_bytes)?;
2455                bytes_written += chunk_bytes.len() as u64;
2456            }
2457            file.flush()?;
2458            Ok(bytes_written)
2459        })();
2460
2461        match write_result {
2462            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
2463                Ok(()) => {
2464                    info!(
2465                        "File downloaded: {bytes_written} bytes written to {}",
2466                        output.display()
2467                    );
2468                    Ok(bytes_written)
2469                }
2470                Err(rename_err) => {
2471                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2472                        warn!(
2473                            "Failed to remove temp download file {}: {cleanup_err}",
2474                            tmp_path.display()
2475                        );
2476                    }
2477                    Err(rename_err.into())
2478                }
2479            },
2480            Err(e) => {
2481                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2482                    warn!(
2483                        "Failed to remove temp download file {}: {cleanup_err}",
2484                        tmp_path.display()
2485                    );
2486                }
2487                Err(e)
2488            }
2489        }
2490    }
2491}
2492
2493#[cfg(test)]
2494#[allow(clippy::unwrap_used)]
2495mod tests {
2496    use super::*;
2497
2498    #[test]
2499    fn disk_space_check_passes_for_small_file() {
2500        // A 1 KB file should always pass the disk space check
2501        check_disk_space_for_spill(1024).unwrap();
2502    }
2503
2504    #[test]
2505    fn disk_space_check_fails_for_absurd_size() {
2506        // Requesting space for a 1 exabyte file should fail on any real system
2507        let result = check_disk_space_for_spill(u64::MAX / 2);
2508        assert!(result.is_err());
2509        let err = result.unwrap_err();
2510        assert!(
2511            matches!(err, Error::InsufficientDiskSpace(_)),
2512            "expected InsufficientDiskSpace, got: {err}"
2513        );
2514    }
2515
2516    #[test]
2517    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
2518        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
2519
2520        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
2521    }
2522
2523    #[test]
2524    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
2525        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
2526
2527        assert_eq!(batch_size, 12);
2528    }
2529
2530    #[test]
2531    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
2532        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
2533
2534        assert_eq!(batch_size, 32);
2535    }
2536
2537    #[test]
2538    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
2539        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
2540
2541        assert_eq!(batch_size, 10);
2542    }
2543
2544    #[test]
2545    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
2546        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
2547            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
2548            .max(1);
2549        let usable_memory = estimated_bytes_per_chunk
2550            .saturating_mul(16)
2551            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
2552        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
2553
2554        assert_eq!(batch_size, 16);
2555    }
2556
2557    #[test]
2558    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
2559        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
2560
2561        assert_eq!(batch_size, 1);
2562    }
2563
2564    #[test]
2565    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
2566        let covered = compute_address(&Bytes::from_static(b"covered"));
2567        let extra = compute_address(&Bytes::from_static(b"extra"));
2568        let missing = compute_address(&Bytes::from_static(b"missing"));
2569        let cached = MerkleBatchPaymentResult {
2570            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
2571            chunk_count: 2,
2572            storage_cost_atto: "0".to_string(),
2573            gas_cost_wei: 0,
2574            merkle_payment_timestamp: 0,
2575        };
2576
2577        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
2578        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
2579        assert!(!cached_merkle_covers_addresses(
2580            &cached,
2581            &[covered, missing]
2582        ));
2583    }
2584
2585    #[test]
2586    fn chunk_spill_round_trip() {
2587        let mut spill = ChunkSpill::new().unwrap();
2588        let data1 = vec![0xAA; 1024];
2589        let data2 = vec![0xBB; 2048];
2590
2591        spill.push(&data1).unwrap();
2592        spill.push(&data2).unwrap();
2593
2594        assert_eq!(spill.len(), 2);
2595        assert_eq!(spill.total_bytes(), 1024 + 2048);
2596        let chunk_entries = spill.chunk_entries().unwrap();
2597        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
2598        assert_eq!(entry_total, 1024 + 2048);
2599
2600        // Read back and verify
2601        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
2602        assert_eq!(&chunk1[..], &data1[..]);
2603
2604        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
2605        assert_eq!(&chunk2[..], &data2[..]);
2606
2607        // Verify waves with 1-chunk wave size
2608        let waves: Vec<_> = spill.addresses.chunks(1).collect();
2609        assert_eq!(waves.len(), 2);
2610    }
2611
2612    #[test]
2613    fn chunk_spill_cleanup_on_drop() {
2614        let dir;
2615        {
2616            let spill = ChunkSpill::new().unwrap();
2617            dir = spill.dir.clone();
2618            assert!(dir.exists());
2619        }
2620        // After drop, the directory should be cleaned up
2621        assert!(!dir.exists(), "spill dir should be removed on drop");
2622    }
2623
2624    #[test]
2625    fn chunk_spill_deduplicates_identical_content() {
2626        let mut spill = ChunkSpill::new().unwrap();
2627        let data = vec![0xCC; 512];
2628
2629        spill.push(&data).unwrap();
2630        spill.push(&data).unwrap(); // same content, should be skipped
2631        spill.push(&data).unwrap(); // again
2632
2633        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
2634        assert_eq!(
2635            spill.total_bytes(),
2636            512,
2637            "total_bytes should count unique only"
2638        );
2639
2640        // Different content should still be added
2641        let data2 = vec![0xDD; 256];
2642        spill.push(&data2).unwrap();
2643        assert_eq!(spill.len(), 2);
2644        assert_eq!(spill.total_bytes(), 512 + 256);
2645    }
2646}
2647
2648/// Compile-time assertions that Client file method futures are Send.
2649#[cfg(test)]
2650mod send_assertions {
2651    use super::*;
2652
2653    fn _assert_send<T: Send>(_: &T) {}
2654
2655    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
2656    async fn _file_upload_is_send(client: &Client) {
2657        let fut = client.file_upload(Path::new("/dev/null"));
2658        _assert_send(&fut);
2659    }
2660
2661    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
2662    async fn _file_upload_with_mode_is_send(client: &Client) {
2663        let fut = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
2664        _assert_send(&fut);
2665    }
2666
2667    #[allow(
2668        dead_code,
2669        unreachable_code,
2670        unused_variables,
2671        clippy::diverging_sub_expression
2672    )]
2673    async fn _file_download_is_send(client: &Client) {
2674        let dm: DataMap = todo!();
2675        let fut = client.file_download(&dm, Path::new("/dev/null"));
2676        _assert_send(&fut);
2677    }
2678}