Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
//! Upload files directly from disk without loading them entirely into memory.
//! The file is read in 8 KB pieces and fed to `stream_encrypt`, which emits
//! encrypted chunks as it goes.
//!
//! Encrypted chunks are spilled to an on-disk spill directory during
//! encryption and read back one upload wave at a time, so peak memory usage
//! is bounded to roughly one wave (~256 MB for 64 × 4 MB chunks) regardless
//! of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19    chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_store_with_retry,
20    should_use_merkle, MerkleBatchPaymentResult, PaymentMode, PreparedMerkleBatch,
21    MERKLE_RETRY_BACKOFF, MERKLE_STORE_MAX_ATTEMPTS,
22};
23use crate::data::client::Client;
24use crate::data::error::{Error, Result};
25use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
26use ant_protocol::transport::{MultiAddr, PeerId};
27use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
28use bytes::Bytes;
29use fs2::FileExt;
30use futures::stream::{self, StreamExt};
31use self_encryption::{
32    get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
33    streaming_decrypt_with_batch_size, DataMap,
34};
35use std::collections::{HashMap, HashSet};
36use std::io::Write;
37use std::path::{Path, PathBuf};
38use std::sync::{Arc, Mutex};
39use tokio::runtime::Handle;
40use tokio::sync::mpsc;
41use tracing::{debug, info, warn};
42use xor_name::XorName;
43
/// Progress notifications emitted while a file is being uploaded, intended
/// for driving UI feedback.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// One more encrypted chunk has been written to the spill directory;
    /// `chunks_done` counts the chunks encrypted so far.
    Encrypting { chunks_done: usize },
    /// Encryption has finished, producing `total_chunks` chunks.
    Encrypted { total_chunks: usize },
    /// Quote collection is starting for wave `wave` of `total_waves`, which
    /// covers `chunks_in_wave` chunks.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A quote (peer discovery plus price) has arrived for another chunk.
    /// Every quote costs network round-trips, which makes this the slow phase.
    ChunkQuoted { quoted: usize, total: usize },
    /// Another chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// Wave `wave` of `total_waves` is done; `stored_so_far` of `total`
    /// chunks are stored at this point.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
70
/// Progress notifications emitted while a file is being downloaded, intended
/// for driving UI feedback.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// The hierarchical `DataMap` is being resolved so the true number of
    /// data chunks can be learned; it currently spans `total_map_chunks`.
    ResolvingDataMap { total_map_chunks: usize },
    /// Another `DataMap` chunk was fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// Resolution is done and the number of data chunks is now known.
    DataMapResolved { total_chunks: usize },
    /// Progress of fetching data chunks from the network.
    ChunksFetched { fetched: usize, total: usize },
}
83
84/// One entry in the per-chunk quote list returned by
85/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
86/// signed quote it returned, and the payment amount it is demanding.
87type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
88
/// Chunks handled per upload wave; kept equal to `PAYMENT_WAVE_SIZE` in
/// `batch.rs`.
const UPLOAD_WAVE_SIZE: usize = 64;

/// How many times larger than the fetch fan-out a stream-decrypt batch is.
///
/// A batch bigger than the number of in-flight GETs lets the rolling fetch
/// scheduler keep issuing new requests as earlier ones finish, instead of
/// draining at every self-encryption batch boundary.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// One decrypt batch may use at most `1 / DIVISOR` of the currently usable RAM.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// Conservative per-chunk memory factor for a decrypt batch.
///
/// While in flight a batch holds encrypted bytes, decrypted bytes and
/// `Vec`/`Bytes` overhead, so the raw payload size alone would undercount.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Upper bound on distinct chunk addresses probed for a representative quote
/// in [`Client::estimate_upload_cost`].
///
/// Kept small so the `AlreadyStored` retry path — relevant only when many of
/// a file's leading chunks already live on the network — costs at most a
/// couple of round-trips.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas budget for one `pay_for_quotes` transaction carrying up to
/// `UPLOAD_WAVE_SIZE` `(quote_hash, rewards_address, amount)` entries.
///
/// `batch_pay` in `batch.rs` packs every chunk's close-group quotes into one
/// EVM call, so the cost is dominated by per-entry SSTOREs on top of the base
/// transaction cost. On Arbitrum that is about
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M gas; 1.5M is used as a
/// conservative per-wave ceiling.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas budget for one merkle batch payment transaction.
///
/// Each merkle sub-batch is a single on-chain transaction. That transaction
/// verifies a merkle tree and posts a pool commitment, so it is budgeted above
/// a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Fixed gas price (wei per gas) used to turn a gas estimate into ETH when no
/// live gas oracle is consulted.
///
/// 100_000_000 wei is 0.1 gwei, a typical quiet-block price on Arbitrum One.
/// It keeps CLI output in the right order of magnitude. The resulting figure
/// is an estimate, not a commitment: real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Free-space headroom, in percent, demanded on top of the source file size.
///
/// Encrypted chunks are a little larger than the plaintext because of padding
/// and self-encryption overhead. The spill location must therefore have at
/// least `file_size` plus this percentage free.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
144
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan. That meant
/// any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload. On a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish them from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir; it is held locked while the dir is in use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a spill directory so that
/// only per-chunk bookkeeping (32-byte address and size) stays in memory. At
/// upload time chunks are read back one wave at a time, keeping peak RAM at
/// roughly `UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The directory's `SPILL_LOCK_NAME` file is held locked for the lifetime of
/// the value, which stops stale-spill cleanup from deleting it while in use.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses, in first-seen order.
    addresses: Vec<[u8; 32]>,
    /// Addresses already spilled, used for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
183
184impl ChunkSpill {
185    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
186    fn spill_root() -> Result<PathBuf> {
187        use crate::config;
188        let root = config::data_dir()
189            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
190            .join("spill");
191        Ok(root)
192    }
193
194    /// Create a new spill directory under `<data_dir>/spill/`.
195    ///
196    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
197    /// identified by prefix and cleaned up by age. A lockfile inside the
198    /// dir prevents concurrent cleanup from deleting an active spill.
199    fn new() -> Result<Self> {
200        let root = Self::spill_root()?;
201        std::fs::create_dir_all(&root)?;
202
203        // Clean up stale spill dirs from previous crashed runs.
204        Self::cleanup_stale(&root);
205
206        let now = std::time::SystemTime::now()
207            .duration_since(std::time::UNIX_EPOCH)
208            .unwrap_or_default()
209            .as_secs();
210        let unique: u64 = rand::random();
211        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
212        std::fs::create_dir(&dir)?;
213
214        // Create and hold a lockfile for the lifetime of this spill.
215        // cleanup_stale() will skip dirs with locked files.
216        let lock_path = dir.join(SPILL_LOCK_NAME);
217        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
218            Error::Io(std::io::Error::new(
219                e.kind(),
220                format!("failed to create spill lockfile: {e}"),
221            ))
222        })?;
223        lock_file.try_lock_exclusive().map_err(|e| {
224            Error::Io(std::io::Error::new(
225                e.kind(),
226                format!("failed to lock spill lockfile: {e}"),
227            ))
228        })?;
229
230        Ok(Self {
231            dir,
232            _lock: lock_file,
233            addresses: Vec::new(),
234            seen: HashSet::new(),
235            sizes: HashMap::new(),
236            total_bytes: 0,
237        })
238    }
239
240    /// Clean up stale spill directories. Best-effort, errors are logged.
241    ///
242    /// A spill dir is reaped when:
243    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
244    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
245    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
246    /// 4. Its lockfile is releasable — i.e. no live process holds it
247    ///
248    /// The lockfile is the primary correctness gate: a releasable lock means
249    /// the owning `ChunkSpill` has been dropped or the process is gone, so
250    /// the dir is fair game. The grace period covers only the brief window
251    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
252    ///
253    /// Safe to call concurrently from multiple processes.
254    fn cleanup_stale(root: &Path) {
255        let now = std::time::SystemTime::now()
256            .duration_since(std::time::UNIX_EPOCH)
257            .unwrap_or_default()
258            .as_secs();
259
260        if now == 0 {
261            // Clock is broken (before Unix epoch). Skip cleanup to avoid
262            // misidentifying dirs as stale.
263            warn!("System clock before Unix epoch, skipping spill cleanup");
264            return;
265        }
266
267        let entries = match std::fs::read_dir(root) {
268            Ok(entries) => entries,
269            Err(_) => return,
270        };
271
272        for entry in entries.flatten() {
273            let name = entry.file_name();
274            let name_str = name.to_string_lossy();
275
276            // Only process dirs with our prefix.
277            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
278                Some(s) => s,
279                None => continue,
280            };
281
282            // Parse timestamp: "spill_<timestamp>_<random>"
283            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
284                Some(ts) => ts,
285                None => continue,
286            };
287
288            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
289                continue;
290            }
291
292            // Safety: only delete actual directories, not symlinks.
293            let file_type = match entry.file_type() {
294                Ok(ft) => ft,
295                Err(_) => continue,
296            };
297            if !file_type.is_dir() {
298                continue;
299            }
300
301            let path = entry.path();
302
303            // Check lockfile: if locked, the dir is in active use -- skip it.
304            let lock_path = path.join(SPILL_LOCK_NAME);
305            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
306                use fs2::FileExt;
307                if lock_file.try_lock_exclusive().is_err() {
308                    // Lock held by another process -- dir is active.
309                    debug!("Skipping active spill dir: {}", path.display());
310                    continue;
311                }
312                // We acquired the lock, so no one else holds it.
313                // Drop it before deleting.
314                drop(lock_file);
315            }
316
317            info!("Cleaning up stale spill dir: {}", path.display());
318            if let Err(e) = std::fs::remove_dir_all(&path) {
319                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
320            }
321        }
322    }
323
324    /// Run stale spill cleanup. Call at client startup or periodically.
325    #[allow(dead_code)]
326    pub(crate) fn run_cleanup() {
327        if let Ok(root) = Self::spill_root() {
328            Self::cleanup_stale(&root);
329        }
330    }
331
332    /// Write one encrypted chunk to disk and record its address.
333    ///
334    /// Deduplicates by content address: if the same chunk was already
335    /// spilled, the write and accounting are skipped. This prevents
336    /// double-uploads and inflated quoting metrics.
337    fn push(&mut self, content: &[u8]) -> Result<()> {
338        let address = compute_address(content);
339        if !self.seen.insert(address) {
340            return Ok(());
341        }
342        let path = self.dir.join(hex::encode(address));
343        std::fs::write(&path, content)?;
344        let content_len = content.len() as u64;
345        self.sizes.insert(address, content_len);
346        self.total_bytes += content_len;
347        self.addresses.push(address);
348        Ok(())
349    }
350
351    /// Number of chunks stored.
352    fn len(&self) -> usize {
353        self.addresses.len()
354    }
355
356    /// Total bytes of all spilled chunks.
357    fn total_bytes(&self) -> u64 {
358        self.total_bytes
359    }
360
361    /// Address and byte-size pairs for all spilled chunks.
362    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
363        self.addresses
364            .iter()
365            .map(|address| {
366                self.sizes
367                    .get(address)
368                    .copied()
369                    .map(|size| (*address, size))
370                    .ok_or_else(|| {
371                        Error::Storage(format!(
372                            "missing size for spilled chunk {}",
373                            hex::encode(address)
374                        ))
375                    })
376            })
377            .collect()
378    }
379
380    /// Read a single chunk back from disk by address.
381    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
382        let path = self.dir.join(hex::encode(address));
383        let data = std::fs::read(&path).map_err(|e| {
384            Error::Io(std::io::Error::new(
385                e.kind(),
386                format!("reading spilled chunk {}: {e}", hex::encode(address)),
387            ))
388        })?;
389        Ok(Bytes::from(data))
390    }
391
392    /// Read a wave of chunks from disk.
393    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
394        let mut out = Vec::with_capacity(wave_addrs.len());
395        for addr in wave_addrs {
396            let content = self.read_chunk(addr)?;
397            out.push((content, *addr));
398        }
399        Ok(out)
400    }
401
402    /// Clean up the spill directory.
403    fn cleanup(&self) {
404        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
405            warn!(
406                "Failed to clean up chunk spill dir {}: {e}",
407                self.dir.display()
408            );
409        }
410    }
411}
412
413impl Drop for ChunkSpill {
414    fn drop(&mut self) {
415        self.cleanup();
416    }
417}
418
419fn cached_merkle_covers_addresses(
420    cached: &MerkleBatchPaymentResult,
421    addresses: &[[u8; 32]],
422) -> bool {
423    addresses
424        .iter()
425        .all(|addr| cached.proofs.contains_key(addr))
426}
427
/// Split `addresses` into `(to_store, missing_proof)`. The first group holds
/// the addresses that have a merkle proof in `proofs`; the second holds the
/// rest.
///
/// A partial [`MerkleBatchPaymentResult`] comes from a
/// `pay_for_merkle_multi_batch` where a later sub-batch's payment failed. It
/// only carries proofs for the sub-batches that were paid, so unpaid chunks
/// arrive at the upload path without a proof. `upload_waves_merkle` reports
/// those as failed through [`Error::PartialUpload`] instead of aborting the
/// whole file. Both output groups keep the relative order of `addresses`.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut with_proof = Vec::new();
    let mut without_proof = Vec::new();
    for &addr in addresses {
        if proofs.contains_key(&addr) {
            with_proof.push(addr);
        } else {
            without_proof.push(addr);
        }
    }
    (with_proof, without_proof)
}
446
447/// Check that the spill directory has enough free space for the spilled chunks.
448///
449/// `file_size` is the source file's byte count. We require
450/// `file_size + 10%` free space to account for self-encryption overhead.
451fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
452    let spill_root = ChunkSpill::spill_root()?;
453
454    // Ensure the root exists so fs2 can query it.
455    std::fs::create_dir_all(&spill_root)?;
456
457    let available = fs2::available_space(&spill_root).map_err(|e| {
458        Error::Io(std::io::Error::new(
459            e.kind(),
460            format!(
461                "failed to query disk space on {}: {e}",
462                spill_root.display()
463            ),
464        ))
465    })?;
466
467    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
468    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
469    let required = file_size.saturating_add(headroom);
470
471    if available < required {
472        let avail_mb = available / (1024 * 1024);
473        let req_mb = required / (1024 * 1024);
474        return Err(Error::InsufficientDiskSpace(format!(
475            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
476            spill_root.display()
477        )));
478    }
479
480    debug!(
481        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
482        spill_root.display()
483    );
484    Ok(())
485}
486
487fn usable_memory_bytes() -> Option<u64> {
488    let mut system = sysinfo::System::new();
489    system.refresh_memory();
490
491    let available_memory = system.available_memory();
492    let free_memory = system.free_memory();
493    let used_memory = system.used_memory();
494    let total_memory = system.total_memory();
495    let unused_memory = total_memory.saturating_sub(used_memory);
496
497    let mut usable = [available_memory, free_memory, unused_memory]
498        .into_iter()
499        .filter(|bytes| *bytes > 0)
500        .max();
501
502    let cgroup_free_memory = system
503        .cgroup_limits()
504        .filter(|limits| limits.total_memory > 0)
505        .map(|limits| limits.free_memory);
506    if let Some(cgroup_free_memory) = cgroup_free_memory {
507        usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
508    }
509
510    debug!(
511        available_memory,
512        free_memory,
513        used_memory,
514        total_memory,
515        cgroup_free_memory,
516        usable_memory = ?usable,
517        "Detected usable memory for stream decrypt batch sizing"
518    );
519
520    usable
521}
522
523fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
524    let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
525    let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
526        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
527        .max(1);
528    let cap = (budget / estimated_bytes_per_chunk).max(1);
529
530    usize::try_from(cap).unwrap_or(usize::MAX)
531}
532
533fn adaptive_stream_decrypt_batch_size(
534    total_chunks: usize,
535    fetch_cap: usize,
536    configured_batch_floor: usize,
537    usable_memory_bytes: Option<u64>,
538) -> usize {
539    let fetch_target = fetch_cap
540        .max(1)
541        .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
542    let requested = match usable_memory_bytes {
543        Some(bytes) => {
544            let memory_cap = stream_decrypt_batch_memory_cap(bytes);
545            configured_batch_floor
546                .max(fetch_target)
547                .max(1)
548                .min(memory_cap)
549        }
550        None => configured_batch_floor.max(1),
551    };
552
553    requested.min(total_chunks.max(1)).max(1)
554}
555
/// Controls whether an upload's data map is published to the network so the
/// file can be retrieved by address.
///
/// - Private upload: only the data chunks are stored and the `DataMap` is
///   handed back to the caller; nobody without that `DataMap` can rebuild
///   the file.
/// - Public upload: the serialized `DataMap` is also stored as a network
///   chunk. Its single chunk address lets anyone fetch the `DataMap` (via
///   [`Client::data_map_fetch`]) and from it the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// The data map stays with the caller; only its holder can retrieve the
    /// file. This is the default.
    #[default]
    Private,
    /// The data map is published as a network chunk, so anyone holding the
    /// returned address can retrieve and decrypt the file.
    Public,
}
572
573/// Estimated cost of uploading a file, returned by
574/// [`Client::estimate_upload_cost`].
575#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
576pub struct UploadCostEstimate {
577    /// Original file size in bytes.
578    pub file_size: u64,
579    /// Number of chunks the file would be split into (data chunks only,
580    /// does not include the DataMap chunk added during public uploads).
581    pub chunk_count: usize,
582    /// Estimated total storage cost in atto (token smallest unit).
583    pub storage_cost_atto: String,
584    /// Estimated gas cost in wei as a string. This is a rough heuristic
585    /// based on chunk count and payment mode, NOT a live gas price query.
586    pub estimated_gas_cost_wei: String,
587    /// Payment mode that would be used.
588    pub payment_mode: PaymentMode,
589}
590
591/// Result of a file upload: the `DataMap` needed to retrieve the file.
592///
593/// Marked `#[non_exhaustive]` so adding a new field in future is not a
594/// breaking change for downstream consumers that construct or pattern-match
595/// on this struct.
596#[derive(Debug, Clone)]
597#[non_exhaustive]
598pub struct FileUploadResult {
599    /// The data map containing chunk metadata for reconstruction.
600    pub data_map: DataMap,
601    /// Number of chunks stored on the network.
602    pub chunks_stored: usize,
603    /// Number of chunks that failed to store. Always 0 for a successful
604    /// upload — partial-failure information is conveyed via
605    /// [`crate::data::Error::PartialUpload`] instead.
606    pub chunks_failed: usize,
607    /// Total number of chunks in the upload, including chunks that were
608    /// already stored and skipped. On full success this equals `chunks_stored`.
609    pub total_chunks: usize,
610    /// Which payment mode was actually used (not just requested).
611    pub payment_mode_used: PaymentMode,
612    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
613    pub storage_cost_atto: String,
614    /// Total gas cost in wei. 0 if no on-chain transactions were made.
615    pub gas_cost_wei: u128,
616    /// Chunk address of the serialized `DataMap`, set only for
617    /// [`Visibility::Public`] uploads. **`Some` means this address is
618    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
619    /// necessarily that *this* upload paid to store it — if the serialized
620    /// `DataMap` hashed to a chunk that was already on the network (same
621    /// file uploaded before; deterministic via self-encryption), the address
622    /// is still returned but no storage payment was made for it.
623    pub data_map_address: Option<[u8; 32]>,
624    /// Sum of chunk-store RPC attempts across the upload
625    /// (`>= chunks_stored` on full success; more if any chunk retried).
626    /// `0` for paths that don't run the wave store loop.
627    pub chunk_attempts_total: usize,
628    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
629    /// success, empty for paths that don't run the wave store loop).
630    pub store_durations_ms: Vec<u64>,
631    /// Count of stored chunks that succeeded on each retry round
632    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
633    /// paths that don't run the wave store loop.
634    pub retries_histogram: [usize; 4],
635}
636
637/// Payment information for external signing — either wave-batch or merkle.
638#[derive(Debug)]
639pub enum ExternalPaymentInfo {
640    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
641    WaveBatch {
642        /// Chunks ready for payment (needed for finalize).
643        prepared_chunks: Vec<PreparedChunk>,
644        /// Payment intent for external signing.
645        payment_intent: PaymentIntent,
646    },
647    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
648    Merkle {
649        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
650        prepared_batch: PreparedMerkleBatch,
651        /// Raw chunk contents that still need upload after the preflight check.
652        chunk_contents: Vec<Bytes>,
653        /// Chunk addresses that still need upload after the preflight check.
654        chunk_addresses: Vec<[u8; 32]>,
655    },
656}
657
658/// Prepared upload ready for external payment.
659///
660/// Contains everything needed to construct the on-chain payment transaction
661/// externally (e.g. via WalletConnect in a desktop app) and then finalize
662/// the upload without a Rust-side wallet.
663///
664/// Note: This struct stays in Rust memory — only the public fields of
665/// `payment_info` are sent to the frontend. `PreparedChunk` contains
666/// non-serializable network types, so the full struct cannot derive `Serialize`.
667///
668/// Marked `#[non_exhaustive]` so adding a new field in future is not a
669/// breaking change for downstream consumers.
670#[derive(Debug)]
671#[non_exhaustive]
672pub struct PreparedUpload {
673    /// The data map for later retrieval.
674    pub data_map: DataMap,
675    /// Payment information for chunks that still need payment after the
676    /// already-stored preflight. This may be wave-batch even when the original
677    /// chunk count was merkle-eligible if the remaining count is below the
678    /// merkle threshold.
679    pub payment_info: ExternalPaymentInfo,
680    /// Chunk address of the serialized `DataMap` when this upload was
681    /// prepared with [`Visibility::Public`]. `Some` means the address is
682    /// retrievable on the network after finalization — either because this
683    /// upload paid to store the chunk in `payment_info`, or because the
684    /// chunk was already on the network (deterministic self-encryption).
685    /// Carried through to [`FileUploadResult::data_map_address`].
686    pub data_map_address: Option<[u8; 32]>,
687    /// Chunk addresses already present on the network when this upload was
688    /// prepared. These do not require payment or PUT during finalization.
689    pub already_stored_addresses: Vec<[u8; 32]>,
690    /// Total chunk count for the upload, including already-stored chunks.
691    pub total_chunks: usize,
692}
693
694/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
695type EncryptionChannels = (
696    tokio::sync::mpsc::Receiver<Bytes>,
697    tokio::sync::oneshot::Receiver<DataMap>,
698    tokio::task::JoinHandle<Result<()>>,
699);
700
701/// Spawn a blocking task that streams file encryption through a channel.
702fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
703    let metadata = std::fs::metadata(&path)?;
704    let data_size = usize::try_from(metadata.len())
705        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
706
707    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
708    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
709
710    let handle = tokio::task::spawn_blocking(move || {
711        let file = std::fs::File::open(&path)?;
712        let mut reader = std::io::BufReader::new(file);
713
714        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
715        let read_error_clone = Arc::clone(&read_error);
716
717        let data_iter = std::iter::from_fn(move || {
718            let mut buffer = vec![0u8; 8192];
719            match std::io::Read::read(&mut reader, &mut buffer) {
720                Ok(0) => None,
721                Ok(n) => {
722                    buffer.truncate(n);
723                    Some(Bytes::from(buffer))
724                }
725                Err(e) => {
726                    let mut guard = read_error_clone
727                        .lock()
728                        .unwrap_or_else(|poisoned| poisoned.into_inner());
729                    *guard = Some(e);
730                    None
731                }
732            }
733        });
734
735        let mut stream = stream_encrypt(data_size, data_iter)
736            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
737
738        for chunk_result in stream.chunks() {
739            // Check for captured read errors immediately after each chunk.
740            // stream_encrypt sees None (EOF) when a read fails, so it stops
741            // producing chunks. We must detect this before sending the
742            // partial results to avoid uploading a truncated DataMap.
743            {
744                let guard = read_error
745                    .lock()
746                    .unwrap_or_else(|poisoned| poisoned.into_inner());
747                if let Some(ref e) = *guard {
748                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
749                }
750            }
751
752            let (_hash, content) = chunk_result
753                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
754            if chunk_tx.blocking_send(content).is_err() {
755                return Err(Error::Encryption("upload receiver dropped".to_string()));
756            }
757        }
758
759        // Final check: read error after last chunk (stream saw EOF).
760        {
761            let guard = read_error
762                .lock()
763                .unwrap_or_else(|poisoned| poisoned.into_inner());
764            if let Some(ref e) = *guard {
765                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
766            }
767        }
768
769        let datamap = stream
770            .into_datamap()
771            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
772        if datamap_tx.send(datamap).is_err() {
773            warn!("DataMap receiver dropped — upload may have been cancelled");
774        }
775        Ok(())
776    });
777
778    Ok((chunk_rx, datamap_rx, handle))
779}
780
781impl Client {
782    /// Upload a file to the network using streaming self-encryption.
783    ///
784    /// Automatically selects merkle batch payment for files that produce
785    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
786    /// directory so peak memory stays at ~256 MB regardless of file size.
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the file cannot be read, encryption fails,
791    /// or any chunk cannot be stored.
792    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
793        self.file_upload_with_mode(path, PaymentMode::Auto).await
794    }
795
796    /// Estimate the cost of uploading a file without actually uploading.
797    ///
798    /// Encrypts the file to determine chunk count and sizes, then requests
799    /// a single quote from the network for a representative chunk. The
800    /// per-chunk price is extrapolated to the total chunk count.
801    ///
802    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
803    /// chunks are cleaned up automatically when the function returns.
804    ///
805    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
806    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
807    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
808    /// varies with network conditions.
809    ///
810    /// If the first sampled chunk is already stored on the network, the
811    /// function retries with subsequent chunk addresses (up to
812    /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
813    /// a [`Error::CostEstimationInconclusive`] is returned so callers can
814    /// decide how to react rather than trust a bogus "free" estimate. Only
815    /// when every address in the file is stored do we return a zero-cost
816    /// estimate.
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the file cannot be read, encryption fails,
821    /// the network cannot provide a quote, or every sampled chunk is
822    /// already stored ([`Error::CostEstimationInconclusive`]).
823    pub async fn estimate_upload_cost(
824        &self,
825        path: &Path,
826        mode: PaymentMode,
827        progress: Option<mpsc::Sender<UploadEvent>>,
828    ) -> Result<UploadCostEstimate> {
829        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
830
831        if file_size < 3 {
832            return Err(Error::InvalidData(
833                "File too small: self-encryption requires at least 3 bytes".into(),
834            ));
835        }
836
837        check_disk_space_for_spill(file_size)?;
838
839        info!(
840            "Estimating upload cost for {} ({file_size} bytes)",
841            path.display()
842        );
843
844        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
845        let chunk_count = spill.len();
846
847        if let Some(ref tx) = progress {
848            let _ = tx
849                .send(UploadEvent::Encrypted {
850                    total_chunks: chunk_count,
851                })
852                .await;
853        }
854
855        info!("Encrypted into {chunk_count} chunks, requesting quote");
856
857        // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
858        // AlreadyStored result says nothing about the rest of the file — the
859        // first chunk is often a DataMap-adjacent chunk that collides with
860        // prior uploads even when 99% of the file is new. Only treat the
861        // whole file as "fully stored" when every sample comes back stored.
862        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
863        let mut sampled = 0usize;
864        let mut all_already_stored = true;
865        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
866
867        for addr in spill.addresses.iter().take(sample_limit) {
868            sampled += 1;
869            let chunk_bytes = spill.read_chunk(addr)?;
870            let data_size = u64::try_from(chunk_bytes.len())
871                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
872            match self
873                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
874                .await
875            {
876                Ok(q) => {
877                    quotes_opt = Some(q);
878                    all_already_stored = false;
879                    break;
880                }
881                Err(Error::AlreadyStored) => {
882                    debug!(
883                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
884                        hex::encode(addr)
885                    );
886                    continue;
887                }
888                Err(e) => return Err(e),
889            }
890        }
891
892        let uses_merkle = should_use_merkle(chunk_count, mode);
893
894        let quotes = match quotes_opt {
895            Some(q) => q,
896            None if all_already_stored && sampled == chunk_count => {
897                // Every address in the file was sampled and every one is
898                // already on the network — returning a zero-cost estimate is
899                // accurate in this case.
900                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
901                return Ok(UploadCostEstimate {
902                    file_size,
903                    chunk_count,
904                    storage_cost_atto: "0".into(),
905                    estimated_gas_cost_wei: "0".into(),
906                    payment_mode: if uses_merkle {
907                        PaymentMode::Merkle
908                    } else {
909                        PaymentMode::Single
910                    },
911                });
912            }
913            None => {
914                return Err(Error::CostEstimationInconclusive(format!(
915                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
916                     one reported AlreadyStored; cannot infer a representative price \
917                     for the remaining chunks"
918                )));
919            }
920        };
921
922        // Use the median price × 3 (matches SingleNodePayment::from_quotes
923        // which pays 3x the median to incentivize reliable storage).
924        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
925        prices.sort();
926        let median_price = prices
927            .get(prices.len() / 2)
928            .copied()
929            .unwrap_or(Amount::ZERO);
930        let per_chunk_cost = median_price * Amount::from(3u64);
931
932        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
933        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
934
935        // Estimate gas cost from realistic per-transaction budgets rather
936        // than a flat per-chunk or per-wave number.
937        //
938        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
939        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
940        //   The dominant cost is one SSTORE per entry plus base tx overhead,
941        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
942        //   on a full wave and multiply by the number of waves. The previous
943        //   per-wave figure of 150k was closer to a single-entry transfer
944        //   and understated cost by 5–10x for full waves.
945        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
946        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
947        //
948        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
949        // Arbitrum baseline). Treat the result as advisory, not a commitment.
950        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
951        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
952        let estimated_gas: u128 = if uses_merkle {
953            merkle_batches
954                .saturating_mul(GAS_PER_MERKLE_TX)
955                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
956        } else {
957            waves
958                .saturating_mul(GAS_PER_WAVE_TX)
959                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
960        };
961
962        info!(
963            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
964        );
965
966        Ok(UploadCostEstimate {
967            file_size,
968            chunk_count,
969            storage_cost_atto: total_storage.to_string(),
970            estimated_gas_cost_wei: estimated_gas.to_string(),
971            payment_mode: if uses_merkle {
972                PaymentMode::Merkle
973            } else {
974                PaymentMode::Single
975            },
976        })
977    }
978
979    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
980    ///
981    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
982    /// [`Visibility::Private`] — see that method for details.
983    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
984        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
985            .await
986    }
987
988    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
989    ///
990    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
991    /// `progress: None` — see that method for details.
992    pub async fn file_prepare_upload_with_visibility(
993        &self,
994        path: &Path,
995        visibility: Visibility,
996    ) -> Result<PreparedUpload> {
997        self.file_prepare_upload_with_progress(path, visibility, None)
998            .await
999    }
1000
    /// Phase 1 of external-signer upload with progress events.
    ///
    /// Requires an EVM network (for contract price queries) but NOT a wallet.
    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
    /// and a [`PaymentIntent`] that the external signer uses to construct
    /// and submit the on-chain payment transaction.
    ///
    /// The payment path is always chosen with [`PaymentMode::Auto`], based on
    /// the total chunk count (including the DataMap chunk for public uploads):
    /// - If merkle qualifies, a merkle preflight first separates chunks that
    ///   are already stored. Then:
    ///   - if nothing remains, an empty wave-batch intent is returned;
    ///   - if the remainder no longer qualifies for merkle, or merkle
    ///     preparation fails with [`Error::InsufficientPeers`], the remaining
    ///     chunks are prepared for wave-batch payment instead;
    ///   - otherwise a merkle batch is prepared for external signing.
    /// - Otherwise every chunk is quoted for wave-batch payment.
    ///
    /// Chunks found to be already stored are recorded in the returned
    /// upload's already-stored addresses and are excluded from payment.
    ///
    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
    /// is bundled into the payment batch as an additional chunk and its
    /// address is recorded on the returned [`PreparedUpload`]. After
    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
    /// can share a single address from which anyone can retrieve the file.
    ///
    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
    /// emitted later by [`Client::finalize_upload_with_progress`] /
    /// [`Client::finalize_upload_merkle_with_progress`].
    ///
    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
    /// inherent to the two-phase external-signer protocol — the chunks must
    /// stay in memory until [`Client::finalize_upload`] stores them. For very
    /// large files, prefer [`Client::file_upload`] which streams directly.
    ///
    /// # Errors
    ///
    /// Returns an error if there is insufficient disk space, the file cannot
    /// be read, encryption fails, or quote collection fails.
    pub async fn file_prepare_upload_with_progress(
        &self,
        path: &Path,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<PreparedUpload> {
        debug!(
            "Preparing file upload for external signing (visibility={visibility:?}): {}",
            path.display()
        );

        // Fail fast if the temp directory cannot hold the spilled chunks.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        info!(
            "Encrypted {} into {} chunks for external signing (spilled to disk)",
            path.display(),
            spill.len()
        );

        // Read each chunk from disk and collect quotes concurrently.
        // Note: all PreparedChunks accumulate in memory because the external-signer
        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
        let mut chunk_data: Vec<Bytes> = spill
            .addresses
            .iter()
            .map(|addr| spill.read_chunk(addr))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        // For public uploads, bundle the serialized DataMap as an extra chunk
        // in the same payment batch. This lets the external signer pay for
        // the data chunks and the DataMap chunk in one flow, and lets the
        // finalize step return the DataMap's chunk address as the shareable
        // retrieval address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_data.push(bytes);
                Some(address)
            }
        };

        // Total chunk count for this upload, including the DataMap chunk
        // when the upload is public. Reported as `total_chunks` throughout.
        let chunk_count = chunk_data.len();

        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Pick the payment path from the full chunk count. The merkle branch
        // can still fall back to wave-batch payment after its preflight.
        let (payment_info, already_stored_addresses) = if should_use_merkle(
            chunk_count,
            PaymentMode::Auto,
        ) {
            // Merkle path: build tree, collect candidate pools, return for external payment.
            info!("Using merkle batch preparation for {chunk_count} file chunks");

            let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
                .iter()
                .map(|chunk| {
                    let size = u64::try_from(chunk.len())
                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
                    Ok((compute_address(chunk), size))
                })
                .collect::<Result<Vec<_>>>()?;

            // Preflight: split addresses into `already_stored` and `to_upload`
            // so that only missing chunks are paid for.
            let merkle_plan = self
                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
                .await?;

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for: hand back an empty wave-batch intent.
                info!("All {chunk_count} file chunks already stored; no external payment needed");
                (
                    ExternalPaymentInfo::WaveBatch {
                        prepared_chunks: Vec::new(),
                        payment_intent: PaymentIntent::from_prepared_chunks(&[]),
                    },
                    merkle_plan.already_stored,
                )
            } else {
                // Presumably narrows `chunk_data` to the contents whose addresses
                // are in `merkle_plan.to_upload` — NOTE(review): confirm the
                // helper's ordering guarantee against `to_upload`.
                let chunk_data =
                    chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;

                if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
                    // Too few chunks remain after the preflight for merkle to
                    // apply, so quote the remainder for wave-batch payment.
                    // NOTE(review): `chunk_count` (the whole upload) is passed as
                    // the progress total, but only the remaining chunks are
                    // quoted, so `ChunkQuoted.quoted` may never reach `total`.
                    info!(
                        "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
                        merkle_plan.to_upload.len()
                    );
                    let (payment_info, mut wave_already_stored) = self
                        .prepare_wave_batch_external_chunks(
                            chunk_data,
                            progress.as_ref(),
                            chunk_count,
                        )
                        .await?;
                    // Merge preflight hits with any chunks the quote phase
                    // additionally found already stored.
                    let mut already_stored = merkle_plan.already_stored;
                    already_stored.append(&mut wave_already_stored);
                    (payment_info, already_stored)
                } else {
                    match self
                        .prepare_merkle_batch_external(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                    {
                        Ok(prepared_batch) => {
                            info!(
                                "File prepared for external merkle signing: {} chunks, depth={} ({})",
                                merkle_plan.to_upload.len(),
                                prepared_batch.depth,
                                path.display()
                            );

                            (
                                ExternalPaymentInfo::Merkle {
                                    prepared_batch,
                                    chunk_contents: chunk_data,
                                    chunk_addresses: merkle_plan.to_upload,
                                },
                                merkle_plan.already_stored,
                            )
                        }
                        Err(Error::InsufficientPeers(ref msg)) => {
                            // Not enough peers for a merkle payment: degrade to
                            // wave-batch quoting of the remaining chunks rather
                            // than failing the whole preparation.
                            info!(
                                "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
                            );
                            let (payment_info, mut wave_already_stored) = self
                                .prepare_wave_batch_external_chunks(
                                    chunk_data,
                                    progress.as_ref(),
                                    chunk_count,
                                )
                                .await?;
                            let mut already_stored = merkle_plan.already_stored;
                            already_stored.append(&mut wave_already_stored);
                            (payment_info, already_stored)
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
        } else {
            // Below the merkle threshold: quote every chunk for wave-batch payment.
            self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
                .await?
        };

        // Surface the "DataMap chunk was already on the network" case
        // so debugging "why is data_map_address set but no storage cost
        // appears for it?" doesn't require reading the source. See the
        // `data_map_address` doc comment for why this is still a valid
        // `Some(addr)` outcome.
        if let Some(addr) = data_map_address {
            let data_map_needs_payment = match &payment_info {
                ExternalPaymentInfo::WaveBatch {
                    prepared_chunks, ..
                } => prepared_chunks.iter().any(|c| c.address == addr),
                ExternalPaymentInfo::Merkle {
                    chunk_addresses, ..
                } => chunk_addresses.contains(&addr),
            };
            if !data_map_needs_payment {
                info!(
                    "Public upload: DataMap chunk {} was already stored \
                     on the network — address is retrievable without a \
                     new payment",
                    hex::encode(addr)
                );
            }
        }

        Ok(PreparedUpload {
            data_map,
            payment_info,
            data_map_address,
            already_stored_addresses,
            total_chunks: chunk_count,
        })
    }
1226
1227    async fn prepare_wave_batch_external_chunks(
1228        &self,
1229        chunk_data: Vec<Bytes>,
1230        progress: Option<&mpsc::Sender<UploadEvent>>,
1231        progress_total: usize,
1232    ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1233        let chunk_count = chunk_data.len();
1234        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1235            .into_iter()
1236            .map(|content| {
1237                let address = compute_address(&content);
1238                (content, address)
1239            })
1240            .collect();
1241
1242        // Wave-batch path: collect quotes per chunk concurrently, emitting
1243        // a `ChunkQuoted` event after each completion so callers can drive
1244        // a progress bar through the slow quote phase.
1245        let quote_limiter = self.controller().quote.clone();
1246        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1247        let mut quote_stream = stream::iter(chunks_with_addr)
1248            .map(|(content, address)| {
1249                let limiter = quote_limiter.clone();
1250                async move {
1251                    let result = observe_op(
1252                        &limiter,
1253                        || async move { self.prepare_chunk_payment(content).await },
1254                        classify_error,
1255                    )
1256                    .await;
1257                    (address, result)
1258                }
1259            })
1260            .buffer_unordered(quote_concurrency);
1261
1262        let mut prepared_chunks = Vec::with_capacity(chunk_count);
1263        let mut already_stored = Vec::new();
1264        let mut quoted = 0usize;
1265        while let Some((address, result)) = quote_stream.next().await {
1266            match result? {
1267                Some(prepared) => prepared_chunks.push(prepared),
1268                None => already_stored.push(address),
1269            }
1270            quoted += 1;
1271            if let Some(tx) = progress {
1272                let _ = tx.try_send(UploadEvent::ChunkQuoted {
1273                    quoted,
1274                    total: progress_total,
1275                });
1276            }
1277        }
1278
1279        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1280        info!(
1281            "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1282            prepared_chunks.len(),
1283            already_stored.len(),
1284            payment_intent.total_amount,
1285        );
1286
1287        Ok((
1288            ExternalPaymentInfo::WaveBatch {
1289                prepared_chunks,
1290                payment_intent,
1291            },
1292            already_stored,
1293        ))
1294    }
1295
1296    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1297    ///
1298    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1299    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1300    /// payment. Builds payment proofs and stores chunks on the network.
1301    ///
1302    /// # Errors
1303    ///
1304    /// Returns an error if the prepared upload used merkle payment (use
1305    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1306    /// or any chunk cannot be stored.
1307    pub async fn finalize_upload(
1308        &self,
1309        prepared: PreparedUpload,
1310        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1311    ) -> Result<FileUploadResult> {
1312        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1313            .await
1314    }
1315
1316    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1317    ///
1318    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1319    /// on the provided channel as each chunk is successfully stored.
1320    ///
1321    /// # Errors
1322    ///
1323    /// Same as [`Client::finalize_upload`].
1324    pub async fn finalize_upload_with_progress(
1325        &self,
1326        prepared: PreparedUpload,
1327        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1328        progress: Option<mpsc::Sender<UploadEvent>>,
1329    ) -> Result<FileUploadResult> {
1330        let data_map_address = prepared.data_map_address;
1331        let already_stored_addresses = prepared.already_stored_addresses;
1332        let already_stored_count = already_stored_addresses.len();
1333        let total_chunks = prepared.total_chunks;
1334        match prepared.payment_info {
1335            ExternalPaymentInfo::WaveBatch {
1336                prepared_chunks,
1337                payment_intent: _,
1338            } => {
1339                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1340                let wave_result = self
1341                    .store_paid_chunks_with_events(
1342                        paid_chunks,
1343                        progress.as_ref(),
1344                        already_stored_count,
1345                        total_chunks,
1346                    )
1347                    .await;
1348                if !wave_result.failed.is_empty() {
1349                    let failed_count = wave_result.failed.len();
1350                    let stored_count = already_stored_count + wave_result.stored.len();
1351                    let mut stored = already_stored_addresses;
1352                    stored.extend(wave_result.stored);
1353                    return Err(Error::PartialUpload {
1354                        stored,
1355                        stored_count,
1356                        failed: wave_result.failed,
1357                        failed_count,
1358                        total_chunks,
1359                        reason: "finalize_upload: chunk storage failed after retries".into(),
1360                    });
1361                }
1362                let chunks_stored = already_stored_count + wave_result.stored.len();
1363
1364                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1365
1366                let mut stats = WaveAggregateStats::default();
1367                stats.absorb(&wave_result);
1368
1369                Ok(FileUploadResult {
1370                    data_map: prepared.data_map,
1371                    chunks_stored,
1372                    chunks_failed: 0,
1373                    total_chunks,
1374                    payment_mode_used: PaymentMode::Single,
1375                    storage_cost_atto: "0".into(),
1376                    gas_cost_wei: 0,
1377                    data_map_address,
1378                    chunk_attempts_total: stats.chunk_attempts_total,
1379                    store_durations_ms: stats.store_durations_ms,
1380                    retries_histogram: stats.retries_histogram,
1381                })
1382            }
1383            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1384                "Cannot finalize merkle upload with wave-batch tx hashes. \
1385                 Use finalize_upload_merkle() instead."
1386                    .to_string(),
1387            )),
1388        }
1389    }
1390
1391    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1392    ///
1393    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1394    /// returned by the on-chain merkle payment transaction. Generates proofs and
1395    /// stores chunks on the network.
1396    ///
1397    /// # Errors
1398    ///
1399    /// Returns an error if the prepared upload used wave-batch payment (use
1400    /// [`Client::finalize_upload`] instead), proof generation fails,
1401    /// or any chunk cannot be stored.
1402    pub async fn finalize_upload_merkle(
1403        &self,
1404        prepared: PreparedUpload,
1405        winner_pool_hash: [u8; 32],
1406    ) -> Result<FileUploadResult> {
1407        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1408            .await
1409    }
1410
1411    /// Phase 2 of external-signer upload (merkle) with progress events.
1412    ///
1413    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1414    /// on the provided channel as each chunk is successfully stored.
1415    ///
1416    /// # Errors
1417    ///
1418    /// Same as [`Client::finalize_upload_merkle`].
1419    pub async fn finalize_upload_merkle_with_progress(
1420        &self,
1421        prepared: PreparedUpload,
1422        winner_pool_hash: [u8; 32],
1423        progress: Option<mpsc::Sender<UploadEvent>>,
1424    ) -> Result<FileUploadResult> {
1425        let data_map_address = prepared.data_map_address;
1426        let already_stored_count = prepared.already_stored_addresses.len();
1427        let total_chunks = prepared.total_chunks;
1428        match prepared.payment_info {
1429            ExternalPaymentInfo::Merkle {
1430                prepared_batch,
1431                chunk_contents,
1432                chunk_addresses,
1433            } => {
1434                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1435                let outcome = self
1436                    .merkle_upload_chunks(
1437                        chunk_contents,
1438                        chunk_addresses,
1439                        &batch_result,
1440                        progress.as_ref(),
1441                        already_stored_count,
1442                        total_chunks,
1443                    )
1444                    .await?;
1445
1446                info!(
1447                    "External-signer merkle upload finalized: {} chunks stored, {} failed",
1448                    outcome.stored, outcome.failed
1449                );
1450
1451                Ok(FileUploadResult {
1452                    data_map: prepared.data_map,
1453                    chunks_stored: outcome.stored,
1454                    chunks_failed: outcome.failed,
1455                    total_chunks,
1456                    payment_mode_used: PaymentMode::Merkle,
1457                    storage_cost_atto: "0".into(),
1458                    gas_cost_wei: 0,
1459                    data_map_address,
1460                    chunk_attempts_total: outcome.stats.chunk_attempts_total,
1461                    store_durations_ms: outcome.stats.store_durations_ms,
1462                    retries_histogram: outcome.stats.retries_histogram,
1463                })
1464            }
1465            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1466                "Cannot finalize wave-batch upload with merkle winner hash. \
1467                 Use finalize_upload() instead."
1468                    .to_string(),
1469            )),
1470        }
1471    }
1472
1473    /// Upload a file with a specific payment mode.
1474    ///
1475    /// Before encryption, checks that the temp directory has enough free
1476    /// disk space for the spilled chunks (~1.1× source file size).
1477    ///
1478    /// Encrypted chunks are spilled to a temp directory during encryption
1479    /// so that only their 32-byte addresses stay in memory. At upload time,
1480    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1481    ///
1482    /// # Errors
1483    ///
1484    /// Returns an error if there is insufficient disk space, the file cannot
1485    /// be read, encryption fails, or any chunk cannot be stored.
1486    #[allow(clippy::too_many_lines)]
1487    pub async fn file_upload_with_mode(
1488        &self,
1489        path: &Path,
1490        mode: PaymentMode,
1491    ) -> Result<FileUploadResult> {
1492        self.file_upload_with_progress(path, mode, None).await
1493    }
1494
1495    /// Upload a file with progress events sent to the given channel.
1496    ///
1497    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1498    /// provided channel for UI progress feedback.
1499    #[allow(clippy::too_many_lines)]
1500    pub async fn file_upload_with_progress(
1501        &self,
1502        path: &Path,
1503        mode: PaymentMode,
1504        progress: Option<mpsc::Sender<UploadEvent>>,
1505    ) -> Result<FileUploadResult> {
1506        debug!(
1507            "Streaming file upload with mode {mode:?}: {}",
1508            path.display()
1509        );
1510
1511        // Pre-flight: verify enough temp disk space for the chunk spill.
1512        let file_size = std::fs::metadata(path)?.len();
1513        check_disk_space_for_spill(file_size)?;
1514
1515        // Phase 1: Encrypt file and spill chunks to temp directory.
1516        // Only 32-byte addresses stay in memory — chunk data lives on disk.
1517        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1518
1519        let chunk_count = spill.len();
1520        info!(
1521            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
1522            path.display()
1523        );
1524        if let Some(ref tx) = progress {
1525            let _ = tx
1526                .send(UploadEvent::Encrypted {
1527                    total_chunks: chunk_count,
1528                })
1529                .await;
1530        }
1531
1532        // Phase 2: Decide payment mode and upload in waves from disk.
1533        //
1534        // For the merkle path, attempt to resume from a cached
1535        // receipt before paying again. The cache is keyed by the
1536        // CANONICAL source path so `./foo`, `/abs/foo`, and any
1537        // symlink alias all resolve to the same cache entry — a
1538        // crash-and-retry from a different cwd or via a different
1539        // alias still hits the receipt. Canonicalize may fail (the
1540        // file could have been moved between phase 1 and here); we
1541        // fall back to the display string in that case, which
1542        // preserves pre-fix behaviour rather than dropping cache
1543        // resume entirely.
1544        let file_path_key = std::fs::canonicalize(path)
1545            .map(|p| p.display().to_string())
1546            .unwrap_or_else(|_| path.display().to_string());
1547        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
1548            .should_use_merkle(chunk_count, mode)
1549        {
1550            info!("Using merkle batch payment for {chunk_count} file chunks");
1551
1552            let cached_merkle =
1553                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
1554                    .map(|(_cache_path, cached)| cached);
1555
1556            let merkle_plan = match self
1557                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
1558                .await
1559            {
1560                Ok(plan) => plan,
1561                Err(e) => {
1562                    if let Some(cached) = cached_merkle
1563                        .as_ref()
1564                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
1565                    {
1566                        info!(
1567                            "Merkle preflight failed ({e}); \
1568                             resuming with cached merkle proofs"
1569                        );
1570                        let (stored, sc, gc, stats) = self
1571                            .upload_waves_merkle(
1572                                &spill,
1573                                &spill.addresses,
1574                                cached,
1575                                &[],
1576                                progress.as_ref(),
1577                            )
1578                            .await?;
1579                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1580                        return Ok(FileUploadResult {
1581                            data_map,
1582                            chunks_stored: stored,
1583                            chunks_failed: 0,
1584                            total_chunks: chunk_count,
1585                            payment_mode_used: PaymentMode::Merkle,
1586                            storage_cost_atto: sc,
1587                            gas_cost_wei: gc,
1588                            data_map_address: None,
1589                            chunk_attempts_total: stats.chunk_attempts_total,
1590                            store_durations_ms: stats.store_durations_ms,
1591                            retries_histogram: stats.retries_histogram,
1592                        });
1593                    }
1594                    match &e {
1595                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
1596                            info!(
1597                                "Merkle preflight needs more peers ({msg}), \
1598                                 falling back to wave-batch"
1599                            );
1600                            let (stored, sc, gc, fb_stats) = self
1601                                .upload_waves_single(
1602                                    &spill,
1603                                    progress.as_ref(),
1604                                    Some(&file_path_key),
1605                                )
1606                                .await?;
1607                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1608                            return Ok(FileUploadResult {
1609                                data_map,
1610                                chunks_stored: stored,
1611                                chunks_failed: 0,
1612                                total_chunks: chunk_count,
1613                                payment_mode_used: PaymentMode::Single,
1614                                storage_cost_atto: sc,
1615                                gas_cost_wei: gc,
1616                                data_map_address: None,
1617                                chunk_attempts_total: fb_stats.chunk_attempts_total,
1618                                store_durations_ms: fb_stats.store_durations_ms,
1619                                retries_histogram: fb_stats.retries_histogram,
1620                            });
1621                        }
1622                        _ => return Err(e),
1623                    }
1624                }
1625            };
1626
1627            if merkle_plan.to_upload.is_empty() {
1628                info!("All {chunk_count} merkle chunks already stored; skipping payment");
1629                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1630                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1631                (
1632                    chunk_count,
1633                    PaymentMode::Merkle,
1634                    "0".to_string(),
1635                    0,
1636                    WaveAggregateStats::default(),
1637                )
1638            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
1639                let remaining_chunks = merkle_plan.to_upload.len();
1640                if let Some(cached) = cached_merkle
1641                    .as_ref()
1642                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
1643                {
1644                    info!(
1645                        "{remaining_chunks} chunks remain below merkle threshold; \
1646                         reusing cached merkle proofs"
1647                    );
1648                    let (stored, sc, gc, stats) = self
1649                        .upload_waves_merkle(
1650                            &spill,
1651                            &merkle_plan.to_upload,
1652                            cached,
1653                            &merkle_plan.already_stored,
1654                            progress.as_ref(),
1655                        )
1656                        .await?;
1657                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1658                    (stored, PaymentMode::Merkle, sc, gc, stats)
1659                } else {
1660                    if cached_merkle.is_some() {
1661                        info!(
1662                            "{remaining_chunks} chunks remain below merkle threshold, \
1663                             and the cached merkle receipt does not cover them. \
1664                             Discarding cache and using single-node payment."
1665                        );
1666                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1667                    } else {
1668                        info!(
1669                            "{remaining_chunks} chunks need upload after merkle preflight; \
1670                             using single-node payment"
1671                        );
1672                    }
1673                    let (stored, sc, gc, stats) = self
1674                        .upload_spill_addresses_single(
1675                            &spill,
1676                            &merkle_plan.to_upload,
1677                            progress.as_ref(),
1678                            merkle_plan.already_stored.len(),
1679                            chunk_count,
1680                            Some(&file_path_key),
1681                        )
1682                        .await?;
1683                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1684                    (stored, PaymentMode::Single, sc, gc, stats)
1685                }
1686            } else {
1687                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
1688                    // Validate the cache against the chunks that still need
1689                    // storage. Extra proofs are harmless: a previous attempt
1690                    // may have paid for chunks that are now already stored.
1691                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
1692                        info!(
1693                            "Skipping merkle payment phase; resuming with \
1694                             cached proofs for {} remaining chunks",
1695                            merkle_plan.to_upload.len()
1696                        );
1697                        Ok(cached.clone())
1698                    } else {
1699                        info!(
1700                            "Cached merkle receipt does not cover the current \
1701                             remaining chunks (cached={}, remaining={}). \
1702                             Discarding cache and paying fresh.",
1703                            cached.proofs.len(),
1704                            merkle_plan.to_upload.len()
1705                        );
1706                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1707                        self.pay_for_merkle_batch(
1708                            &merkle_plan.to_upload,
1709                            DATA_TYPE_CHUNK,
1710                            merkle_plan.to_upload_avg_size(),
1711                        )
1712                        .await
1713                        .inspect(|result| {
1714                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
1715                        })
1716                    }
1717                } else {
1718                    self.pay_for_merkle_batch(
1719                        &merkle_plan.to_upload,
1720                        DATA_TYPE_CHUNK,
1721                        merkle_plan.to_upload_avg_size(),
1722                    )
1723                    .await
1724                    .inspect(|result| {
1725                        // Save BEFORE the store phase so a crash
1726                        // mid-upload leaves a resumable receipt.
1727                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
1728                    })
1729                };
1730
1731                let batch_result = match batch_result {
1732                    Ok(result) => result,
1733                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
1734                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
1735                        let (stored, sc, gc, fb_stats) = self
1736                            .upload_spill_addresses_single(
1737                                &spill,
1738                                &merkle_plan.to_upload,
1739                                progress.as_ref(),
1740                                merkle_plan.already_stored.len(),
1741                                chunk_count,
1742                                Some(&file_path_key),
1743                            )
1744                            .await?;
1745                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1746                        return Ok(FileUploadResult {
1747                            data_map,
1748                            chunks_stored: stored,
1749                            chunks_failed: 0,
1750                            total_chunks: chunk_count,
1751                            payment_mode_used: PaymentMode::Single,
1752                            storage_cost_atto: sc,
1753                            gas_cost_wei: gc,
1754                            data_map_address: None,
1755                            chunk_attempts_total: fb_stats.chunk_attempts_total,
1756                            store_durations_ms: fb_stats.store_durations_ms,
1757                            retries_histogram: fb_stats.retries_histogram,
1758                        });
1759                    }
1760                    Err(e) => return Err(e),
1761                };
1762
1763                let (stored, sc, gc, stats) = self
1764                    .upload_waves_merkle(
1765                        &spill,
1766                        &merkle_plan.to_upload,
1767                        &batch_result,
1768                        &merkle_plan.already_stored,
1769                        progress.as_ref(),
1770                    )
1771                    .await?;
1772                // Upload succeeded end-to-end; the cached receipt is
1773                // no longer needed.
1774                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1775                (stored, PaymentMode::Merkle, sc, gc, stats)
1776            }
1777        } else {
1778            let (stored, sc, gc, stats) = self
1779                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
1780                .await?;
1781            // Full file success: drop any cached single-node receipt.
1782            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1783            (stored, PaymentMode::Single, sc, gc, stats)
1784        };
1785
1786        info!(
1787            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
1788            path.display()
1789        );
1790
1791        Ok(FileUploadResult {
1792            data_map,
1793            chunks_stored,
1794            chunks_failed: 0,
1795            total_chunks: chunk_count,
1796            payment_mode_used: actual_mode,
1797            storage_cost_atto,
1798            gas_cost_wei,
1799            data_map_address: None,
1800            chunk_attempts_total: stats.chunk_attempts_total,
1801            store_durations_ms: stats.store_durations_ms,
1802            retries_histogram: stats.retries_histogram,
1803        })
1804    }
1805
1806    /// Encrypt a file and spill chunks to a temp directory.
1807    ///
1808    /// Logs progress every 100 chunks so users get feedback during
1809    /// multi-GB encryptions.
1810    ///
1811    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1812    async fn encrypt_file_to_spill(
1813        &self,
1814        path: &Path,
1815        progress: Option<&mpsc::Sender<UploadEvent>>,
1816    ) -> Result<(ChunkSpill, DataMap)> {
1817        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1818
1819        let mut spill = ChunkSpill::new()?;
1820        while let Some(content) = chunk_rx.recv().await {
1821            spill.push(&content)?;
1822            let chunks_done = spill.len();
1823            if let Some(tx) = progress {
1824                if chunks_done.is_multiple_of(10) {
1825                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1826                }
1827            }
1828            if chunks_done % 100 == 0 {
1829                let mb = spill.total_bytes() / (1024 * 1024);
1830                info!(
1831                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1832                    path.display()
1833                );
1834            }
1835        }
1836
1837        // Await encryption completion to catch errors before paying.
1838        handle
1839            .await
1840            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1841            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1842
1843        let data_map = datamap_rx
1844            .await
1845            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1846
1847        Ok((spill, data_map))
1848    }
1849
1850    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1851    ///
1852    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1853    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1854    ///
1855    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1856    async fn upload_waves_single(
1857        &self,
1858        spill: &ChunkSpill,
1859        progress: Option<&mpsc::Sender<UploadEvent>>,
1860        resume_key: Option<&str>,
1861    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1862        self.upload_spill_addresses_single(
1863            spill,
1864            &spill.addresses,
1865            progress,
1866            0,
1867            spill.len(),
1868            resume_key,
1869        )
1870        .await
1871    }
1872
1873    async fn upload_spill_addresses_single(
1874        &self,
1875        spill: &ChunkSpill,
1876        addresses: &[[u8; 32]],
1877        progress: Option<&mpsc::Sender<UploadEvent>>,
1878        stored_offset: usize,
1879        total_chunks: usize,
1880        resume_key: Option<&str>,
1881    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1882        let mut total_stored = stored_offset;
1883        let mut total_storage = Amount::ZERO;
1884        let mut total_gas: u128 = 0;
1885        let mut agg_stats = WaveAggregateStats::default();
1886        let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
1887        let wave_count = waves.len();
1888
1889        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1890            let wave_num = wave_idx + 1;
1891            let wave_data: Vec<Bytes> = wave_addrs
1892                .iter()
1893                .map(|addr| spill.read_chunk(addr))
1894                .collect::<Result<Vec<_>>>()?;
1895
1896            info!(
1897                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1898                wave_data.len()
1899            );
1900            if let Some(tx) = progress {
1901                let _ = tx
1902                    .send(UploadEvent::QuotingChunks {
1903                        wave: wave_num,
1904                        total_waves: wave_count,
1905                        chunks_in_wave: wave_data.len(),
1906                    })
1907                    .await;
1908            }
1909            let (addresses, wave_storage, wave_gas, wave_stats) = self
1910                .batch_upload_chunks_with_events(
1911                    wave_data,
1912                    progress,
1913                    total_stored,
1914                    total_chunks,
1915                    resume_key,
1916                )
1917                .await?;
1918            total_stored += addresses.len();
1919            if let Ok(cost) = wave_storage.parse::<Amount>() {
1920                total_storage += cost;
1921            }
1922            total_gas = total_gas.saturating_add(wave_gas);
1923            // Merge per-call stats (each call already aggregates across the
1924            // waves it ran internally, so a simple sum/extend is correct).
1925            agg_stats.chunk_attempts_total = agg_stats
1926                .chunk_attempts_total
1927                .saturating_add(wave_stats.chunk_attempts_total);
1928            agg_stats
1929                .store_durations_ms
1930                .extend(wave_stats.store_durations_ms);
1931            for (slot, count) in agg_stats
1932                .retries_histogram
1933                .iter_mut()
1934                .zip(wave_stats.retries_histogram.iter())
1935            {
1936                *slot = slot.saturating_add(*count);
1937            }
1938            if let Some(tx) = progress {
1939                let _ = tx
1940                    .send(UploadEvent::WaveComplete {
1941                        wave: wave_num,
1942                        total_waves: wave_count,
1943                        stored_so_far: total_stored,
1944                        total: total_chunks,
1945                    })
1946                    .await;
1947            }
1948        }
1949
1950        Ok((
1951            total_stored,
1952            total_storage.to_string(),
1953            total_gas,
1954            agg_stats,
1955        ))
1956    }
1957
1958    /// Upload chunks from a spill using pre-computed merkle proofs.
1959    ///
1960    /// Reads one wave at a time from disk, pairs each chunk with its proof,
1961    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1962    ///
1963    /// A chunk that is transiently short of quorum (`InsufficientPeers`) does
1964    /// **not** abort the file: each wave is driven through
1965    /// [`merkle_store_with_retry`], which collects such chunks and retries them
1966    /// — re-collecting their close group and reusing the same proof — for up to
1967    /// [`MERKLE_STORE_MAX_ATTEMPTS`] rounds with a [`MERKLE_RETRY_BACKOFF`] wait
1968    /// between rounds. Retry is per-wave to preserve the streaming memory bound.
1969    /// Non-quorum errors (e.g. a missing proof) stay fatal and abort immediately.
1970    ///
1971    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
1972    /// Costs come from the `batch_result` which was populated during payment.
1973    ///
1974    /// # Errors
1975    ///
1976    /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
1977    /// after all retries across every wave (other chunks remain stored), or the
1978    /// underlying error for a non-quorum failure.
1979    async fn upload_waves_merkle(
1980        &self,
1981        spill: &ChunkSpill,
1982        addresses: &[[u8; 32]],
1983        batch_result: &MerkleBatchPaymentResult,
1984        already_stored_addresses: &[[u8; 32]],
1985        progress: Option<&mpsc::Sender<UploadEvent>>,
1986    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1987        let mut total_stored = already_stored_addresses.len();
1988        let total_chunks = total_stored + addresses.len();
1989        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
1990        let mut failed: Vec<([u8; 32], String)> = Vec::new();
1991        let mut agg_stats = WaveAggregateStats::default();
1992
1993        // Chunks without a merkle proof were never paid for: a partial
1994        // `pay_for_merkle_multi_batch` result carries proofs only for the
1995        // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
1996        // stored, so record it as failed (surfaced via `PartialUpload` once the
1997        // storable chunks have been attempted) rather than letting its
1998        // "missing proof" error abort the whole file and discard every other
1999        // chunk's progress.
2000        let (to_store, missing_proof) =
2001            partition_addresses_by_proof(addresses, &batch_result.proofs);
2002        if !missing_proof.is_empty() {
2003            warn!(
2004                "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2005                missing_proof.len()
2006            );
2007            for addr in &missing_proof {
2008                failed.push((
2009                    *addr,
2010                    format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2011                ));
2012            }
2013        }
2014
2015        let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
2016        let wave_count = waves.len();
2017
2018        let store_limiter = self.controller().store.clone();
2019
2020        // Store one chunk to its (freshly re-collected) close group, reusing the
2021        // chunk's merkle proof. Shared across every retry round so a converged
2022        // routing table yields a fresh group. Only `InsufficientPeers` is
2023        // recoverable; a missing proof stays fatal. Mirrors the external-signer
2024        // path's closure in `merkle_upload_chunks`.
2025        let store_one = |addr: [u8; 32], content: Bytes| {
2026            let limiter = store_limiter.clone();
2027            let proof_bytes = batch_result.proofs.get(&addr).cloned();
2028            async move {
2029                let started = std::time::Instant::now();
2030                let proof = proof_bytes.ok_or_else(|| {
2031                    Error::Payment(format!(
2032                        "Missing merkle proof for chunk {}",
2033                        hex::encode(addr)
2034                    ))
2035                })?;
2036                let peers = self.close_group_peers(&addr).await?;
2037                observe_op(
2038                    &limiter,
2039                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2040                    classify_error,
2041                )
2042                .await
2043                .map(|_| started)
2044            }
2045        };
2046
2047        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2048            let wave_num = wave_idx + 1;
2049            let wave = spill.read_wave(wave_addrs)?;
2050
2051            info!(
2052                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
2053                wave.len()
2054            );
2055
2056            // Clamp fan-out to wave size — partial last wave should
2057            // not pay for extra slots (see PERF-RESULTS.md).
2058            let store_concurrency = store_limiter.current().min(wave.len().max(1));
2059            let chunks: Vec<([u8; 32], Bytes)> = wave
2060                .into_iter()
2061                .map(|(content, addr)| (addr, content))
2062                .collect();
2063
2064            // Retry quorum-short chunks instead of aborting on the first miss.
2065            // `stored_offset` is the running cumulative count so the progress
2066            // events the driver emits stay correctly numbered across waves.
2067            let outcome = match merkle_store_with_retry(
2068                chunks,
2069                store_concurrency,
2070                MERKLE_STORE_MAX_ATTEMPTS,
2071                MERKLE_RETRY_BACKOFF,
2072                progress,
2073                total_stored,
2074                total_chunks,
2075                &store_one,
2076            )
2077            .await
2078            {
2079                Ok(outcome) => outcome,
2080                Err(e) => {
2081                    // A non-quorum store error is fatal for the retry helper
2082                    // (missing proofs were filtered out above, so this is a
2083                    // genuine network/store failure, e.g. a DHT lookup error).
2084                    // Surface it at the file boundary as `PartialUpload` so the
2085                    // chunks already stored in earlier waves — and any
2086                    // missing-proof chunks already recorded — are preserved for
2087                    // resume, rather than discarded with a generic error.
2088                    warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
2089                    let failed_count = failed.len();
2090                    return Err(Error::PartialUpload {
2091                        stored: stored_addresses,
2092                        stored_count: total_stored,
2093                        failed,
2094                        failed_count,
2095                        total_chunks,
2096                        reason: format!("merkle chunk store aborted: {e}"),
2097                    });
2098                }
2099            };
2100
2101            // Record which of this wave's chunks landed and which exhausted
2102            // their retries, so a permanently-failed chunk can surface as
2103            // `PartialUpload` once the whole file has been attempted.
2104            let wave_failed: HashSet<[u8; 32]> = outcome
2105                .failed_addresses
2106                .iter()
2107                .map(|(addr, _)| *addr)
2108                .collect();
2109            for addr in wave_addrs {
2110                if !wave_failed.contains(addr) {
2111                    stored_addresses.push(*addr);
2112                }
2113            }
2114            failed.extend(outcome.failed_addresses);
2115            total_stored = outcome.stored;
2116
2117            // Merge per-wave stats (durations, attempts, per-round histogram).
2118            agg_stats.chunk_attempts_total = agg_stats
2119                .chunk_attempts_total
2120                .saturating_add(outcome.stats.chunk_attempts_total);
2121            agg_stats
2122                .store_durations_ms
2123                .extend(outcome.stats.store_durations_ms);
2124            for (slot, count) in agg_stats
2125                .retries_histogram
2126                .iter_mut()
2127                .zip(outcome.stats.retries_histogram.iter())
2128            {
2129                *slot = slot.saturating_add(*count);
2130            }
2131
2132            if let Some(tx) = progress {
2133                let _ = tx
2134                    .send(UploadEvent::WaveComplete {
2135                        wave: wave_num,
2136                        total_waves: wave_count,
2137                        stored_so_far: total_stored,
2138                        total: total_chunks,
2139                    })
2140                    .await;
2141            }
2142        }
2143
2144        // A file with any permanently-failed chunk is not fully stored — surface
2145        // it as `PartialUpload`, but only after retries across all waves are
2146        // exhausted (never silently succeed with missing chunks).
2147        if !failed.is_empty() {
2148            let failed_count = failed.len();
2149            warn!(
2150                "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2151            );
2152            return Err(Error::PartialUpload {
2153                stored: stored_addresses,
2154                stored_count: total_stored,
2155                failed,
2156                failed_count,
2157                total_chunks,
2158                reason: format!(
2159                    "{failed_count} chunk(s) short of quorum after {MERKLE_STORE_MAX_ATTEMPTS} attempts"
2160                ),
2161            });
2162        }
2163
2164        Ok((
2165            total_stored,
2166            batch_result.storage_cost_atto.clone(),
2167            batch_result.gas_cost_wei,
2168            agg_stats,
2169        ))
2170    }
2171
2172    /// Download and decrypt a file from the network, writing it to disk.
2173    ///
2174    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2175    /// memory at a time, avoiding OOM on large files. Chunks are fetched
2176    /// concurrently within each batch, then decrypted data is written to
2177    /// disk incrementally.
2178    ///
2179    /// Returns the number of bytes written.
2180    ///
2181    /// # Panics
2182    ///
2183    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2184    /// Will panic if called from a `current_thread` runtime because
2185    /// `streaming_decrypt` takes a synchronous callback that must bridge
2186    /// back to async via `block_in_place`.
2187    ///
2188    /// # Errors
2189    ///
2190    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2191    /// or the file cannot be written.
2192    #[allow(clippy::unused_async)]
2193    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2194        self.file_download_with_progress(data_map, output, None)
2195            .await
2196    }
2197
2198    /// Download and decrypt a file with progress events.
2199    ///
2200    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
2201    ///
2202    /// Progress reporting:
2203    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
2204    ///    `ChunksFetched` with `total: 0` during resolution)
2205    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
2206    /// 3. Fetches data chunks with accurate `fetched/total` progress
2207    #[allow(clippy::unused_async)]
2208    pub async fn file_download_with_progress(
2209        &self,
2210        data_map: &DataMap,
2211        output: &Path,
2212        progress: Option<mpsc::Sender<DownloadEvent>>,
2213    ) -> Result<u64> {
2214        debug!("Downloading file to {}", output.display());
2215
2216        let handle = Handle::current();
2217
2218        // Phase 1: Resolve hierarchical DataMap to root level.
2219        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
2220        let root_map = if data_map.is_child() {
2221            let dm_chunks = data_map.len();
2222            if let Some(ref tx) = progress {
2223                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
2224                    total_map_chunks: dm_chunks,
2225                });
2226            }
2227
2228            let resolve_progress = progress.clone();
2229            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2230
2231            let resolved = tokio::task::block_in_place(|| {
2232                let counter_ref = resolve_counter.clone();
2233                let progress_ref = resolve_progress.clone();
2234                let fetch_limiter = self.controller().fetch.clone();
2235                let fetch = |batch: &[(usize, XorName)]| {
2236                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2237                    let counter = counter_ref.clone();
2238                    let prog = progress_ref.clone();
2239                    let limiter = fetch_limiter.clone();
2240                    handle.block_on(async {
2241                        // Use rebucketed_unordered so the in-flight cap
2242                        // is re-read from the limiter as each slot frees.
2243                        // `buffer_unordered` snapshots the cap once at
2244                        // pipeline build, which means observe_op
2245                        // signals from inside chunk_get cannot reduce
2246                        // concurrency on the current batch — exactly
2247                        // the case where load-shedding is needed.
2248                        let mut results = rebucketed_unordered(
2249                            &limiter,
2250                            batch_owned,
2251                            |(idx, hash): (usize, XorName)| {
2252                                let counter = counter.clone();
2253                                let prog = prog.clone();
2254                                async move {
2255                                    let addr = hash.0;
2256                                    // chunk_get_observed feeds the
2257                                    // adaptive fetch limiter once per
2258                                    // call via chunk_get_outcome
2259                                    // (Ok(None) -> Timeout is the
2260                                    // load-shedding signal for
2261                                    // sustained close-group exhaustion).
2262                                    let chunk = self
2263                                        .chunk_get_observed(&addr)
2264                                        .await
2265                                        .map_err(|e| {
2266                                            self_encryption::Error::Generic(format!(
2267                                                "DataMap resolution failed: {e}"
2268                                            ))
2269                                        })?
2270                                        .ok_or_else(|| {
2271                                            self_encryption::Error::Generic(format!(
2272                                                "DataMap chunk not found: {}",
2273                                                hex::encode(addr)
2274                                            ))
2275                                        })?;
2276                                    let fetched = counter
2277                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
2278                                        + 1;
2279                                    if let Some(ref tx) = prog {
2280                                        let _ =
2281                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
2282                                    }
2283                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
2284                                }
2285                            },
2286                        )
2287                        .await?;
2288                        // CRITICAL: self_encryption::get_root_data_map_parallel
2289                        // pairs the returned Vec POSITIONALLY with the input
2290                        // hashes via .zip() and discards our idx field.
2291                        // rebucketed_unordered preserves first-completion
2292                        // order, so sort by idx to restore input order
2293                        // before returning.
2294                        results.sort_by_key(|(idx, _)| *idx);
2295                        Ok(results)
2296                    })
2297                };
2298                get_root_data_map_parallel(data_map.clone(), &fetch)
2299            })
2300            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
2301
2302            info!(
2303                "Resolved hierarchical DataMap: {} data chunks",
2304                resolved.len()
2305            );
2306            resolved
2307        } else {
2308            data_map.clone()
2309        };
2310
2311        // Phase 2: Now we know the real chunk count.
2312        let total_chunks = root_map.len();
2313        if let Some(ref tx) = progress {
2314            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
2315        }
2316
2317        // Phase 3: Fetch and decrypt data chunks with accurate progress.
2318        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2319        let fetched_for_closure = fetched_counter.clone();
2320        let progress_for_closure = progress.clone();
2321
2322        let fetch_limiter_outer = self.controller().fetch.clone();
2323        let usable_memory = usable_memory_bytes();
2324        let configured_batch_floor = stream_decrypt_batch_size();
2325        let fetch_cap = fetch_limiter_outer.current();
2326        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
2327            total_chunks,
2328            fetch_cap,
2329            configured_batch_floor,
2330            usable_memory,
2331        );
2332        info!(
2333            total_chunks,
2334            fetch_cap,
2335            configured_batch_floor,
2336            ?usable_memory,
2337            decrypt_batch_size,
2338            "Selected adaptive stream decrypt batch size"
2339        );
2340
2341        let stream = streaming_decrypt_with_batch_size(
2342            &root_map,
2343            |batch: &[(usize, XorName)]| {
2344                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2345                let fetched_ref = fetched_for_closure.clone();
2346                let progress_ref = progress_for_closure.clone();
2347                let fetch_limiter = fetch_limiter_outer.clone();
2348
2349                tokio::task::block_in_place(|| {
2350                    handle.block_on(async {
2351                        // First pass: try every chunk in the batch via
2352                        // chunk_get_observed (which already does its own
2353                        // first-attempt + retry sweep). A chunk that
2354                        // returns Ok(None) here is NOT a fatal failure
2355                        // — it's a candidate for a deferred retry below.
2356                        // We carry the chunk's XorName through so the
2357                        // retry pass can re-fetch by address.
2358                        //
2359                        // The closure ONLY returns Err on a true
2360                        // protocol/network error from chunk_get (the
2361                        // Err variant). Ok(None) is encoded as
2362                        // `Err(addr)` in the inner Result so the outer
2363                        // rebucketed pass doesn't early-abort on it.
2364                        type BatchEntry =
2365                            (usize, std::result::Result<bytes::Bytes, XorName>);
2366                        let raw: Vec<BatchEntry> = rebucketed_unordered(
2367                            &fetch_limiter,
2368                            batch_owned,
2369                            |(idx, hash): (usize, XorName)| {
2370                                let fetched_ref = fetched_ref.clone();
2371                                let progress_ref = progress_ref.clone();
2372                                async move {
2373                                    let addr = hash.0;
2374                                    let addr_hex = hex::encode(addr);
2375                                    match self.chunk_get_observed(&addr).await {
2376                                        Ok(Some(chunk)) => {
2377                                            let fetched = fetched_ref.fetch_add(
2378                                                1,
2379                                                std::sync::atomic::Ordering::Relaxed,
2380                                            ) + 1;
2381                                            info!("Downloaded {fetched}/{total_chunks}");
2382                                            if let Some(ref tx) = progress_ref {
2383                                                let _ = tx.try_send(
2384                                                    DownloadEvent::ChunksFetched {
2385                                                        fetched,
2386                                                        total: total_chunks,
2387                                                    },
2388                                                );
2389                                            }
2390                                            Ok::<BatchEntry, self_encryption::Error>((
2391                                                idx,
2392                                                Ok(chunk.content),
2393                                            ))
2394                                        }
2395                                        // chunk_get returned Ok(None): defer
2396                                        // this chunk for a later retry rather
2397                                        // than aborting the whole batch.
2398                                        Ok(None) => Ok((idx, Err(hash))),
2399                                        // A transient error for one chunk
2400                                        // (e.g. its close-group DHT walk
2401                                        // erroring on this pass) must not
2402                                        // abort a multi-hundred-chunk
2403                                        // download. Defer it to the retry
2404                                        // rounds, same as Ok(None); only a
2405                                        // chunk that survives all deferred
2406                                        // rounds is fatal.
2407                                        Err(e) => {
2408                                            info!(
2409                                                "First-pass fetch error for {addr_hex}: {e}; deferring"
2410                                            );
2411                                            Ok((idx, Err(hash)))
2412                                        }
2413                                    }
2414                                }
2415                            },
2416                        )
2417                        .await?;
2418
2419                        // Partition: things we already have vs the
2420                        // deferred set we need to retry.
2421                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
2422                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
2423                        for (idx, inner) in raw {
2424                            match inner {
2425                                Ok(bytes) => results.push((idx, bytes)),
2426                                Err(hash) => deferred.push((idx, hash)),
2427                            }
2428                        }
2429
2430                        // Deferred retry pass: retry the deferred chunks
2431                        // in CONCURRENT rounds (reusing the fetch
2432                        // limiter's cap), not serially. The first round
2433                        // fires immediately — most deferrals on a
2434                        // healthy-but-lossy link are peer-side noise
2435                        // that clears in well under a second, and
2436                        // serializing them behind mandatory multi-second
2437                        // sleeps was the single biggest throughput sink
2438                        // on such links (a batch deferring ~20 chunks
2439                        // burned minutes of near-zero throughput even
2440                        // though every chunk succeeded on its first
2441                        // retry). Only chunks that survive a round get a
2442                        // longer back-off before the next, so genuine
2443                        // saturation still gets time to settle.
2444                        if !deferred.is_empty() {
2445                            // Round delays in seconds. Round 0 is
2446                            // immediate; later rounds back off to ride
2447                            // out sustained saturation.
2448                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
2449                            info!(
2450                                "Deferring {} chunk(s) for concurrent retry after batch settles",
2451                                deferred.len()
2452                            );
2453                            let mut remaining = deferred;
2454                            for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
2455                                .iter()
2456                                .enumerate()
2457                            {
2458                                if remaining.is_empty() {
2459                                    break;
2460                                }
2461                                if delay_secs > 0 {
2462                                    tokio::time::sleep(std::time::Duration::from_secs(
2463                                        delay_secs,
2464                                    ))
2465                                    .await;
2466                                }
2467                                info!(
2468                                    "Deferred retry round {}/{}: {} chunk(s)",
2469                                    round + 1,
2470                                    DEFERRED_ROUND_DELAYS_SECS.len(),
2471                                    remaining.len(),
2472                                );
2473                                let round_input = std::mem::take(&mut remaining);
2474                                let round_results: Vec<BatchEntry> = rebucketed_unordered(
2475                                    &fetch_limiter,
2476                                    round_input,
2477                                    |(idx, hash): (usize, XorName)| {
2478                                        let fetched_ref = fetched_ref.clone();
2479                                        let progress_ref = progress_ref.clone();
2480                                        async move {
2481                                            let addr = hash.0;
2482                                            // Both Ok(None) and a transient
2483                                            // Err re-defer the chunk to the
2484                                            // next round rather than
2485                                            // aborting; only the final
2486                                            // round's leftovers are fatal.
2487                                            match self.chunk_get_observed(&addr).await {
2488                                                Ok(Some(chunk)) => {
2489                                                    let fetched = fetched_ref.fetch_add(
2490                                                        1,
2491                                                        std::sync::atomic::Ordering::Relaxed,
2492                                                    ) + 1;
2493                                                    info!(
2494                                                        "Downloaded {fetched}/{total_chunks} (deferred retry)"
2495                                                    );
2496                                                    if let Some(ref tx) = progress_ref {
2497                                                        let _ = tx.try_send(
2498                                                            DownloadEvent::ChunksFetched {
2499                                                                fetched,
2500                                                                total: total_chunks,
2501                                                            },
2502                                                        );
2503                                                    }
2504                                                    Ok::<BatchEntry, self_encryption::Error>((
2505                                                        idx,
2506                                                        Ok(chunk.content),
2507                                                    ))
2508                                                }
2509                                                Ok(None) => Ok((idx, Err(hash))),
2510                                                Err(e) => {
2511                                                    info!(
2512                                                        "Deferred retry for {} hit transient error: {e}; re-deferring",
2513                                                        hex::encode(addr)
2514                                                    );
2515                                                    Ok((idx, Err(hash)))
2516                                                }
2517                                            }
2518                                        }
2519                                    },
2520                                )
2521                                .await?;
2522                                for (idx, inner) in round_results {
2523                                    match inner {
2524                                        Ok(bytes) => results.push((idx, bytes)),
2525                                        Err(hash) => remaining.push((idx, hash)),
2526                                    }
2527                                }
2528                            }
2529                            if let Some((_, hash)) = remaining.first() {
2530                                return Err(self_encryption::Error::Generic(format!(
2531                                    "Chunk not found after {} deferred retry rounds: {}",
2532                                    DEFERRED_ROUND_DELAYS_SECS.len(),
2533                                    hex::encode(hash.0),
2534                                )));
2535                            }
2536                        }
2537
2538                        // streaming_decrypt itself sort_by_keys before
2539                        // zipping, but the same closure is also passed
2540                        // through get_root_data_map_parallel internally
2541                        // (see self_encryption::stream_decrypt.rs::new), and
2542                        // THAT path zips positionally without sorting. Sort
2543                        // here so both consumers see input order.
2544                        results.sort_by_key(|(idx, _)| *idx);
2545                        Ok(results)
2546                    })
2547                })
2548            },
2549            decrypt_batch_size,
2550        )
2551        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
2552
2553        // Write decrypted chunks to a temp file, then rename atomically.
2554        let parent = output.parent().unwrap_or_else(|| Path::new("."));
2555        let unique: u64 = rand::random();
2556        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
2557
2558        let write_result = (|| -> Result<u64> {
2559            let mut file = std::fs::File::create(&tmp_path)?;
2560            let mut bytes_written = 0u64;
2561            for chunk_result in stream {
2562                let chunk_bytes = chunk_result
2563                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
2564                file.write_all(&chunk_bytes)?;
2565                bytes_written += chunk_bytes.len() as u64;
2566            }
2567            file.flush()?;
2568            Ok(bytes_written)
2569        })();
2570
2571        match write_result {
2572            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
2573                Ok(()) => {
2574                    info!(
2575                        "File downloaded: {bytes_written} bytes written to {}",
2576                        output.display()
2577                    );
2578                    Ok(bytes_written)
2579                }
2580                Err(rename_err) => {
2581                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2582                        warn!(
2583                            "Failed to remove temp download file {}: {cleanup_err}",
2584                            tmp_path.display()
2585                        );
2586                    }
2587                    Err(rename_err.into())
2588                }
2589            },
2590            Err(e) => {
2591                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2592                    warn!(
2593                        "Failed to remove temp download file {}: {cleanup_err}",
2594                        tmp_path.display()
2595                    );
2596                }
2597                Err(e)
2598            }
2599        }
2600    }
2601}
2602
2603#[cfg(test)]
2604#[allow(clippy::unwrap_used)]
2605mod tests {
2606    use super::*;
2607
2608    #[test]
2609    fn disk_space_check_passes_for_small_file() {
2610        // A 1 KB file should always pass the disk space check
2611        check_disk_space_for_spill(1024).unwrap();
2612    }
2613
2614    #[test]
2615    fn disk_space_check_fails_for_absurd_size() {
2616        // Requesting space for a 1 exabyte file should fail on any real system
2617        let result = check_disk_space_for_spill(u64::MAX / 2);
2618        assert!(result.is_err());
2619        let err = result.unwrap_err();
2620        assert!(
2621            matches!(err, Error::InsufficientDiskSpace(_)),
2622            "expected InsufficientDiskSpace, got: {err}"
2623        );
2624    }
2625
2626    #[test]
2627    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
2628        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
2629
2630        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
2631    }
2632
2633    #[test]
2634    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
2635        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
2636
2637        assert_eq!(batch_size, 12);
2638    }
2639
2640    #[test]
2641    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
2642        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
2643
2644        assert_eq!(batch_size, 32);
2645    }
2646
2647    #[test]
2648    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
2649        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
2650
2651        assert_eq!(batch_size, 10);
2652    }
2653
2654    #[test]
2655    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
2656        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
2657            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
2658            .max(1);
2659        let usable_memory = estimated_bytes_per_chunk
2660            .saturating_mul(16)
2661            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
2662        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
2663
2664        assert_eq!(batch_size, 16);
2665    }
2666
2667    #[test]
2668    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
2669        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
2670
2671        assert_eq!(batch_size, 1);
2672    }
2673
2674    #[test]
2675    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
2676        let covered = compute_address(&Bytes::from_static(b"covered"));
2677        let extra = compute_address(&Bytes::from_static(b"extra"));
2678        let missing = compute_address(&Bytes::from_static(b"missing"));
2679        let cached = MerkleBatchPaymentResult {
2680            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
2681            chunk_count: 2,
2682            storage_cost_atto: "0".to_string(),
2683            gas_cost_wei: 0,
2684            merkle_payment_timestamp: 0,
2685        };
2686
2687        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
2688        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
2689        assert!(!cached_merkle_covers_addresses(
2690            &cached,
2691            &[covered, missing]
2692        ));
2693    }
2694
2695    /// A partial merkle payment leaves some addresses without a proof. Those
2696    /// must be split out so `upload_waves_merkle` reports them as failed
2697    /// (`PartialUpload`) instead of aborting the whole file — preserving the
2698    /// addresses' original order in each group.
2699    #[test]
2700    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
2701        let paid_a = [1u8; 32];
2702        let unpaid_b = [2u8; 32];
2703        let paid_c = [3u8; 32];
2704        let unpaid_d = [4u8; 32];
2705        let proofs: HashMap<[u8; 32], Vec<u8>> =
2706            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
2707
2708        let (to_store, missing) =
2709            partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);
2710
2711        assert_eq!(to_store, vec![paid_a, paid_c]);
2712        assert_eq!(missing, vec![unpaid_b, unpaid_d]);
2713    }
2714
2715    #[test]
2716    fn partition_addresses_by_proof_handles_all_or_nothing() {
2717        let a = [5u8; 32];
2718        let b = [6u8; 32];
2719
2720        // No proofs at all → every address is missing.
2721        let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
2722        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
2723        assert!(to_store.is_empty());
2724        assert_eq!(missing, vec![a, b]);
2725
2726        // All proofs present → nothing missing.
2727        let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
2728        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
2729        assert_eq!(to_store, vec![a, b]);
2730        assert!(missing.is_empty());
2731    }
2732
2733    #[test]
2734    fn chunk_spill_round_trip() {
2735        let mut spill = ChunkSpill::new().unwrap();
2736        let data1 = vec![0xAA; 1024];
2737        let data2 = vec![0xBB; 2048];
2738
2739        spill.push(&data1).unwrap();
2740        spill.push(&data2).unwrap();
2741
2742        assert_eq!(spill.len(), 2);
2743        assert_eq!(spill.total_bytes(), 1024 + 2048);
2744        let chunk_entries = spill.chunk_entries().unwrap();
2745        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
2746        assert_eq!(entry_total, 1024 + 2048);
2747
2748        // Read back and verify
2749        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
2750        assert_eq!(&chunk1[..], &data1[..]);
2751
2752        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
2753        assert_eq!(&chunk2[..], &data2[..]);
2754
2755        // Verify waves with 1-chunk wave size
2756        let waves: Vec<_> = spill.addresses.chunks(1).collect();
2757        assert_eq!(waves.len(), 2);
2758    }
2759
2760    #[test]
2761    fn chunk_spill_cleanup_on_drop() {
2762        let dir;
2763        {
2764            let spill = ChunkSpill::new().unwrap();
2765            dir = spill.dir.clone();
2766            assert!(dir.exists());
2767        }
2768        // After drop, the directory should be cleaned up
2769        assert!(!dir.exists(), "spill dir should be removed on drop");
2770    }
2771
2772    #[test]
2773    fn chunk_spill_deduplicates_identical_content() {
2774        let mut spill = ChunkSpill::new().unwrap();
2775        let data = vec![0xCC; 512];
2776
2777        spill.push(&data).unwrap();
2778        spill.push(&data).unwrap(); // same content, should be skipped
2779        spill.push(&data).unwrap(); // again
2780
2781        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
2782        assert_eq!(
2783            spill.total_bytes(),
2784            512,
2785            "total_bytes should count unique only"
2786        );
2787
2788        // Different content should still be added
2789        let data2 = vec![0xDD; 256];
2790        spill.push(&data2).unwrap();
2791        assert_eq!(spill.len(), 2);
2792        assert_eq!(spill.total_bytes(), 512 + 256);
2793    }
2794}
2795
/// Compile-time assertions that the futures returned by `Client` file methods
/// are `Send`.
///
/// None of these functions is ever called. They exist only so that the build
/// fails if a change makes one of the futures `!Send` (for example, by holding
/// a non-`Send` value across an `.await`). That would stop the futures from
/// being used where a `Send` future is required, such as `tokio::spawn`.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Type-checks only when `T: Send`; the argument itself is never inspected.
    fn _assert_send<T: Send>(_: &T) {}

    // `dead_code`: these functions exist only to be type-checked, never called.
    #[allow(dead_code)]
    async fn _file_upload_is_send(client: &Client) {
        let fut = client.file_upload(Path::new("/dev/null"));
        _assert_send(&fut);
    }

    // `dead_code`: type-checked only, never called.
    #[allow(dead_code)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let fut = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
        _assert_send(&fut);
    }

    // No real `DataMap` is available here, so `todo!()` stands in for one.
    // That placeholder diverges, which makes the code after it unreachable
    // and can cause its bindings to be reported as unused. The extra
    // allowances silence those lints for this function only.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        let fut = client.file_download(&dm, Path::new("/dev/null"));
        _assert_send(&fut);
    }
}