Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::chunk::ChunkPeerGetResult;
18use crate::data::client::classify_error;
19use crate::data::client::merkle::{
20    chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
21    merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
22    PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
23};
24use crate::data::client::Client;
25use crate::data::error::{Error, PartialUploadSpend, Result};
26use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
27use ant_protocol::transport::{MultiAddr, PeerId};
28use ant_protocol::{compute_address, XorName as ChunkAddress, DATA_TYPE_CHUNK};
29use bytes::Bytes;
30use fs2::FileExt;
31use futures::stream::{self, StreamExt};
32use self_encryption::{
33    get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
34    streaming_decrypt_with_batch_size, DataMap,
35};
36use std::collections::{HashMap, HashSet};
37use std::io::Write;
38use std::num::NonZeroUsize;
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex};
41use tokio::runtime::Handle;
42use tokio::sync::mpsc;
43use tracing::{debug, info, warn};
44use xor_name::XorName;
45
/// Progress events emitted during file upload for UI feedback.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting {
        /// Number of chunks encrypted so far.
        chunks_done: usize,
    },
    /// File encryption complete.
    Encrypted {
        /// Number of chunks the file encrypted into.
        total_chunks: usize,
    },
    /// Starting quote collection for a wave.
    QuotingChunks {
        /// The wave about to be quoted.
        wave: usize,
        /// Number of waves in this upload.
        total_waves: usize,
        /// Number of chunks in this wave.
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted {
        /// Number of chunks quoted so far.
        quoted: usize,
        /// Number of chunks to be quoted.
        total: usize,
    },
    /// A chunk has been stored on the network.
    ChunkStored {
        /// Number of chunks stored so far.
        stored: usize,
        /// Number of chunks to be stored.
        total: usize,
    },
    /// A wave has completed.
    WaveComplete {
        /// The wave that just finished.
        wave: usize,
        /// Number of waves in this upload.
        total_waves: usize,
        /// Chunks stored across all waves finished so far.
        stored_so_far: usize,
        /// Number of chunks in the whole upload.
        total: usize,
    },
}
72
/// Progress events emitted during file download for UI feedback.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap {
        /// Number of DataMap chunks that must be fetched to resolve the map.
        total_map_chunks: usize,
    },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched {
        /// Number of DataMap chunks fetched so far.
        fetched: usize,
    },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved {
        /// Number of data chunks referenced by the resolved DataMap.
        total_chunks: usize,
    },
    /// Data chunks are being fetched from the network.
    ChunksFetched {
        /// Number of data chunks fetched so far.
        fetched: usize,
        /// Number of data chunks to fetch.
        total: usize,
    },
}
85
/// File download result when peer-health diagnostics are enabled.
///
/// Pairs the normal download outcome (bytes written) with the per-chunk
/// closest-peer GET diagnostics gathered while downloading.
#[derive(Debug, Clone)]
pub struct FileDownloadWithPeerReport {
    /// Number of plaintext bytes written to the destination.
    pub bytes_written: u64,
    /// Per-file-chunk closest-peer GET results collected during the actual download.
    pub chunk_reports: Vec<FileChunkPeerReport>,
}
94
/// Closest-peer GET results for one file chunk.
///
/// Groups every diagnostic sweep made for the chunk, ordered by attempt.
#[derive(Debug, Clone)]
pub struct FileChunkPeerReport {
    /// 1-based chunk index in the resolved file DataMap.
    pub index: usize,
    /// Chunk address.
    pub address: ChunkAddress,
    /// All diagnostic GET sweeps attempted for this chunk, in attempt order.
    pub sweeps: Vec<FileChunkPeerSweepReport>,
}
105
/// One all-peer diagnostic GET sweep for a file chunk.
///
/// Either `error` is set (the sweep itself failed and `peers` is empty) or
/// `peers` holds one entry per queried peer.
#[derive(Debug, Clone)]
pub struct FileChunkPeerSweepReport {
    /// 1-based attempt number for this chunk.
    pub attempt: usize,
    /// Whether this sweep happened during a deferred retry round.
    pub deferred_retry: bool,
    /// DHT lookup / sweep-level error, if the closest-peer group could not be queried.
    pub error: Option<String>,
    /// Per-peer results, sorted closest first.
    pub peers: Vec<FileChunkPeerReportPeer>,
}
118
/// One peer result within a [`FileChunkPeerSweepReport`].
#[derive(Debug, Clone)]
pub struct FileChunkPeerReportPeer {
    /// Peer queried for the chunk.
    pub peer_id: PeerId,
    /// Known network addresses used for the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address.
    pub xor_distance: ChunkAddress,
    /// Whether this peer returned the chunk or why it did not.
    pub status: FileChunkPeerStatus,
}
131
/// Peer-level file chunk GET diagnostic status.
///
/// Derived from a single peer's GET result: a returned chunk, an
/// authoritative "not stored", or the error category that prevented a reply.
#[derive(Debug, Clone)]
pub enum FileChunkPeerStatus {
    /// The peer returned the chunk.
    Found {
        /// Size of the returned chunk content, in bytes.
        bytes: usize,
    },
    /// The peer responded authoritatively that it does not store the chunk.
    NotFound,
    /// The peer did not respond before the timeout.
    Timeout {
        /// Timeout error detail.
        message: String,
    },
    /// The transport/network path to the peer failed.
    NetworkError {
        /// Network error detail.
        message: String,
    },
    /// Any other per-peer error.
    Error {
        /// Rendered error text.
        message: String,
    },
}
146
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// One item of a download batch: a position paired with either the fetched
/// chunk bytes or an `XorName`.
///
/// NOTE(review): the `usize` is presumably the chunk's position within the
/// decrypt batch, and the `Err` `XorName` the address that could not be
/// fetched — confirm at the use site.
type DownloadBatchEntry = (usize, std::result::Result<Bytes, XorName>);
153
/// A diagnostic sweep tagged with the chunk it belongs to.
///
/// Sweeps are recorded flat while chunks download concurrently, then grouped
/// per chunk into [`FileChunkPeerReport`]s by
/// `file_chunk_reports_from_recorded_sweeps`.
#[derive(Debug, Clone)]
struct RecordedFileChunkPeerSweep {
    /// 1-based chunk index in the resolved file DataMap.
    index: usize,
    /// Address of the chunk the sweep targeted.
    address: ChunkAddress,
    /// The sweep result itself.
    sweep: FileChunkPeerSweepReport,
}
160
/// Cloneable state shared by the chunk fetches of one file download.
///
/// The `Arc` fields are shared between clones, so progress counters and
/// recorded sweeps accumulate across every fetch task holding a clone.
#[derive(Clone)]
struct FileDownloadFetchContext {
    /// Number of data chunks being downloaded.
    total_chunks: usize,
    /// NOTE(review): presumably the number of peers queried per chunk — confirm.
    peer_count: usize,
    /// Shared count of chunks fetched so far.
    fetched_ref: Arc<std::sync::atomic::AtomicUsize>,
    /// Optional channel for [`DownloadEvent`] progress updates.
    progress_ref: Option<mpsc::Sender<DownloadEvent>>,
    /// Collected diagnostic sweeps; `None` when peer diagnostics are off
    /// (presumably only `Some` for a [`FileDownloadWithPeerReport`] download).
    peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
}
169
/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
///
/// Spilled chunks are read back one wave at a time, so this also bounds the
/// number of encrypted chunks held in memory during upload.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream decrypt batches should be larger than fetch fan-out so
/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
/// ones complete, instead of stopping at each self-encryption batch boundary.
///
/// The target batch size is `fetch_cap × DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER`.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Use at most this fraction of currently usable RAM for one decrypt batch
/// (the budget is `usable_memory / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR`).
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone: each chunk is budgeted at
/// `MAX_CHUNK_SIZE × DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER` bytes.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// First diagnostic all-peer fetch attempt for a file chunk (attempts are 1-based).
const FIRST_DIAGNOSTIC_FETCH_ATTEMPT: usize = 1;

/// Deferred retry attempt number for retry round 0.
///
/// NOTE(review): presumably a deferred round's attempt number is
/// `round + DEFERRED_RETRY_ATTEMPT_OFFSET`, so deferred attempts follow the
/// first attempt — confirm at the use site.
const DEFERRED_RETRY_ATTEMPT_OFFSET: usize = 2;
199
/// Choose at most `cap` chunk indices from `[0, total)`, evenly spaced, with
/// both the first and the last chunk included.
///
/// Probing only the leading chunks is biased. A file that shares a prefix
/// with an earlier upload (e.g. similar headers or archive preambles) reports
/// those chunks as `AlreadyStored` even when the rest is new. Spreading the
/// probe across the file means any single new chunk yields a real price.
///
/// Edge cases:
/// - `total == 0` yields an empty list.
/// - `total == 1` or `cap <= 1` yields `[0]`.
/// - `total <= cap` yields every index, which lets
///   [`Client::estimate_upload_cost`] recognise a fully sampled file.
///
/// The result is strictly increasing.
fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
    if total == 0 {
        return Vec::new();
    }

    let picks = cap.min(total);
    if picks < 2 {
        return vec![0];
    }

    let last_index = total - 1;
    let gaps = picks - 1;
    let mut chosen: Vec<usize> = Vec::with_capacity(picks);
    for step in 0..picks {
        let candidate = step * last_index / gaps;
        // Skip consecutive repeats (cannot happen when picks <= total, kept defensively).
        if chosen.last() != Some(&candidate) {
            chosen.push(candidate);
        }
    }
    chosen
}
226
227fn file_chunk_sweep_report_from_peer_results(
228    attempt: usize,
229    deferred_retry: bool,
230    results: &[ChunkPeerGetResult],
231) -> (Option<Bytes>, FileChunkPeerSweepReport) {
232    let mut content = None;
233    let peers = results
234        .iter()
235        .map(|result| {
236            if content.is_none() {
237                if let Ok(Some(chunk)) = &result.chunk_result {
238                    content = Some(chunk.content.clone());
239                }
240            }
241
242            FileChunkPeerReportPeer {
243                peer_id: result.peer_id,
244                peer_addrs: result.peer_addrs.clone(),
245                xor_distance: result.xor_distance,
246                status: file_chunk_peer_status(&result.chunk_result),
247            }
248        })
249        .collect();
250
251    (
252        content,
253        FileChunkPeerSweepReport {
254            attempt,
255            deferred_retry,
256            error: None,
257            peers,
258        },
259    )
260}
261
262fn file_chunk_sweep_report_from_error(
263    attempt: usize,
264    deferred_retry: bool,
265    error: &Error,
266) -> FileChunkPeerSweepReport {
267    FileChunkPeerSweepReport {
268        attempt,
269        deferred_retry,
270        error: Some(error.to_string()),
271        peers: Vec::new(),
272    }
273}
274
275fn file_chunk_reports_from_recorded_sweeps(
276    mut sweeps: Vec<RecordedFileChunkPeerSweep>,
277) -> Vec<FileChunkPeerReport> {
278    sweeps.sort_by_key(|record| (record.index, record.sweep.attempt));
279
280    let mut reports: Vec<FileChunkPeerReport> = Vec::new();
281    for record in sweeps {
282        if let Some(report) = reports
283            .last_mut()
284            .filter(|report| report.index == record.index)
285        {
286            report.sweeps.push(record.sweep);
287            continue;
288        }
289
290        reports.push(FileChunkPeerReport {
291            index: record.index,
292            address: record.address,
293            sweeps: vec![record.sweep],
294        });
295    }
296
297    reports
298}
299
300fn file_chunk_peer_status(
301    chunk_result: &std::result::Result<Option<ant_protocol::DataChunk>, Error>,
302) -> FileChunkPeerStatus {
303    match chunk_result {
304        Ok(Some(chunk)) => FileChunkPeerStatus::Found {
305            bytes: chunk.content.len(),
306        },
307        Ok(None) => FileChunkPeerStatus::NotFound,
308        Err(Error::Timeout(e)) => FileChunkPeerStatus::Timeout { message: e.clone() },
309        Err(Error::Network(e)) => FileChunkPeerStatus::NetworkError { message: e.clone() },
310        Err(e) => FileChunkPeerStatus::Error {
311            message: e.to_string(),
312        },
313    }
314}
315
/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
///
/// NOTE(review): if `batch_pay` emits one entry per close-group quote rather
/// than one per chunk, a full wave carries more than `UPLOAD_WAVE_SIZE`
/// entries and this bound may be low — confirm against `batch.rs`.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check (`10` means 10%).
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
347
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The directory is named `spill_<timestamp>_<random>` and contains a
/// `.lock` file that stays locked for the lifetime of the value, so stale
/// cleanup from other runs never reaps an active spill.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses, in first-spilled order.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
386
387impl ChunkSpill {
388    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
389    fn spill_root() -> Result<PathBuf> {
390        use crate::config;
391        let root = config::data_dir()
392            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
393            .join("spill");
394        Ok(root)
395    }
396
397    /// Create a new spill directory under `<data_dir>/spill/`.
398    ///
399    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
400    /// identified by prefix and cleaned up by age. A lockfile inside the
401    /// dir prevents concurrent cleanup from deleting an active spill.
402    fn new() -> Result<Self> {
403        let root = Self::spill_root()?;
404        std::fs::create_dir_all(&root)?;
405
406        // Clean up stale spill dirs from previous crashed runs.
407        Self::cleanup_stale(&root);
408
409        let now = std::time::SystemTime::now()
410            .duration_since(std::time::UNIX_EPOCH)
411            .unwrap_or_default()
412            .as_secs();
413        let unique: u64 = rand::random();
414        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
415        std::fs::create_dir(&dir)?;
416
417        // Create and hold a lockfile for the lifetime of this spill.
418        // cleanup_stale() will skip dirs with locked files.
419        let lock_path = dir.join(SPILL_LOCK_NAME);
420        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
421            Error::Io(std::io::Error::new(
422                e.kind(),
423                format!("failed to create spill lockfile: {e}"),
424            ))
425        })?;
426        lock_file.try_lock_exclusive().map_err(|e| {
427            Error::Io(std::io::Error::new(
428                e.kind(),
429                format!("failed to lock spill lockfile: {e}"),
430            ))
431        })?;
432
433        Ok(Self {
434            dir,
435            _lock: lock_file,
436            addresses: Vec::new(),
437            seen: HashSet::new(),
438            sizes: HashMap::new(),
439            total_bytes: 0,
440        })
441    }
442
443    /// Clean up stale spill directories. Best-effort, errors are logged.
444    ///
445    /// A spill dir is reaped when:
446    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
447    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
448    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
449    /// 4. Its lockfile is releasable — i.e. no live process holds it
450    ///
451    /// The lockfile is the primary correctness gate: a releasable lock means
452    /// the owning `ChunkSpill` has been dropped or the process is gone, so
453    /// the dir is fair game. The grace period covers only the brief window
454    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
455    ///
456    /// Safe to call concurrently from multiple processes.
457    fn cleanup_stale(root: &Path) {
458        let now = std::time::SystemTime::now()
459            .duration_since(std::time::UNIX_EPOCH)
460            .unwrap_or_default()
461            .as_secs();
462
463        if now == 0 {
464            // Clock is broken (before Unix epoch). Skip cleanup to avoid
465            // misidentifying dirs as stale.
466            warn!("System clock before Unix epoch, skipping spill cleanup");
467            return;
468        }
469
470        let entries = match std::fs::read_dir(root) {
471            Ok(entries) => entries,
472            Err(_) => return,
473        };
474
475        for entry in entries.flatten() {
476            let name = entry.file_name();
477            let name_str = name.to_string_lossy();
478
479            // Only process dirs with our prefix.
480            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
481                Some(s) => s,
482                None => continue,
483            };
484
485            // Parse timestamp: "spill_<timestamp>_<random>"
486            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
487                Some(ts) => ts,
488                None => continue,
489            };
490
491            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
492                continue;
493            }
494
495            // Safety: only delete actual directories, not symlinks.
496            let file_type = match entry.file_type() {
497                Ok(ft) => ft,
498                Err(_) => continue,
499            };
500            if !file_type.is_dir() {
501                continue;
502            }
503
504            let path = entry.path();
505
506            // Check lockfile: if locked, the dir is in active use -- skip it.
507            let lock_path = path.join(SPILL_LOCK_NAME);
508            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
509                use fs2::FileExt;
510                if lock_file.try_lock_exclusive().is_err() {
511                    // Lock held by another process -- dir is active.
512                    debug!("Skipping active spill dir: {}", path.display());
513                    continue;
514                }
515                // We acquired the lock, so no one else holds it.
516                // Drop it before deleting.
517                drop(lock_file);
518            }
519
520            info!("Cleaning up stale spill dir: {}", path.display());
521            if let Err(e) = std::fs::remove_dir_all(&path) {
522                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
523            }
524        }
525    }
526
527    /// Run stale spill cleanup. Call at client startup or periodically.
528    #[allow(dead_code)]
529    pub(crate) fn run_cleanup() {
530        if let Ok(root) = Self::spill_root() {
531            Self::cleanup_stale(&root);
532        }
533    }
534
535    /// Write one encrypted chunk to disk and record its address.
536    ///
537    /// Deduplicates by content address: if the same chunk was already
538    /// spilled, the write and accounting are skipped. This prevents
539    /// double-uploads and inflated quoting metrics.
540    fn push(&mut self, content: &[u8]) -> Result<()> {
541        let address = compute_address(content);
542        if !self.seen.insert(address) {
543            return Ok(());
544        }
545        let path = self.dir.join(hex::encode(address));
546        std::fs::write(&path, content)?;
547        let content_len = content.len() as u64;
548        self.sizes.insert(address, content_len);
549        self.total_bytes += content_len;
550        self.addresses.push(address);
551        Ok(())
552    }
553
554    /// Number of chunks stored.
555    fn len(&self) -> usize {
556        self.addresses.len()
557    }
558
559    /// Total bytes of all spilled chunks.
560    fn total_bytes(&self) -> u64 {
561        self.total_bytes
562    }
563
564    /// Address and byte-size pairs for all spilled chunks.
565    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
566        self.addresses
567            .iter()
568            .map(|address| {
569                self.sizes
570                    .get(address)
571                    .copied()
572                    .map(|size| (*address, size))
573                    .ok_or_else(|| {
574                        Error::Storage(format!(
575                            "missing size for spilled chunk {}",
576                            hex::encode(address)
577                        ))
578                    })
579            })
580            .collect()
581    }
582
583    /// Read a single chunk back from disk by address.
584    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
585        let path = self.dir.join(hex::encode(address));
586        let data = std::fs::read(&path).map_err(|e| {
587            Error::Io(std::io::Error::new(
588                e.kind(),
589                format!("reading spilled chunk {}: {e}", hex::encode(address)),
590            ))
591        })?;
592        Ok(Bytes::from(data))
593    }
594
595    /// Read a wave of chunks from disk.
596    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
597        let mut out = Vec::with_capacity(wave_addrs.len());
598        for addr in wave_addrs {
599            let content = self.read_chunk(addr)?;
600            out.push((content, *addr));
601        }
602        Ok(out)
603    }
604
605    /// Clean up the spill directory.
606    fn cleanup(&self) {
607        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
608            warn!(
609                "Failed to clean up chunk spill dir {}: {e}",
610                self.dir.display()
611            );
612        }
613    }
614}
615
616impl Drop for ChunkSpill {
617    fn drop(&mut self) {
618        self.cleanup();
619    }
620}
621
622fn cached_merkle_covers_addresses(
623    cached: &MerkleBatchPaymentResult,
624    addresses: &[[u8; 32]],
625) -> bool {
626    addresses
627        .iter()
628        .all(|addr| cached.proofs.contains_key(addr))
629}
630
/// Split `addresses` into `(to_store, missing_proof)`.
///
/// `to_store` holds the addresses that have a merkle proof in `proofs`;
/// `missing_proof` holds the rest. Each group keeps the order of `addresses`.
///
/// A partial [`MerkleBatchPaymentResult`] can come from a
/// `pay_for_merkle_multi_batch` where a later sub-batch's payment failed. It
/// carries proofs only for the sub-batches already paid, so unpaid chunks
/// reach the upload path with no proof. `upload_waves_merkle` reports those
/// chunks as failed via [`Error::PartialUpload`] instead of aborting the
/// whole file.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut with_proof = Vec::new();
    let mut without_proof = Vec::new();
    for &addr in addresses {
        if proofs.contains_key(&addr) {
            with_proof.push(addr);
        } else {
            without_proof.push(addr);
        }
    }
    (with_proof, without_proof)
}
649
650/// Build a `PartialUpload` after a fatal merkle store error, with accurate
651/// counts.
652///
653/// A fatal abort can leave chunks in three states: confirmed stored (in
654/// `stored_addresses`), known-failed (in `known_failed` — missing proofs, the
655/// quorum shortfalls and the fatal chunk seen so far), and "in flight when the
656/// abort hit" (neither). Rather than trust the helpers to enumerate the last
657/// group, this derives the failed set authoritatively as *every* `addresses`
658/// entry not in `stored_addresses`, preferring a known per-chunk message and
659/// falling back to the fatal `reason`. That guarantees
660/// `stored_count + failed_count` accounts for the whole file — fixing the
661/// under-reporting where a fatal wave could surface `failed_count = 0` and omit
662/// same-pass successes.
663fn partial_upload_after_fatal(
664    addresses: &[[u8; 32]],
665    stored_addresses: Vec<[u8; 32]>,
666    stored_count: usize,
667    total_chunks: usize,
668    known_failed: Vec<([u8; 32], String)>,
669    spend: PartialUploadSpend,
670    reason: String,
671) -> Error {
672    let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
673    let mut failed_map: HashMap<[u8; 32], String> = HashMap::new();
674    for (addr, msg) in known_failed {
675        if !stored_set.contains(&addr) {
676            failed_map.entry(addr).or_insert(msg);
677        }
678    }
679    for addr in addresses {
680        if !stored_set.contains(addr) {
681            failed_map.entry(*addr).or_insert_with(|| reason.clone());
682        }
683    }
684    let failed: Vec<([u8; 32], String)> = failed_map.into_iter().collect();
685    let failed_count = failed.len();
686    Error::PartialUpload {
687        stored: stored_addresses,
688        stored_count,
689        failed,
690        failed_count,
691        total_chunks,
692        spend: Box::new(spend),
693        reason,
694    }
695}
696
/// One wave's contribution to a single-node upload, distilled from its
/// `batch_upload_chunks_with_events` result by `fold_single_wave`.
#[derive(Debug)]
struct SingleWaveOutcome {
    /// Addresses confirmed stored in this wave.
    stored: Vec<[u8; 32]>,
    /// Chunks that failed after retries in this wave, with their error text.
    failed: Vec<([u8; 32], String)>,
    /// Storage cost paid on-chain for this wave, in atto-tokens.
    storage_atto: Amount,
    /// Gas paid on-chain for this wave, in wei.
    gas_wei: u128,
    /// Per-wave store/retry statistics. Empty (default) for a quorum-short
    /// wave, whose `PartialUpload` carries no stats.
    stats: WaveAggregateStats,
}
713
714/// Fold one wave's batch-upload result for the single-node path.
715///
716/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**:
717/// its stored/failed chunks and on-chain spend are returned so the caller
718/// records them and continues to the next wave, making the file make maximum
719/// progress exactly like `upload_waves_merkle`. Every other error is **fatal**
720/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is
721/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE ==
722/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a
723/// `PartialUpload` leaves nothing un-attempted within the wave.
724fn fold_single_wave(
725    result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
726) -> Result<SingleWaveOutcome> {
727    match result {
728        Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
729            stored,
730            failed: Vec::new(),
731            storage_atto: storage.parse().unwrap_or(Amount::ZERO),
732            gas_wei: gas,
733            stats,
734        }),
735        Err(Error::PartialUpload {
736            stored,
737            failed,
738            spend,
739            ..
740        }) => Ok(SingleWaveOutcome {
741            stored,
742            failed,
743            storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
744            gas_wei: spend.gas_cost_wei,
745            stats: WaveAggregateStats::default(),
746        }),
747        Err(e) => Err(e),
748    }
749}
750
751/// Check that the spill directory has enough free space for the spilled chunks.
752///
753/// `file_size` is the source file's byte count. We require
754/// `file_size + 10%` free space to account for self-encryption overhead.
755fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
756    let spill_root = ChunkSpill::spill_root()?;
757
758    // Ensure the root exists so fs2 can query it.
759    std::fs::create_dir_all(&spill_root)?;
760
761    let available = fs2::available_space(&spill_root).map_err(|e| {
762        Error::Io(std::io::Error::new(
763            e.kind(),
764            format!(
765                "failed to query disk space on {}: {e}",
766                spill_root.display()
767            ),
768        ))
769    })?;
770
771    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
772    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
773    let required = file_size.saturating_add(headroom);
774
775    if available < required {
776        let avail_mb = available / (1024 * 1024);
777        let req_mb = required / (1024 * 1024);
778        return Err(Error::InsufficientDiskSpace(format!(
779            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
780            spill_root.display()
781        )));
782    }
783
784    debug!(
785        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
786        spill_root.display()
787    );
788    Ok(())
789}
790
791fn usable_memory_bytes() -> Option<u64> {
792    let mut system = sysinfo::System::new();
793    system.refresh_memory();
794
795    let available_memory = system.available_memory();
796    let free_memory = system.free_memory();
797    let used_memory = system.used_memory();
798    let total_memory = system.total_memory();
799    let unused_memory = total_memory.saturating_sub(used_memory);
800
801    let mut usable = [available_memory, free_memory, unused_memory]
802        .into_iter()
803        .filter(|bytes| *bytes > 0)
804        .max();
805
806    let cgroup_free_memory = system
807        .cgroup_limits()
808        .filter(|limits| limits.total_memory > 0)
809        .map(|limits| limits.free_memory);
810    if let Some(cgroup_free_memory) = cgroup_free_memory {
811        usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
812    }
813
814    debug!(
815        available_memory,
816        free_memory,
817        used_memory,
818        total_memory,
819        cgroup_free_memory,
820        usable_memory = ?usable,
821        "Detected usable memory for stream decrypt batch sizing"
822    );
823
824    usable
825}
826
827fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
828    let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
829    let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
830        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
831        .max(1);
832    let cap = (budget / estimated_bytes_per_chunk).max(1);
833
834    usize::try_from(cap).unwrap_or(usize::MAX)
835}
836
837fn adaptive_stream_decrypt_batch_size(
838    total_chunks: usize,
839    fetch_cap: usize,
840    configured_batch_floor: usize,
841    usable_memory_bytes: Option<u64>,
842) -> usize {
843    let fetch_target = fetch_cap
844        .max(1)
845        .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
846    let requested = match usable_memory_bytes {
847        Some(bytes) => {
848            let memory_cap = stream_decrypt_batch_memory_cap(bytes);
849            configured_batch_floor
850                .max(fetch_target)
851                .max(1)
852                .min(memory_cap)
853        }
854        None => configured_batch_floor.max(1),
855    };
856
857    requested.min(total_chunks.max(1)).max(1)
858}
859
/// Controls whether an upload also publishes its data map to the network.
///
/// With [`Visibility::Private`], only the data chunks are stored and the
/// `DataMap` is handed back to the caller. The file can then be reconstructed
/// only by whoever holds that `DataMap`. With [`Visibility::Public`], the
/// serialized `DataMap` is also stored as a network chunk. That produces one
/// chunk address from which anyone can fetch the `DataMap` (via
/// [`Client::data_map_fetch`]) and then the file itself.
///
/// Defaults to [`Visibility::Private`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Visibility {
    /// The data map stays with the uploader; nobody else can retrieve the file.
    #[default]
    Private,
    /// The data map is stored as a chunk; anyone with the returned address can
    /// retrieve and decrypt the file.
    Public,
}
876
877/// Confidence attached to an [`UploadCostEstimate`]'s `storage_cost_atto`.
878///
879/// `estimate_upload_cost` prices a file by sampling a few of its chunk
880/// addresses and extrapolating. When every sampled chunk is already stored
881/// there is no live price to extrapolate from, so a `"0"` cost can mean either
882/// "provably free" (the whole file was sampled) or only "probably free" (the
883/// tail was unsampled). This lets callers tell those apart instead of treating
884/// every `"0"` as unconditionally free.
885#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
886#[serde(rename_all = "snake_case")]
887pub enum CostEstimateConfidence {
888    /// At least one sampled chunk returned a live quote; `storage_cost_atto`
889    /// is extrapolated from a real per-chunk price. The normal case.
890    #[default]
891    PricedSample,
892    /// Every chunk in the file was sampled and every one was already stored.
893    /// `storage_cost_atto` is exactly `"0"` — the upload is genuinely free.
894    VerifiedAllAlreadyStored,
895    /// Every *sampled* chunk was already stored, but not all chunks were
896    /// sampled. `storage_cost_atto` is `"0"` as a best-effort guess; the real
897    /// upload reconciles the true cost at payment time. Render this as "likely
898    /// already stored", not a guaranteed-free price.
899    AllSamplesAlreadyStoredIncomplete,
900}
901
902/// Estimated cost of uploading a file, returned by
903/// [`Client::estimate_upload_cost`].
904///
905/// Marked `#[non_exhaustive]` so adding a field later is not a breaking change
906/// for downstream consumers that construct or pattern-match on this struct.
907#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
908#[non_exhaustive]
909pub struct UploadCostEstimate {
910    /// Original file size in bytes.
911    pub file_size: u64,
912    /// Number of chunks the file would be split into (data chunks only,
913    /// does not include the DataMap chunk added during public uploads).
914    pub chunk_count: usize,
915    /// Estimated total storage cost in atto (token smallest unit).
916    pub storage_cost_atto: String,
917    /// Estimated gas cost in wei as a string. This is a rough heuristic
918    /// based on chunk count and payment mode, NOT a live gas price query.
919    pub estimated_gas_cost_wei: String,
920    /// Payment mode that would be used.
921    pub payment_mode: PaymentMode,
922    /// How much to trust `storage_cost_atto`. See [`CostEstimateConfidence`].
923    #[serde(default)]
924    pub confidence: CostEstimateConfidence,
925}
926
927/// Result of a file upload: the `DataMap` needed to retrieve the file.
928///
929/// Marked `#[non_exhaustive]` so adding a new field in future is not a
930/// breaking change for downstream consumers that construct or pattern-match
931/// on this struct.
932#[derive(Debug, Clone)]
933#[non_exhaustive]
934pub struct FileUploadResult {
935    /// The data map containing chunk metadata for reconstruction.
936    pub data_map: DataMap,
937    /// Number of chunks stored on the network.
938    pub chunks_stored: usize,
939    /// Number of chunks that failed to store. Always 0 for a successful
940    /// upload — partial-failure information is conveyed via
941    /// [`crate::data::Error::PartialUpload`] instead.
942    pub chunks_failed: usize,
943    /// Total number of chunks in the upload, including chunks that were
944    /// already stored and skipped. On full success this equals `chunks_stored`.
945    pub total_chunks: usize,
946    /// Which payment mode was actually used (not just requested).
947    pub payment_mode_used: PaymentMode,
948    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
949    pub storage_cost_atto: String,
950    /// Total gas cost in wei. 0 if no on-chain transactions were made.
951    pub gas_cost_wei: u128,
952    /// Chunk address of the serialized `DataMap`, set only for
953    /// [`Visibility::Public`] uploads. **`Some` means this address is
954    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
955    /// necessarily that *this* upload paid to store it — if the serialized
956    /// `DataMap` hashed to a chunk that was already on the network (same
957    /// file uploaded before; deterministic via self-encryption), the address
958    /// is still returned but no storage payment was made for it.
959    pub data_map_address: Option<[u8; 32]>,
960    /// Sum of chunk-store RPC attempts across the upload
961    /// (`>= chunks_stored` on full success; more if any chunk retried).
962    /// `0` for paths that don't run the wave store loop.
963    pub chunk_attempts_total: usize,
964    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
965    /// success, empty for paths that don't run the wave store loop).
966    pub store_durations_ms: Vec<u64>,
967    /// Count of stored chunks that succeeded on each retry round
968    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
969    /// paths that don't run the wave store loop.
970    pub retries_histogram: [usize; 4],
971}
972
973/// Payment information for external signing — either wave-batch or merkle.
974#[derive(Debug)]
975pub enum ExternalPaymentInfo {
976    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
977    WaveBatch {
978        /// Chunks ready for payment (needed for finalize).
979        prepared_chunks: Vec<PreparedChunk>,
980        /// Payment intent for external signing.
981        payment_intent: PaymentIntent,
982    },
983    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
984    Merkle {
985        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
986        prepared_batch: PreparedMerkleBatch,
987        /// Raw chunk contents that still need upload after the preflight check.
988        chunk_contents: Vec<Bytes>,
989        /// Chunk addresses that still need upload after the preflight check.
990        chunk_addresses: Vec<[u8; 32]>,
991    },
992}
993
994/// Prepared upload ready for external payment.
995///
996/// Contains everything needed to construct the on-chain payment transaction
997/// externally (e.g. via WalletConnect in a desktop app) and then finalize
998/// the upload without a Rust-side wallet.
999///
1000/// Note: This struct stays in Rust memory — only the public fields of
1001/// `payment_info` are sent to the frontend. `PreparedChunk` contains
1002/// non-serializable network types, so the full struct cannot derive `Serialize`.
1003///
1004/// Marked `#[non_exhaustive]` so adding a new field in future is not a
1005/// breaking change for downstream consumers.
1006#[derive(Debug)]
1007#[non_exhaustive]
1008pub struct PreparedUpload {
1009    /// The data map for later retrieval.
1010    pub data_map: DataMap,
1011    /// Payment information for chunks that still need payment after the
1012    /// already-stored preflight. This may be wave-batch even when the original
1013    /// chunk count was merkle-eligible if the remaining count is below the
1014    /// merkle threshold.
1015    pub payment_info: ExternalPaymentInfo,
1016    /// Chunk address of the serialized `DataMap` when this upload was
1017    /// prepared with [`Visibility::Public`]. `Some` means the address is
1018    /// retrievable on the network after finalization — either because this
1019    /// upload paid to store the chunk in `payment_info`, or because the
1020    /// chunk was already on the network (deterministic self-encryption).
1021    /// Carried through to [`FileUploadResult::data_map_address`].
1022    pub data_map_address: Option<[u8; 32]>,
1023    /// Chunk addresses already present on the network when this upload was
1024    /// prepared. These do not require payment or PUT during finalization.
1025    pub already_stored_addresses: Vec<[u8; 32]>,
1026    /// Total chunk count for the upload, including already-stored chunks.
1027    pub total_chunks: usize,
1028}
1029
1030/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
1031type EncryptionChannels = (
1032    tokio::sync::mpsc::Receiver<Bytes>,
1033    tokio::sync::oneshot::Receiver<DataMap>,
1034    tokio::task::JoinHandle<Result<()>>,
1035);
1036
1037/// Spawn a blocking task that streams file encryption through a channel.
1038fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
1039    let metadata = std::fs::metadata(&path)?;
1040    let data_size = usize::try_from(metadata.len())
1041        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
1042
1043    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
1044    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
1045
1046    let handle = tokio::task::spawn_blocking(move || {
1047        let file = std::fs::File::open(&path)?;
1048        let mut reader = std::io::BufReader::new(file);
1049
1050        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
1051        let read_error_clone = Arc::clone(&read_error);
1052
1053        let data_iter = std::iter::from_fn(move || {
1054            let mut buffer = vec![0u8; 8192];
1055            match std::io::Read::read(&mut reader, &mut buffer) {
1056                Ok(0) => None,
1057                Ok(n) => {
1058                    buffer.truncate(n);
1059                    Some(Bytes::from(buffer))
1060                }
1061                Err(e) => {
1062                    let mut guard = read_error_clone
1063                        .lock()
1064                        .unwrap_or_else(|poisoned| poisoned.into_inner());
1065                    *guard = Some(e);
1066                    None
1067                }
1068            }
1069        });
1070
1071        let mut stream = stream_encrypt(data_size, data_iter)
1072            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
1073
1074        for chunk_result in stream.chunks() {
1075            // Check for captured read errors immediately after each chunk.
1076            // stream_encrypt sees None (EOF) when a read fails, so it stops
1077            // producing chunks. We must detect this before sending the
1078            // partial results to avoid uploading a truncated DataMap.
1079            {
1080                let guard = read_error
1081                    .lock()
1082                    .unwrap_or_else(|poisoned| poisoned.into_inner());
1083                if let Some(ref e) = *guard {
1084                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1085                }
1086            }
1087
1088            let (_hash, content) = chunk_result
1089                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
1090            if chunk_tx.blocking_send(content).is_err() {
1091                return Err(Error::Encryption("upload receiver dropped".to_string()));
1092            }
1093        }
1094
1095        // Final check: read error after last chunk (stream saw EOF).
1096        {
1097            let guard = read_error
1098                .lock()
1099                .unwrap_or_else(|poisoned| poisoned.into_inner());
1100            if let Some(ref e) = *guard {
1101                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1102            }
1103        }
1104
1105        let datamap = stream
1106            .into_datamap()
1107            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
1108        if datamap_tx.send(datamap).is_err() {
1109            warn!("DataMap receiver dropped — upload may have been cancelled");
1110        }
1111        Ok(())
1112    });
1113
1114    Ok((chunk_rx, datamap_rx, handle))
1115}
1116
1117/// RAII guard for the staging temp file used during a disk download.
1118///
1119/// Removes the file on drop — including a panic unwind out of the
1120/// `block_in_place` decrypt loop — unless [`commit`](Self::commit) has
1121/// promoted it to its final path. Centralizes the cleanup the explicit error
1122/// arms used to repeat.
1123struct TempDownload {
1124    /// `Some` while the staging file may need cleanup; `None` once committed.
1125    path: Option<PathBuf>,
1126}
1127
1128impl TempDownload {
1129    fn new(path: PathBuf) -> Self {
1130        Self { path: Some(path) }
1131    }
1132
1133    /// Path of the staging file (valid until `commit`).
1134    fn path(&self) -> &Path {
1135        self.path
1136            .as_deref()
1137            .expect("TempDownload::path called after commit")
1138    }
1139
1140    /// Rename the staged file to `dest`. On success the guard is defused so
1141    /// `Drop` is a no-op; on failure the guard stays armed and `Drop` removes
1142    /// the orphaned temp file.
1143    fn commit(mut self, dest: &Path) -> std::io::Result<()> {
1144        std::fs::rename(self.path(), dest)?; // err → guard armed → Drop cleans up
1145        self.path = None; // success → nothing left to clean
1146        Ok(())
1147    }
1148}
1149
1150impl Drop for TempDownload {
1151    fn drop(&mut self) {
1152        if let Some(path) = self.path.take() {
1153            if let Err(e) = std::fs::remove_file(&path) {
1154                // Absent file is fine (never created / already gone).
1155                if e.kind() != std::io::ErrorKind::NotFound {
1156                    warn!(
1157                        "Failed to remove temp download file {}: {e}",
1158                        path.display()
1159                    );
1160                }
1161            }
1162        }
1163    }
1164}
1165
1166impl Client {
1167    /// Upload a file to the network using streaming self-encryption.
1168    ///
1169    /// Automatically selects merkle batch payment for files that produce
1170    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
1171    /// directory so peak memory stays at ~256 MB regardless of file size.
1172    ///
1173    /// # Errors
1174    ///
1175    /// Returns an error if the file cannot be read, encryption fails,
1176    /// or any chunk cannot be stored.
1177    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
1178        self.file_upload_with_mode(path, PaymentMode::Auto).await
1179    }
1180
1181    /// Estimate the cost of uploading a file without actually uploading.
1182    ///
1183    /// Encrypts the file to determine chunk count and sizes, then requests
1184    /// a single quote from the network for a representative chunk. The
1185    /// per-chunk price is extrapolated to the total chunk count.
1186    ///
1187    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
1188    /// chunks are cleaned up automatically when the function returns.
1189    ///
1190    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
1191    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
1192    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
1193    /// varies with network conditions.
1194    ///
1195    /// Sampled chunk addresses are spread across the whole file (not the first
1196    /// N) so a shared leading prefix doesn't bias the sample. When a sample
1197    /// returns a live quote the per-chunk price is extrapolated and the result
1198    /// is tagged [`CostEstimateConfidence::PricedSample`].
1199    ///
1200    /// When every sampled chunk is already stored the result is still `Ok`
1201    /// with `storage_cost_atto: "0"`, tagged either
1202    /// [`CostEstimateConfidence::VerifiedAllAlreadyStored`] when the whole file
1203    /// was sampled (exactly free) or
1204    /// [`CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete`] when the
1205    /// tail was unsampled (a best-effort guess that payment reconciles).
1206    ///
1207    /// # Errors
1208    ///
1209    /// Returns an error if the file cannot be read, encryption fails, or the
1210    /// network cannot provide a quote.
1211    pub async fn estimate_upload_cost(
1212        &self,
1213        path: &Path,
1214        mode: PaymentMode,
1215        progress: Option<mpsc::Sender<UploadEvent>>,
1216    ) -> Result<UploadCostEstimate> {
1217        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
1218
1219        if file_size < 3 {
1220            return Err(Error::InvalidData(
1221                "File too small: self-encryption requires at least 3 bytes".into(),
1222            ));
1223        }
1224
1225        check_disk_space_for_spill(file_size)?;
1226
1227        info!(
1228            "Estimating upload cost for {} ({file_size} bytes)",
1229            path.display()
1230        );
1231
1232        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1233        let chunk_count = spill.len();
1234
1235        if let Some(ref tx) = progress {
1236            let _ = tx
1237                .send(UploadEvent::Encrypted {
1238                    total_chunks: chunk_count,
1239                })
1240                .await;
1241        }
1242
1243        info!("Encrypted into {chunk_count} chunks, requesting quote");
1244        let uses_merkle = should_use_merkle(chunk_count, mode);
1245
1246        // Sample chunk addresses spread evenly across the file (see
1247        // `distributed_sample_indices`) rather than the first N. A single
1248        // AlreadyStored result says nothing about the rest of the file, and a
1249        // positional sample lands on a shared leading prefix in the worst case,
1250        // so we spread the probe and only treat the whole file as "fully
1251        // stored" when every sample comes back stored.
1252        let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
1253        let mut sampled = 0usize;
1254        let mut all_already_stored = true;
1255        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
1256
1257        for &idx in &sample_indices {
1258            let addr = &spill.addresses[idx];
1259            sampled += 1;
1260            let chunk_bytes = spill.read_chunk(addr)?;
1261            let data_size = u64::try_from(chunk_bytes.len())
1262                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1263            let result = if uses_merkle {
1264                self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
1265                    .await
1266            } else {
1267                self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
1268                    .await
1269            };
1270            match result {
1271                Ok(q) => {
1272                    quotes_opt = Some(q);
1273                    all_already_stored = false;
1274                    break;
1275                }
1276                Err(Error::AlreadyStored) => {
1277                    debug!(
1278                        "Sample chunk {} already stored; trying next address ({sampled}/{})",
1279                        hex::encode(addr),
1280                        sample_indices.len()
1281                    );
1282                    continue;
1283                }
1284                Err(e) => return Err(e),
1285            }
1286        }
1287
1288        let quotes = match quotes_opt {
1289            Some(q) => q,
1290            None if all_already_stored && sampled == chunk_count => {
1291                // Every address in the file was sampled and every one is
1292                // already on the network — a zero-cost estimate is exact here.
1293                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
1294                return Ok(UploadCostEstimate {
1295                    file_size,
1296                    chunk_count,
1297                    storage_cost_atto: "0".into(),
1298                    estimated_gas_cost_wei: "0".into(),
1299                    payment_mode: if uses_merkle {
1300                        PaymentMode::Merkle
1301                    } else {
1302                        PaymentMode::Single
1303                    },
1304                    confidence: CostEstimateConfidence::VerifiedAllAlreadyStored,
1305                });
1306            }
1307            None => {
1308                // Every sampled chunk was already stored but the tail was not
1309                // sampled, so there is no live price to extrapolate. The
1310                // estimate is display-only and payment reconciles the true
1311                // cost, so return an optimistic zero flagged as incomplete
1312                // rather than erroring — callers still get a value to show.
1313                info!(
1314                    "All {sampled}/{chunk_count} sampled chunks already stored; \
1315                     returning incomplete zero-cost estimate"
1316                );
1317                return Ok(UploadCostEstimate {
1318                    file_size,
1319                    chunk_count,
1320                    storage_cost_atto: "0".into(),
1321                    estimated_gas_cost_wei: "0".into(),
1322                    payment_mode: if uses_merkle {
1323                        PaymentMode::Merkle
1324                    } else {
1325                        PaymentMode::Single
1326                    },
1327                    confidence: CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete,
1328                });
1329            }
1330        };
1331
1332        // Use the median price × 3 (matches SingleNodePayment::from_quotes
1333        // which pays 3x the median to incentivize reliable storage).
1334        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
1335        prices.sort();
1336        let median_price = prices
1337            .get(prices.len() / 2)
1338            .copied()
1339            .unwrap_or(Amount::ZERO);
1340        let per_chunk_cost = median_price * Amount::from(3u64);
1341
1342        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
1343        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
1344
1345        // Estimate gas cost from realistic per-transaction budgets rather
1346        // than a flat per-chunk or per-wave number.
1347        //
1348        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
1349        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
1350        //   The dominant cost is one SSTORE per entry plus base tx overhead,
1351        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
1352        //   on a full wave and multiply by the number of waves. The previous
1353        //   per-wave figure of 150k was closer to a single-entry transfer
1354        //   and understated cost by 5–10x for full waves.
1355        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
1356        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
1357        //
1358        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
1359        // Arbitrum baseline). Treat the result as advisory, not a commitment.
1360        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
1361        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
1362        let estimated_gas: u128 = if uses_merkle {
1363            merkle_batches
1364                .saturating_mul(GAS_PER_MERKLE_TX)
1365                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1366        } else {
1367            waves
1368                .saturating_mul(GAS_PER_WAVE_TX)
1369                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1370        };
1371
1372        info!(
1373            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
1374        );
1375
1376        Ok(UploadCostEstimate {
1377            file_size,
1378            chunk_count,
1379            storage_cost_atto: total_storage.to_string(),
1380            estimated_gas_cost_wei: estimated_gas.to_string(),
1381            payment_mode: if uses_merkle {
1382                PaymentMode::Merkle
1383            } else {
1384                PaymentMode::Single
1385            },
1386            confidence: CostEstimateConfidence::PricedSample,
1387        })
1388    }
1389
1390    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
1391    ///
1392    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
1393    /// [`Visibility::Private`] — see that method for details.
1394    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
1395        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
1396            .await
1397    }
1398
1399    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
1400    ///
1401    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
1402    /// `progress: None` — see that method for details.
1403    pub async fn file_prepare_upload_with_visibility(
1404        &self,
1405        path: &Path,
1406        visibility: Visibility,
1407    ) -> Result<PreparedUpload> {
1408        self.file_prepare_upload_with_progress(path, visibility, None)
1409            .await
1410    }
1411
1412    /// Phase 1 of external-signer upload with progress events.
1413    ///
1414    /// Requires an EVM network (for contract price queries) but NOT a wallet.
1415    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
1416    /// and a [`PaymentIntent`] that the external signer uses to construct
1417    /// and submit the on-chain payment transaction.
1418    ///
1419    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
1420    /// is bundled into the payment batch as an additional chunk and its
1421    /// address is recorded on the returned [`PreparedUpload`]. After
1422    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
1423    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
1424    /// can share a single address from which anyone can retrieve the file.
1425    ///
1426    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
1427    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
1428    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
1429    /// emitted later by [`Client::finalize_upload_with_progress`] /
1430    /// [`Client::finalize_upload_merkle_with_progress`].
1431    ///
1432    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
1433    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
1434    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
1435    /// inherent to the two-phase external-signer protocol — the chunks must
1436    /// stay in memory until [`Client::finalize_upload`] stores them. For very
1437    /// large files, prefer [`Client::file_upload`] which streams directly.
1438    ///
1439    /// # Errors
1440    ///
1441    /// Returns an error if there is insufficient disk space, the file cannot
1442    /// be read, encryption fails, or quote collection fails.
    pub async fn file_prepare_upload_with_progress(
        &self,
        path: &Path,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<PreparedUpload> {
        debug!(
            "Preparing file upload for external signing (visibility={visibility:?}): {}",
            path.display()
        );

        // Pre-flight: fail before encrypting if the temp directory cannot hold
        // the spilled chunks for a file of this size.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: encrypt the file, spilling encrypted chunks to a temp
        // directory; `spill.addresses` lists them and `read_chunk` reads one back.
        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        info!(
            "Encrypted {} into {} chunks for external signing (spilled to disk)",
            path.display(),
            spill.len()
        );

        // Read every spilled chunk back into memory; quotes are collected
        // further below, once the payment flow has been chosen.
        // Note: all PreparedChunks accumulate in memory because the external-signer
        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
        let mut chunk_data: Vec<Bytes> = spill
            .addresses
            .iter()
            .map(|addr| spill.read_chunk(addr))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        // For public uploads, bundle the serialized DataMap as an extra chunk
        // in the same payment batch. This lets the external signer pay for
        // the data chunks and the DataMap chunk in one flow, and lets the
        // finalize step return the DataMap's chunk address as the shareable
        // retrieval address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_data.push(bytes);
                Some(address)
            }
        };

        // Total includes the DataMap chunk for public uploads; it is used as
        // the progress denominator and reported as `total_chunks`.
        let chunk_count = chunk_data.len();

        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: choose the payment flow. The second tuple element collects
        // addresses found to be already stored; those need no payment.
        // `chunk_count` (the whole-file count) is always passed as the progress
        // total so `ChunkQuoted` events are relative to the entire file.
        let (payment_info, already_stored_addresses) = if should_use_merkle(
            chunk_count,
            PaymentMode::Auto,
        ) {
            // Merkle path: build tree, collect candidate pools, return for external payment.
            info!("Using merkle batch preparation for {chunk_count} file chunks");

            // (address, size) for every chunk, as input to the merkle preflight.
            let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
                .iter()
                .map(|chunk| {
                    let size = u64::try_from(chunk.len())
                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
                    Ok((compute_address(chunk), size))
                })
                .collect::<Result<Vec<_>>>()?;

            // Preflight: splits addresses into `already_stored` and
            // `to_upload` (presumably by probing the network — confirm in
            // `plan_merkle_upload`).
            let merkle_plan = self
                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
                .await?;

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for: hand back an empty wave batch (no
                // quotes to sign), which pairs with `finalize_upload`.
                info!("All {chunk_count} file chunks already stored; no external payment needed");
                (
                    ExternalPaymentInfo::WaveBatch {
                        prepared_chunks: Vec::new(),
                        payment_intent: PaymentIntent::from_prepared_chunks(&[]),
                    },
                    merkle_plan.already_stored,
                )
            } else {
                // Keep only the contents of chunks that still need upload.
                let chunk_data =
                    chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;

                if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
                    // Too few chunks remain for a merkle batch: quote them
                    // individually and merge both already-stored lists.
                    info!(
                        "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
                        merkle_plan.to_upload.len()
                    );
                    let (payment_info, mut wave_already_stored) = self
                        .prepare_wave_batch_external_chunks(
                            chunk_data,
                            progress.as_ref(),
                            chunk_count,
                        )
                        .await?;
                    let mut already_stored = merkle_plan.already_stored;
                    already_stored.append(&mut wave_already_stored);
                    (payment_info, already_stored)
                } else {
                    match self
                        .prepare_merkle_batch_external(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                    {
                        Ok(prepared_batch) => {
                            info!(
                                "File prepared for external merkle signing: {} chunks, depth={} ({})",
                                merkle_plan.to_upload.len(),
                                prepared_batch.depth,
                                path.display()
                            );

                            // The chunk contents travel with the payment info
                            // so `finalize_upload_merkle` can store them
                            // without the spill, which is not part of
                            // `PreparedUpload`.
                            (
                                ExternalPaymentInfo::Merkle {
                                    prepared_batch,
                                    chunk_contents: chunk_data,
                                    chunk_addresses: merkle_plan.to_upload,
                                },
                                merkle_plan.already_stored,
                            )
                        }
                        // Not enough peers for a merkle candidate pool:
                        // degrade to wave-batch quoting instead of failing.
                        Err(Error::InsufficientPeers(ref msg)) => {
                            info!(
                                "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
                            );
                            let (payment_info, mut wave_already_stored) = self
                                .prepare_wave_batch_external_chunks(
                                    chunk_data,
                                    progress.as_ref(),
                                    chunk_count,
                                )
                                .await?;
                            let mut already_stored = merkle_plan.already_stored;
                            already_stored.append(&mut wave_already_stored);
                            (payment_info, already_stored)
                        }
                        // Any other merkle preparation error is fatal.
                        Err(e) => return Err(e),
                    }
                }
            }
        } else {
            // Below the merkle threshold from the start: quote every chunk
            // (including any bundled DataMap chunk) individually.
            self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
                .await?
        };

        // Surface the "DataMap chunk was already on the network" case
        // so debugging "why is data_map_address set but no storage cost
        // appears for it?" doesn't require reading the source. See the
        // `data_map_address` doc comment for why this is still a valid
        // `Some(addr)` outcome.
        if let Some(addr) = data_map_address {
            let data_map_needs_payment = match &payment_info {
                ExternalPaymentInfo::WaveBatch {
                    prepared_chunks, ..
                } => prepared_chunks.iter().any(|c| c.address == addr),
                ExternalPaymentInfo::Merkle {
                    chunk_addresses, ..
                } => chunk_addresses.contains(&addr),
            };
            if !data_map_needs_payment {
                info!(
                    "Public upload: DataMap chunk {} was already stored \
                     on the network — address is retrievable without a \
                     new payment",
                    hex::encode(addr)
                );
            }
        }

        Ok(PreparedUpload {
            data_map,
            payment_info,
            data_map_address,
            already_stored_addresses,
            total_chunks: chunk_count,
        })
    }
1637
1638    async fn prepare_wave_batch_external_chunks(
1639        &self,
1640        chunk_data: Vec<Bytes>,
1641        progress: Option<&mpsc::Sender<UploadEvent>>,
1642        progress_total: usize,
1643    ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1644        let chunk_count = chunk_data.len();
1645        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1646            .into_iter()
1647            .map(|content| {
1648                let address = compute_address(&content);
1649                (content, address)
1650            })
1651            .collect();
1652
1653        // Wave-batch path: collect quotes per chunk concurrently, emitting
1654        // a `ChunkQuoted` event after each completion so callers can drive
1655        // a progress bar through the slow quote phase.
1656        let quote_limiter = self.controller().quote.clone();
1657        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1658        let mut quote_stream = stream::iter(chunks_with_addr)
1659            .map(|(content, address)| {
1660                let limiter = quote_limiter.clone();
1661                async move {
1662                    let result = observe_op(
1663                        &limiter,
1664                        || async move { self.prepare_chunk_payment(content).await },
1665                        classify_error,
1666                    )
1667                    .await;
1668                    (address, result)
1669                }
1670            })
1671            .buffer_unordered(quote_concurrency);
1672
1673        let mut prepared_chunks = Vec::with_capacity(chunk_count);
1674        let mut already_stored = Vec::new();
1675        let mut quoted = 0usize;
1676        while let Some((address, result)) = quote_stream.next().await {
1677            match result? {
1678                Some(prepared) => prepared_chunks.push(prepared),
1679                None => already_stored.push(address),
1680            }
1681            quoted += 1;
1682            if let Some(tx) = progress {
1683                let _ = tx.try_send(UploadEvent::ChunkQuoted {
1684                    quoted,
1685                    total: progress_total,
1686                });
1687            }
1688        }
1689
1690        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1691        info!(
1692            "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1693            prepared_chunks.len(),
1694            already_stored.len(),
1695            payment_intent.total_amount,
1696        );
1697
1698        Ok((
1699            ExternalPaymentInfo::WaveBatch {
1700                prepared_chunks,
1701                payment_intent,
1702            },
1703            already_stored,
1704        ))
1705    }
1706
1707    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1708    ///
1709    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1710    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1711    /// payment. Builds payment proofs and stores chunks on the network.
1712    ///
1713    /// # Errors
1714    ///
1715    /// Returns an error if the prepared upload used merkle payment (use
1716    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1717    /// or any chunk cannot be stored.
1718    pub async fn finalize_upload(
1719        &self,
1720        prepared: PreparedUpload,
1721        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1722    ) -> Result<FileUploadResult> {
1723        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1724            .await
1725    }
1726
1727    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1728    ///
1729    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1730    /// on the provided channel as each chunk is successfully stored.
1731    ///
1732    /// # Errors
1733    ///
1734    /// Same as [`Client::finalize_upload`].
1735    pub async fn finalize_upload_with_progress(
1736        &self,
1737        prepared: PreparedUpload,
1738        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1739        progress: Option<mpsc::Sender<UploadEvent>>,
1740    ) -> Result<FileUploadResult> {
1741        let data_map_address = prepared.data_map_address;
1742        let already_stored_addresses = prepared.already_stored_addresses;
1743        let already_stored_count = already_stored_addresses.len();
1744        let total_chunks = prepared.total_chunks;
1745        match prepared.payment_info {
1746            ExternalPaymentInfo::WaveBatch {
1747                prepared_chunks,
1748                payment_intent,
1749            } => {
1750                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1751                let wave_result = self
1752                    .store_paid_chunks_with_events(
1753                        paid_chunks,
1754                        progress.as_ref(),
1755                        already_stored_count,
1756                        total_chunks,
1757                    )
1758                    .await;
1759                if !wave_result.failed.is_empty() {
1760                    let failed_count = wave_result.failed.len();
1761                    let stored_count = already_stored_count + wave_result.stored.len();
1762                    let mut stored = already_stored_addresses;
1763                    stored.extend(wave_result.stored);
1764                    return Err(Error::PartialUpload {
1765                        stored,
1766                        stored_count,
1767                        failed: wave_result.failed,
1768                        failed_count,
1769                        total_chunks,
1770                        // Report the storage spend known from the payment intent
1771                        // the external signer was handed. Gas is paid by the
1772                        // signer out-of-band, so it stays unknown (0).
1773                        spend: Box::new(PartialUploadSpend {
1774                            storage_cost_atto: payment_intent.total_amount.to_string(),
1775                            gas_cost_wei: 0,
1776                        }),
1777                        reason: "finalize_upload: chunk storage failed after retries".into(),
1778                    });
1779                }
1780                let chunks_stored = already_stored_count + wave_result.stored.len();
1781
1782                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1783
1784                let mut stats = WaveAggregateStats::default();
1785                stats.absorb(&wave_result);
1786
1787                Ok(FileUploadResult {
1788                    data_map: prepared.data_map,
1789                    chunks_stored,
1790                    chunks_failed: 0,
1791                    total_chunks,
1792                    payment_mode_used: PaymentMode::Single,
1793                    // Storage spend is known from the payment intent; gas is
1794                    // paid by the external signer out-of-band (unknown here).
1795                    storage_cost_atto: payment_intent.total_amount.to_string(),
1796                    gas_cost_wei: 0,
1797                    data_map_address,
1798                    chunk_attempts_total: stats.chunk_attempts_total,
1799                    store_durations_ms: stats.store_durations_ms,
1800                    retries_histogram: stats.retries_histogram,
1801                })
1802            }
1803            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1804                "Cannot finalize merkle upload with wave-batch tx hashes. \
1805                 Use finalize_upload_merkle() instead."
1806                    .to_string(),
1807            )),
1808        }
1809    }
1810
1811    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1812    ///
1813    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1814    /// returned by the on-chain merkle payment transaction. Generates proofs and
1815    /// stores chunks on the network.
1816    ///
1817    /// # Errors
1818    ///
1819    /// Returns an error if the prepared upload used wave-batch payment (use
1820    /// [`Client::finalize_upload`] instead), proof generation fails,
1821    /// or any chunk cannot be stored.
1822    pub async fn finalize_upload_merkle(
1823        &self,
1824        prepared: PreparedUpload,
1825        winner_pool_hash: [u8; 32],
1826    ) -> Result<FileUploadResult> {
1827        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1828            .await
1829    }
1830
1831    /// Phase 2 of external-signer upload (merkle) with progress events.
1832    ///
1833    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1834    /// on the provided channel as each chunk is successfully stored.
1835    ///
1836    /// # Errors
1837    ///
1838    /// Same as [`Client::finalize_upload_merkle`].
1839    pub async fn finalize_upload_merkle_with_progress(
1840        &self,
1841        prepared: PreparedUpload,
1842        winner_pool_hash: [u8; 32],
1843        progress: Option<mpsc::Sender<UploadEvent>>,
1844    ) -> Result<FileUploadResult> {
1845        let data_map_address = prepared.data_map_address;
1846        let already_stored_count = prepared.already_stored_addresses.len();
1847        let total_chunks = prepared.total_chunks;
1848        match prepared.payment_info {
1849            ExternalPaymentInfo::Merkle {
1850                prepared_batch,
1851                chunk_contents,
1852                chunk_addresses,
1853            } => {
1854                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1855                let outcome = self
1856                    .merkle_upload_chunks(
1857                        chunk_contents,
1858                        chunk_addresses,
1859                        &batch_result,
1860                        progress.as_ref(),
1861                        already_stored_count,
1862                        total_chunks,
1863                    )
1864                    .await?;
1865
1866                info!(
1867                    "External-signer merkle upload finalized: {} chunks stored, {} failed",
1868                    outcome.stored, outcome.failed
1869                );
1870
1871                Ok(FileUploadResult {
1872                    data_map: prepared.data_map,
1873                    chunks_stored: outcome.stored,
1874                    chunks_failed: outcome.failed,
1875                    total_chunks,
1876                    payment_mode_used: PaymentMode::Merkle,
1877                    storage_cost_atto: "0".into(),
1878                    gas_cost_wei: 0,
1879                    data_map_address,
1880                    chunk_attempts_total: outcome.stats.chunk_attempts_total,
1881                    store_durations_ms: outcome.stats.store_durations_ms,
1882                    retries_histogram: outcome.stats.retries_histogram,
1883                })
1884            }
1885            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1886                "Cannot finalize wave-batch upload with merkle winner hash. \
1887                 Use finalize_upload() instead."
1888                    .to_string(),
1889            )),
1890        }
1891    }
1892
1893    /// Upload a file with a specific payment mode.
1894    ///
1895    /// Before encryption, checks that the temp directory has enough free
1896    /// disk space for the spilled chunks (~1.1× source file size).
1897    ///
1898    /// Encrypted chunks are spilled to a temp directory during encryption
1899    /// so that only their 32-byte addresses stay in memory. At upload time,
1900    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1901    ///
1902    /// # Errors
1903    ///
1904    /// Returns an error if there is insufficient disk space, the file cannot
1905    /// be read, encryption fails, or any chunk cannot be stored.
1906    #[allow(clippy::too_many_lines)]
1907    pub async fn file_upload_with_mode(
1908        &self,
1909        path: &Path,
1910        mode: PaymentMode,
1911    ) -> Result<FileUploadResult> {
1912        self.file_upload_with_progress(path, mode, None).await
1913    }
1914
1915    /// Upload a file publicly, storing the serialized [`DataMap`] as part of
1916    /// the same upload payment batch.
1917    ///
1918    /// The returned [`FileUploadResult::data_map_address`] can be shared for
1919    /// public downloads via [`Client::data_map_fetch`].
1920    #[allow(clippy::too_many_lines)]
1921    pub async fn file_upload_public_with_mode(
1922        &self,
1923        path: &Path,
1924        mode: PaymentMode,
1925    ) -> Result<FileUploadResult> {
1926        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
1927            .await
1928    }
1929
1930    /// Upload a file with progress events sent to the given channel.
1931    ///
1932    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1933    /// provided channel for UI progress feedback.
1934    #[allow(clippy::too_many_lines)]
1935    pub async fn file_upload_with_progress(
1936        &self,
1937        path: &Path,
1938        mode: PaymentMode,
1939        progress: Option<mpsc::Sender<UploadEvent>>,
1940    ) -> Result<FileUploadResult> {
1941        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
1942            .await
1943    }
1944
1945    /// Public file upload with progress events.
1946    ///
1947    /// Same as [`Client::file_upload_public_with_mode`] but sends
1948    /// [`UploadEvent`]s to the provided channel for UI progress feedback.
1949    #[allow(clippy::too_many_lines)]
1950    pub async fn file_upload_public_with_progress(
1951        &self,
1952        path: &Path,
1953        mode: PaymentMode,
1954        progress: Option<mpsc::Sender<UploadEvent>>,
1955    ) -> Result<FileUploadResult> {
1956        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
1957            .await
1958    }
1959
1960    #[allow(clippy::too_many_lines)]
1961    async fn file_upload_with_visibility_and_progress(
1962        &self,
1963        path: &Path,
1964        mode: PaymentMode,
1965        visibility: Visibility,
1966        progress: Option<mpsc::Sender<UploadEvent>>,
1967    ) -> Result<FileUploadResult> {
1968        debug!(
1969            "Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
1970            path.display()
1971        );
1972
1973        // Pre-flight: verify enough temp disk space for the chunk spill.
1974        let file_size = std::fs::metadata(path)?.len();
1975        check_disk_space_for_spill(file_size)?;
1976
1977        // Phase 1: Encrypt file and spill chunks to temp directory.
1978        // Only 32-byte addresses stay in memory — chunk data lives on disk.
1979        let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1980
1981        let data_map_address = match visibility {
1982            Visibility::Private => None,
1983            Visibility::Public => {
1984                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1985                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1986                })?;
1987                let address = compute_address(&serialized);
1988                info!(
1989                    "Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
1990                    serialized.len(),
1991                    hex::encode(address)
1992                );
1993                spill.push(&serialized)?;
1994                Some(address)
1995            }
1996        };
1997
1998        let chunk_count = spill.len();
1999        info!(
2000            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
2001            path.display()
2002        );
2003        if let Some(ref tx) = progress {
2004            let _ = tx
2005                .send(UploadEvent::Encrypted {
2006                    total_chunks: chunk_count,
2007                })
2008                .await;
2009        }
2010
2011        // Phase 2: Decide payment mode and upload in waves from disk.
2012        //
2013        // For the merkle path, attempt to resume from a cached
2014        // receipt before paying again. The cache is keyed by the
2015        // CANONICAL source path so `./foo`, `/abs/foo`, and any
2016        // symlink alias all resolve to the same cache entry — a
2017        // crash-and-retry from a different cwd or via a different
2018        // alias still hits the receipt. Canonicalize may fail (the
2019        // file could have been moved between phase 1 and here); we
2020        // fall back to the display string in that case, which
2021        // preserves pre-fix behaviour rather than dropping cache
2022        // resume entirely.
2023        let file_path_key = std::fs::canonicalize(path)
2024            .map(|p| p.display().to_string())
2025            .unwrap_or_else(|_| path.display().to_string());
2026        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
2027            .should_use_merkle(chunk_count, mode)
2028        {
2029            info!("Using merkle batch payment for {chunk_count} file chunks");
2030
2031            let cached_merkle =
2032                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
2033                    .map(|(_cache_path, cached)| cached);
2034
2035            let merkle_plan = match self
2036                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
2037                .await
2038            {
2039                Ok(plan) => plan,
2040                Err(e) => {
2041                    if let Some(cached) = cached_merkle
2042                        .as_ref()
2043                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
2044                    {
2045                        info!(
2046                            "Merkle preflight failed ({e}); \
2047                             resuming with cached merkle proofs"
2048                        );
2049                        let (stored, sc, gc, stats) = self
2050                            .upload_waves_merkle(
2051                                &spill,
2052                                &spill.addresses,
2053                                cached,
2054                                &[],
2055                                progress.as_ref(),
2056                            )
2057                            .await?;
2058                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2059                        return Ok(FileUploadResult {
2060                            data_map,
2061                            chunks_stored: stored,
2062                            chunks_failed: 0,
2063                            total_chunks: chunk_count,
2064                            payment_mode_used: PaymentMode::Merkle,
2065                            storage_cost_atto: sc,
2066                            gas_cost_wei: gc,
2067                            data_map_address,
2068                            chunk_attempts_total: stats.chunk_attempts_total,
2069                            store_durations_ms: stats.store_durations_ms,
2070                            retries_histogram: stats.retries_histogram,
2071                        });
2072                    }
2073                    match &e {
2074                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
2075                            info!(
2076                                "Merkle preflight needs more peers ({msg}), \
2077                                 falling back to wave-batch"
2078                            );
2079                            let (stored, sc, gc, fb_stats) = self
2080                                .upload_waves_single(
2081                                    &spill,
2082                                    progress.as_ref(),
2083                                    Some(&file_path_key),
2084                                )
2085                                .await?;
2086                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2087                            return Ok(FileUploadResult {
2088                                data_map,
2089                                chunks_stored: stored,
2090                                chunks_failed: 0,
2091                                total_chunks: chunk_count,
2092                                payment_mode_used: PaymentMode::Single,
2093                                storage_cost_atto: sc,
2094                                gas_cost_wei: gc,
2095                                data_map_address,
2096                                chunk_attempts_total: fb_stats.chunk_attempts_total,
2097                                store_durations_ms: fb_stats.store_durations_ms,
2098                                retries_histogram: fb_stats.retries_histogram,
2099                            });
2100                        }
2101                        _ => return Err(e),
2102                    }
2103                }
2104            };
2105
2106            if merkle_plan.to_upload.is_empty() {
2107                info!("All {chunk_count} merkle chunks already stored; skipping payment");
2108                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2109                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2110                (
2111                    chunk_count,
2112                    PaymentMode::Merkle,
2113                    "0".to_string(),
2114                    0,
2115                    WaveAggregateStats::default(),
2116                )
2117            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
2118                let remaining_chunks = merkle_plan.to_upload.len();
2119                if let Some(cached) = cached_merkle
2120                    .as_ref()
2121                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
2122                {
2123                    info!(
2124                        "{remaining_chunks} chunks remain below merkle threshold; \
2125                         reusing cached merkle proofs"
2126                    );
2127                    let (stored, sc, gc, stats) = self
2128                        .upload_waves_merkle(
2129                            &spill,
2130                            &merkle_plan.to_upload,
2131                            cached,
2132                            &merkle_plan.already_stored,
2133                            progress.as_ref(),
2134                        )
2135                        .await?;
2136                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2137                    (stored, PaymentMode::Merkle, sc, gc, stats)
2138                } else {
2139                    if cached_merkle.is_some() {
2140                        info!(
2141                            "{remaining_chunks} chunks remain below merkle threshold, \
2142                             and the cached merkle receipt does not cover them. \
2143                             Discarding cache and using single-node payment."
2144                        );
2145                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2146                    } else {
2147                        info!(
2148                            "{remaining_chunks} chunks need upload after merkle preflight; \
2149                             using single-node payment"
2150                        );
2151                    }
2152                    let (stored, sc, gc, stats) = self
2153                        .upload_spill_addresses_single(
2154                            &spill,
2155                            &merkle_plan.to_upload,
2156                            progress.as_ref(),
2157                            &merkle_plan.already_stored,
2158                            chunk_count,
2159                            Some(&file_path_key),
2160                        )
2161                        .await?;
2162                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2163                    (stored, PaymentMode::Single, sc, gc, stats)
2164                }
2165            } else {
2166                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
2167                    // Validate the cache against the chunks that still need
2168                    // storage. Extra proofs are harmless: a previous attempt
2169                    // may have paid for chunks that are now already stored.
2170                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
2171                        info!(
2172                            "Skipping merkle payment phase; resuming with \
2173                             cached proofs for {} remaining chunks",
2174                            merkle_plan.to_upload.len()
2175                        );
2176                        Ok(cached.clone())
2177                    } else {
2178                        info!(
2179                            "Cached merkle receipt does not cover the current \
2180                             remaining chunks (cached={}, remaining={}). \
2181                             Discarding cache and paying fresh.",
2182                            cached.proofs.len(),
2183                            merkle_plan.to_upload.len()
2184                        );
2185                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2186                        self.pay_for_merkle_batch(
2187                            &merkle_plan.to_upload,
2188                            DATA_TYPE_CHUNK,
2189                            merkle_plan.to_upload_avg_size(),
2190                        )
2191                        .await
2192                        .inspect(|result| {
2193                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
2194                        })
2195                    }
2196                } else {
2197                    self.pay_for_merkle_batch(
2198                        &merkle_plan.to_upload,
2199                        DATA_TYPE_CHUNK,
2200                        merkle_plan.to_upload_avg_size(),
2201                    )
2202                    .await
2203                    .inspect(|result| {
2204                        // Save BEFORE the store phase so a crash
2205                        // mid-upload leaves a resumable receipt.
2206                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
2207                    })
2208                };
2209
2210                let batch_result = match batch_result {
2211                    Ok(result) => result,
2212                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
2213                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
2214                        let (stored, sc, gc, fb_stats) = self
2215                            .upload_spill_addresses_single(
2216                                &spill,
2217                                &merkle_plan.to_upload,
2218                                progress.as_ref(),
2219                                &merkle_plan.already_stored,
2220                                chunk_count,
2221                                Some(&file_path_key),
2222                            )
2223                            .await?;
2224                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2225                        return Ok(FileUploadResult {
2226                            data_map,
2227                            chunks_stored: stored,
2228                            chunks_failed: 0,
2229                            total_chunks: chunk_count,
2230                            payment_mode_used: PaymentMode::Single,
2231                            storage_cost_atto: sc,
2232                            gas_cost_wei: gc,
2233                            data_map_address,
2234                            chunk_attempts_total: fb_stats.chunk_attempts_total,
2235                            store_durations_ms: fb_stats.store_durations_ms,
2236                            retries_histogram: fb_stats.retries_histogram,
2237                        });
2238                    }
2239                    Err(e) => return Err(e),
2240                };
2241
2242                let (stored, sc, gc, stats) = self
2243                    .upload_waves_merkle(
2244                        &spill,
2245                        &merkle_plan.to_upload,
2246                        &batch_result,
2247                        &merkle_plan.already_stored,
2248                        progress.as_ref(),
2249                    )
2250                    .await?;
2251                // Upload succeeded end-to-end; the cached receipt is
2252                // no longer needed.
2253                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2254                (stored, PaymentMode::Merkle, sc, gc, stats)
2255            }
2256        } else {
2257            let (stored, sc, gc, stats) = self
2258                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
2259                .await?;
2260            // Full file success: drop any cached single-node receipt.
2261            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2262            (stored, PaymentMode::Single, sc, gc, stats)
2263        };
2264
2265        info!(
2266            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
2267            path.display()
2268        );
2269
2270        Ok(FileUploadResult {
2271            data_map,
2272            chunks_stored,
2273            chunks_failed: 0,
2274            total_chunks: chunk_count,
2275            payment_mode_used: actual_mode,
2276            storage_cost_atto,
2277            gas_cost_wei,
2278            data_map_address,
2279            chunk_attempts_total: stats.chunk_attempts_total,
2280            store_durations_ms: stats.store_durations_ms,
2281            retries_histogram: stats.retries_histogram,
2282        })
2283    }
2284
2285    /// Encrypt a file and spill chunks to a temp directory.
2286    ///
2287    /// Logs progress every 100 chunks so users get feedback during
2288    /// multi-GB encryptions.
2289    ///
2290    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
2291    async fn encrypt_file_to_spill(
2292        &self,
2293        path: &Path,
2294        progress: Option<&mpsc::Sender<UploadEvent>>,
2295    ) -> Result<(ChunkSpill, DataMap)> {
2296        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
2297
2298        let mut spill = ChunkSpill::new()?;
2299        while let Some(content) = chunk_rx.recv().await {
2300            spill.push(&content)?;
2301            let chunks_done = spill.len();
2302            if let Some(tx) = progress {
2303                if chunks_done.is_multiple_of(10) {
2304                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
2305                }
2306            }
2307            if chunks_done % 100 == 0 {
2308                let mb = spill.total_bytes() / (1024 * 1024);
2309                info!(
2310                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
2311                    path.display()
2312                );
2313            }
2314        }
2315
2316        // Await encryption completion to catch errors before paying.
2317        handle
2318            .await
2319            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
2320            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
2321
2322        let data_map = datamap_rx
2323            .await
2324            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
2325
2326        Ok((spill, data_map))
2327    }
2328
2329    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
2330    ///
2331    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
2332    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
2333    ///
2334    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
2335    async fn upload_waves_single(
2336        &self,
2337        spill: &ChunkSpill,
2338        progress: Option<&mpsc::Sender<UploadEvent>>,
2339        resume_key: Option<&str>,
2340    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2341        self.upload_spill_addresses_single(
2342            spill,
2343            &spill.addresses,
2344            progress,
2345            &[],
2346            spill.len(),
2347            resume_key,
2348        )
2349        .await
2350    }
2351
2352    async fn upload_spill_addresses_single(
2353        &self,
2354        spill: &ChunkSpill,
2355        addresses: &[[u8; 32]],
2356        progress: Option<&mpsc::Sender<UploadEvent>>,
2357        already_stored_addresses: &[[u8; 32]],
2358        total_chunks: usize,
2359        resume_key: Option<&str>,
2360    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2361        let mut total_stored = already_stored_addresses.len();
2362        let mut total_storage = Amount::ZERO;
2363        let mut total_gas: u128 = 0;
2364        let mut agg_stats = WaveAggregateStats::default();
2365        // A wave whose chunks fall short of quorum after retries must not abort
2366        // the file: its failures are accumulated here and surfaced as a single
2367        // `PartialUpload` only after every wave has been attempted, mirroring
2368        // `upload_waves_merkle`. Aborting on the first failed wave (the old `?`)
2369        // discarded all later waves' progress — already self-encrypted, spilled,
2370        // and in some cases already paid for — converting high per-chunk success
2371        // into 0% per-file success.
2372        // Seed with the addresses a preflight already confirmed stored (e.g.
2373        // the merkle-fallback path passes `merkle_plan.already_stored`), so a
2374        // returned `PartialUpload.stored` lists every stored chunk and
2375        // `stored_count == stored.len()` holds for programmatic callers.
2376        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2377        let mut failed: Vec<([u8; 32], String)> = Vec::new();
2378        let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
2379        let wave_count = waves.len();
2380
2381        // Unconditional breadcrumb: lets a clean run confirm the continue-on-
2382        // partial single-node path is in effect (the old path aborted the file
2383        // on the first failed wave instead of continuing across all waves).
2384        info!(
2385            "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
2386            addresses.len()
2387        );
2388
2389        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2390            let wave_num = wave_idx + 1;
2391            let wave_data: Vec<Bytes> = wave_addrs
2392                .iter()
2393                .map(|addr| spill.read_chunk(addr))
2394                .collect::<Result<Vec<_>>>()?;
2395
2396            info!(
2397                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
2398                wave_data.len()
2399            );
2400            if let Some(tx) = progress {
2401                let _ = tx
2402                    .send(UploadEvent::QuotingChunks {
2403                        wave: wave_num,
2404                        total_waves: wave_count,
2405                        chunks_in_wave: wave_data.len(),
2406                    })
2407                    .await;
2408            }
2409            // Fold this wave's result. A quorum shortfall (`PartialUpload`) is
2410            // recoverable and its parts are returned to be recorded here;
2411            // genuinely fatal errors propagate via `?` and abort the file, as in
2412            // `upload_waves_merkle`.
2413            let outcome = fold_single_wave(
2414                self.batch_upload_chunks_with_events(
2415                    wave_data,
2416                    progress,
2417                    total_stored,
2418                    total_chunks,
2419                    resume_key,
2420                )
2421                .await,
2422            )?;
2423
2424            if !outcome.failed.is_empty() {
2425                warn!(
2426                    "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
2427                     continuing with remaining waves",
2428                    outcome.failed.len()
2429                );
2430            }
2431
2432            total_stored += outcome.stored.len();
2433            stored_addresses.extend(outcome.stored);
2434            failed.extend(outcome.failed);
2435            total_storage += outcome.storage_atto;
2436            total_gas = total_gas.saturating_add(outcome.gas_wei);
2437            // Merge per-wave stats (a quorum-short wave contributes none, since
2438            // `PartialUpload` carries no stats).
2439            agg_stats.chunk_attempts_total = agg_stats
2440                .chunk_attempts_total
2441                .saturating_add(outcome.stats.chunk_attempts_total);
2442            agg_stats
2443                .store_durations_ms
2444                .extend(outcome.stats.store_durations_ms);
2445            for (slot, count) in agg_stats
2446                .retries_histogram
2447                .iter_mut()
2448                .zip(outcome.stats.retries_histogram.iter())
2449            {
2450                *slot = slot.saturating_add(*count);
2451            }
2452
2453            if let Some(tx) = progress {
2454                let _ = tx
2455                    .send(UploadEvent::WaveComplete {
2456                        wave: wave_num,
2457                        total_waves: wave_count,
2458                        stored_so_far: total_stored,
2459                        total: total_chunks,
2460                    })
2461                    .await;
2462            }
2463        }
2464
2465        // Any chunk still failed after every wave was attempted means the file
2466        // is not fully stored — surface it as `PartialUpload` (never silently
2467        // succeed with missing chunks), carrying the real on-chain spend.
2468        if !failed.is_empty() {
2469            let failed_count = failed.len();
2470            warn!(
2471                "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
2472            );
2473            return Err(Error::PartialUpload {
2474                stored: stored_addresses,
2475                stored_count: total_stored,
2476                failed,
2477                failed_count,
2478                total_chunks,
2479                spend: Box::new(PartialUploadSpend {
2480                    storage_cost_atto: total_storage.to_string(),
2481                    gas_cost_wei: total_gas,
2482                }),
2483                reason: format!("{failed_count} chunk(s) failed to store after retries"),
2484            });
2485        }
2486
2487        Ok((
2488            total_stored,
2489            total_storage.to_string(),
2490            total_gas,
2491            agg_stats,
2492        ))
2493    }
2494
2495    /// Upload chunks from a spill using pre-computed merkle proofs.
2496    ///
2497    /// Reads one wave at a time from disk, pairs each chunk with its proof,
2498    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
2499    ///
2500    /// A chunk that is transiently short of quorum (`InsufficientPeers`) does
2501    /// **not** abort the file, nor does it block the pipeline: each wave is
2502    /// stored in a **single pass** (no in-wave backoff barrier), and chunks
2503    /// short of quorum are collected into a file-level deferred set rather than
2504    /// retried in place. After the last wave, [`merkle_deferred_retry`] retries
2505    /// the whole deferred set in concurrent rounds ([`DEFERRED_ROUND_DELAYS_SECS`]
2506    /// delays), re-reading each chunk's body from the spill and reusing its
2507    /// proof. This keeps every wave running at full fan-out instead of parking
2508    /// idle slots behind one slow chunk's backoff, while peak memory stays
2509    /// bounded (bodies are re-read from disk, never pinned). Non-quorum errors
2510    /// (e.g. a missing proof) stay fatal and abort immediately.
2511    ///
2512    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
2513    /// Costs come from the `batch_result` which was populated during payment.
2514    ///
2515    /// # Errors
2516    ///
2517    /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
2518    /// after all retries across every wave (other chunks remain stored), or the
2519    /// underlying error for a non-quorum failure.
2520    async fn upload_waves_merkle(
2521        &self,
2522        spill: &ChunkSpill,
2523        addresses: &[[u8; 32]],
2524        batch_result: &MerkleBatchPaymentResult,
2525        already_stored_addresses: &[[u8; 32]],
2526        progress: Option<&mpsc::Sender<UploadEvent>>,
2527    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2528        let mut total_stored = already_stored_addresses.len();
2529        let total_chunks = total_stored + addresses.len();
2530        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2531        let mut failed: Vec<([u8; 32], String)> = Vec::new();
2532        // Chunks short of quorum on their single wave pass are collected here and
2533        // retried after the last wave (see `merkle_deferred_retry`), so a slow
2534        // chunk never holds its wave's other slots idle behind a backoff.
2535        let mut deferred: Vec<([u8; 32], String)> = Vec::new();
2536        let mut agg_stats = WaveAggregateStats::default();
2537
2538        // Chunks without a merkle proof were never paid for: a partial
2539        // `pay_for_merkle_multi_batch` result carries proofs only for the
2540        // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
2541        // stored, so record it as failed (surfaced via `PartialUpload` once the
2542        // storable chunks have been attempted) rather than letting its
2543        // "missing proof" error abort the whole file and discard every other
2544        // chunk's progress.
2545        let (to_store, missing_proof) =
2546            partition_addresses_by_proof(addresses, &batch_result.proofs);
2547        if !missing_proof.is_empty() {
2548            warn!(
2549                "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2550                missing_proof.len()
2551            );
2552            for addr in &missing_proof {
2553                failed.push((
2554                    *addr,
2555                    format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2556                ));
2557            }
2558        }
2559
2560        let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
2561        let wave_count = waves.len();
2562
2563        let store_limiter = self.controller().store.clone();
2564
2565        // Store one chunk to its (freshly re-collected) close group, reusing the
2566        // chunk's merkle proof. Shared across every retry round so a converged
2567        // routing table yields a fresh group. Only `InsufficientPeers` is
2568        // recoverable; a missing proof stays fatal. Mirrors the external-signer
2569        // path's closure in `merkle_upload_chunks`.
2570        let store_one = |addr: [u8; 32], content: Bytes| {
2571            let limiter = store_limiter.clone();
2572            let proof_bytes = batch_result.proofs.get(&addr).cloned();
2573            async move {
2574                let started = std::time::Instant::now();
2575                let proof = proof_bytes.ok_or_else(|| {
2576                    Error::Payment(format!(
2577                        "Missing merkle proof for chunk {}",
2578                        hex::encode(addr)
2579                    ))
2580                })?;
2581                let peers = self.put_target_peers(&addr).await?;
2582                observe_op(
2583                    &limiter,
2584                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2585                    classify_error,
2586                )
2587                .await
2588                .map(|_| started)
2589            }
2590        };
2591
2592        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2593            let wave_num = wave_idx + 1;
2594            let wave = spill.read_wave(wave_addrs)?;
2595
2596            info!(
2597                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
2598                wave.len()
2599            );
2600
2601            // Clamp fan-out to wave size — partial last wave should
2602            // not pay for extra slots (see PERF-RESULTS.md).
2603            let store_concurrency = store_limiter.current().min(wave.len().max(1));
2604            let chunks: Vec<([u8; 32], Bytes)> = wave
2605                .into_iter()
2606                .map(|(content, addr)| (addr, content))
2607                .collect();
2608
2609            // Store the wave in a SINGLE pass (`max_attempts = 1`, no backoff):
2610            // quorum-short chunks are collected and deferred to a post-wave
2611            // concurrent retry rather than parking this wave's other slots
2612            // behind a backoff. `stored_offset` is the running cumulative count
2613            // so the progress events the driver emits stay correctly numbered
2614            // across waves.
2615            let outcome = merkle_store_with_retry(
2616                chunks,
2617                store_concurrency,
2618                1,
2619                std::time::Duration::ZERO,
2620                progress,
2621                total_stored,
2622                total_chunks,
2623                &store_one,
2624            )
2625            .await?;
2626
2627            // Record this wave's confirmed stores from the explicit set the
2628            // store helper reports. Using that set (rather than inferring
2629            // "wave chunks minus failed") keeps `stored_addresses` correct even
2630            // when a fatal abort leaves some of the wave neither stored nor
2631            // reported short of quorum.
2632            stored_addresses.extend(&outcome.stored_addresses);
2633            total_stored = outcome.stored;
2634
2635            // Merge per-wave stats (durations, attempts, per-round histogram).
2636            agg_stats.chunk_attempts_total = agg_stats
2637                .chunk_attempts_total
2638                .saturating_add(outcome.stats.chunk_attempts_total);
2639            agg_stats
2640                .store_durations_ms
2641                .extend(outcome.stats.store_durations_ms);
2642            for (slot, count) in agg_stats
2643                .retries_histogram
2644                .iter_mut()
2645                .zip(outcome.stats.retries_histogram.iter())
2646            {
2647                *slot = slot.saturating_add(*count);
2648            }
2649
2650            if let Some(e) = outcome.fatal {
2651                // A non-quorum store error is fatal (missing proofs were
2652                // filtered out above, so this is a genuine network/store
2653                // failure). Preserve every chunk stored so far — including this
2654                // wave's same-pass successes — and report every not-stored chunk
2655                // as failed, so the `PartialUpload` counts are accurate rather
2656                // than omitting same-wave stores and under-counting failures.
2657                warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
2658                // Best per-chunk messages we already have: missing-proof, this
2659                // wave's shortfalls, and earlier waves' deferred shortfalls.
2660                let mut known_failed = failed;
2661                known_failed.extend(outcome.failed_addresses);
2662                known_failed.extend(std::mem::take(&mut deferred));
2663                return Err(partial_upload_after_fatal(
2664                    addresses,
2665                    stored_addresses,
2666                    total_stored,
2667                    total_chunks,
2668                    known_failed,
2669                    PartialUploadSpend {
2670                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
2671                        gas_cost_wei: batch_result.gas_cost_wei,
2672                    },
2673                    format!("merkle chunk store aborted: {e}"),
2674                ));
2675            }
2676
2677            // Non-fatal: this wave's quorum-short chunks are deferred (not failed
2678            // yet) for the post-wave concurrent retry. A deferred chunk joins
2679            // `stored_addresses` only if/when a later round stores it.
2680            deferred.extend(outcome.failed_addresses);
2681
2682            if let Some(tx) = progress {
2683                let _ = tx
2684                    .send(UploadEvent::WaveComplete {
2685                        wave: wave_num,
2686                        total_waves: wave_count,
2687                        stored_so_far: total_stored,
2688                        total: total_chunks,
2689                    })
2690                    .await;
2691            }
2692        }
2693
2694        // The wave passes never blocked on backoff; now retry the whole
2695        // file-level deferred set in concurrent rounds. Bodies are re-read from
2696        // the spill at retry time (peak RAM unchanged) and proofs are re-attached
2697        // by `store_one`. Chunks still short after the final round become
2698        // `failed`; a non-quorum error aborts as `PartialUpload`.
2699        if !deferred.is_empty() {
2700            info!(
2701                "Deferring {} merkle chunk(s) short of quorum for concurrent retry after final wave",
2702                deferred.len()
2703            );
2704            let dr = merkle_deferred_retry(
2705                deferred,
2706                &DEFERRED_ROUND_DELAYS_SECS,
2707                // Read and store at most one wave's worth of bodies at a time so
2708                // the deferred path keeps the wave path's ~256 MB peak-memory
2709                // bound regardless of how many chunks were deferred file-wide.
2710                UPLOAD_WAVE_SIZE,
2711                |addrs: &[[u8; 32]]| {
2712                    spill.read_wave(addrs).map(|wave| {
2713                        wave.into_iter()
2714                            .map(|(content, addr)| (addr, content))
2715                            .collect()
2716                    })
2717                },
2718                |n: usize| store_limiter.current().min(n.max(1)),
2719                progress,
2720                total_stored,
2721                total_chunks,
2722                &store_one,
2723            )
2724            .await?;
2725
2726            stored_addresses.extend(dr.stored_addresses);
2727            total_stored = dr.stored;
2728
2729            // Merge the deferred pass's stats — its histogram is already mapped
2730            // to the right per-round slots — into the file aggregate.
2731            agg_stats.chunk_attempts_total = agg_stats
2732                .chunk_attempts_total
2733                .saturating_add(dr.stats.chunk_attempts_total);
2734            agg_stats
2735                .store_durations_ms
2736                .extend(dr.stats.store_durations_ms);
2737            for (slot, count) in agg_stats
2738                .retries_histogram
2739                .iter_mut()
2740                .zip(dr.stats.retries_histogram.iter())
2741            {
2742                *slot = slot.saturating_add(*count);
2743            }
2744
2745            if let Some(reason) = dr.fatal {
2746                // A non-quorum store error during a deferred round is fatal, the
2747                // same as in the wave path: preserve everything stored so far and
2748                // report every not-stored chunk as failed.
2749                warn!("merkle deferred retry aborted: {reason}");
2750                let mut known_failed = failed;
2751                known_failed.extend(dr.failed_addresses);
2752                return Err(partial_upload_after_fatal(
2753                    addresses,
2754                    stored_addresses,
2755                    total_stored,
2756                    total_chunks,
2757                    known_failed,
2758                    PartialUploadSpend {
2759                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
2760                        gas_cost_wei: batch_result.gas_cost_wei,
2761                    },
2762                    format!("merkle chunk store aborted: {reason}"),
2763                ));
2764            }
2765            failed.extend(dr.failed_addresses);
2766        }
2767
2768        // A file with any permanently-failed chunk is not fully stored — surface
2769        // it as `PartialUpload`, but only after the single wave pass and every
2770        // deferred retry round are exhausted (never silently succeed with
2771        // missing chunks).
2772        if !failed.is_empty() {
2773            let failed_count = failed.len();
2774            let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
2775            warn!(
2776                "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2777            );
2778            return Err(Error::PartialUpload {
2779                stored: stored_addresses,
2780                stored_count: total_stored,
2781                failed,
2782                failed_count,
2783                total_chunks,
2784                spend: Box::new(PartialUploadSpend {
2785                    storage_cost_atto: batch_result.storage_cost_atto.clone(),
2786                    gas_cost_wei: batch_result.gas_cost_wei,
2787                }),
2788                reason: format!(
2789                    "{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
2790                ),
2791            });
2792        }
2793
2794        Ok((
2795            total_stored,
2796            batch_result.storage_cost_atto.clone(),
2797            batch_result.gas_cost_wei,
2798            agg_stats,
2799        ))
2800    }
2801
2802    /// Download and decrypt a file from the network, writing it to disk.
2803    ///
2804    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2805    /// memory at a time, avoiding OOM on large files. Chunks are fetched
2806    /// concurrently within each batch, then decrypted data is written to
2807    /// disk incrementally.
2808    ///
2809    /// Returns the number of bytes written.
2810    ///
2811    /// # Panics
2812    ///
2813    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2814    /// Will panic if called from a `current_thread` runtime because
2815    /// `streaming_decrypt` takes a synchronous callback that must bridge
2816    /// back to async via `block_in_place`.
2817    ///
2818    /// # Errors
2819    ///
2820    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2821    /// or the file cannot be written.
2822    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2823        self.file_download_with_progress(data_map, output, None)
2824            .await
2825    }
2826
2827    /// Download and decrypt a file, trying the requested number of
2828    /// closest peers for every chunk fetch.
2829    ///
2830    /// Returns the number of bytes written.
2831    ///
2832    /// # Errors
2833    ///
2834    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2835    /// or the file cannot be written.
2836    pub async fn file_download_from_closest_peers(
2837        &self,
2838        data_map: &DataMap,
2839        output: &Path,
2840        peer_count: NonZeroUsize,
2841    ) -> Result<u64> {
2842        self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get())
2843            .await
2844    }
2845
2846    /// Download and decrypt a file with progress events, trying the
2847    /// requested number of closest peers for every chunk fetch.
2848    ///
2849    /// Same as [`Client::file_download_from_closest_peers`] but sends
2850    /// [`DownloadEvent`]s for UI feedback.
2851    ///
2852    /// # Errors
2853    ///
2854    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2855    /// or the file cannot be written.
2856    pub async fn file_download_with_progress_from_closest_peers(
2857        &self,
2858        data_map: &DataMap,
2859        output: &Path,
2860        progress: Option<mpsc::Sender<DownloadEvent>>,
2861        peer_count: NonZeroUsize,
2862    ) -> Result<u64> {
2863        self.file_download_with_progress_using_peer_count(
2864            data_map,
2865            output,
2866            progress,
2867            peer_count.get(),
2868        )
2869        .await
2870    }
2871
2872    /// Download and decrypt a file with peer-health diagnostics.
2873    ///
2874    /// Each file chunk is fetched by querying every selected closest peer,
2875    /// not by returning after the first successful peer. The returned report
2876    /// records which peers had each chunk and which did not. DataMap
2877    /// resolution still uses the normal early-return fetch path; diagnostics
2878    /// are for file chunks only.
2879    ///
2880    /// # Errors
2881    ///
2882    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2883    /// or the file cannot be written.
2884    pub async fn file_download_with_peer_report_from_closest_peers(
2885        &self,
2886        data_map: &DataMap,
2887        output: &Path,
2888        progress: Option<mpsc::Sender<DownloadEvent>>,
2889        peer_count: NonZeroUsize,
2890    ) -> Result<FileDownloadWithPeerReport> {
2891        let chunk_reports = Arc::new(Mutex::new(Vec::new()));
2892        let bytes_written = self
2893            .file_download_with_progress_using_peer_count_and_reports(
2894                data_map,
2895                output,
2896                progress,
2897                peer_count.get(),
2898                Some(chunk_reports.clone()),
2899            )
2900            .await?;
2901
2902        let chunk_reports = chunk_reports
2903            .lock()
2904            .map_err(|_| Error::Storage("file chunk peer report lock poisoned".to_string()))?
2905            .clone();
2906        let chunk_reports = file_chunk_reports_from_recorded_sweeps(chunk_reports);
2907
2908        Ok(FileDownloadWithPeerReport {
2909            bytes_written,
2910            chunk_reports,
2911        })
2912    }
2913
2914    async fn download_fetch_file_chunk(
2915        &self,
2916        idx: usize,
2917        hash: XorName,
2918        context: FileDownloadFetchContext,
2919        is_deferred_retry: bool,
2920        attempt: usize,
2921    ) -> std::result::Result<DownloadBatchEntry, self_encryption::Error> {
2922        let addr = hash.0;
2923        let addr_hex = hex::encode(addr);
2924
2925        let chunk_content = if let Some(peer_reports) = context.peer_reports {
2926            match self
2927                .chunk_get_from_closest_peer_group(&addr, context.peer_count)
2928                .await
2929            {
2930                Ok(results) => {
2931                    let (content, sweep) = file_chunk_sweep_report_from_peer_results(
2932                        attempt,
2933                        is_deferred_retry,
2934                        &results,
2935                    );
2936                    peer_reports
2937                        .lock()
2938                        .map_err(|_| {
2939                            self_encryption::Error::Generic(
2940                                "file chunk peer report lock poisoned".to_string(),
2941                            )
2942                        })?
2943                        .push(RecordedFileChunkPeerSweep {
2944                            index: idx + 1,
2945                            address: addr,
2946                            sweep,
2947                        });
2948                    content
2949                }
2950                Err(e) => {
2951                    if is_deferred_retry {
2952                        info!(
2953                            "Deferred all-peer retry for {addr_hex} hit transient error: {e}; re-deferring"
2954                        );
2955                    } else {
2956                        info!("First-pass all-peer fetch error for {addr_hex}: {e}; deferring");
2957                    }
2958                    peer_reports
2959                        .lock()
2960                        .map_err(|_| {
2961                            self_encryption::Error::Generic(
2962                                "file chunk peer report lock poisoned".to_string(),
2963                            )
2964                        })?
2965                        .push(RecordedFileChunkPeerSweep {
2966                            index: idx + 1,
2967                            address: addr,
2968                            sweep: file_chunk_sweep_report_from_error(
2969                                attempt,
2970                                is_deferred_retry,
2971                                &e,
2972                            ),
2973                        });
2974                    None
2975                }
2976            }
2977        } else {
2978            match self
2979                .chunk_get_observed_from_closest_peers(&addr, context.peer_count)
2980                .await
2981            {
2982                Ok(Some(chunk)) => Some(chunk.content),
2983                Ok(None) => None,
2984                Err(e) => {
2985                    if is_deferred_retry {
2986                        info!(
2987                            "Deferred retry for {addr_hex} hit transient error: {e}; re-deferring"
2988                        );
2989                    } else {
2990                        info!("First-pass fetch error for {addr_hex}: {e}; deferring");
2991                    }
2992                    None
2993                }
2994            }
2995        };
2996
2997        let Some(content) = chunk_content else {
2998            return Ok((idx, Err(hash)));
2999        };
3000
3001        let fetched = context
3002            .fetched_ref
3003            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
3004            + 1;
3005        if is_deferred_retry {
3006            info!(
3007                "Downloaded {fetched}/{} (deferred retry)",
3008                context.total_chunks
3009            );
3010        } else {
3011            let total_chunks = context.total_chunks;
3012            info!("Downloaded {fetched}/{total_chunks}");
3013        }
3014        if let Some(ref tx) = context.progress_ref {
3015            let _ = tx.try_send(DownloadEvent::ChunksFetched {
3016                fetched,
3017                total: context.total_chunks,
3018            });
3019        }
3020
3021        Ok((idx, Ok(content)))
3022    }
3023
3024    /// Shared download core: resolve the DataMap, then fetch + streaming-decrypt
3025    /// the file one batch at a time, handing each decrypted plaintext segment
3026    /// (in order) to `on_chunk`. Constant memory — only one decrypt batch is
3027    /// resident at a time. Returns the total plaintext bytes produced.
3028    ///
3029    /// `on_chunk` is async so a sink can apply backpressure (e.g. a bounded
3030    /// channel). Driving the decrypt iterator runs the batched chunk fetch via
3031    /// `block_in_place`, so this requires a multi-threaded Tokio runtime.
3032    ///
3033    /// Every chunk fetch tries `peer_count` closest peers.
3034    ///
3035    /// Progress reporting (via `progress`):
3036    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
3037    ///    `ChunksFetched` with `total: 0` during resolution)
3038    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
3039    /// 3. Fetches data chunks with accurate `fetched/total` progress
3040    async fn download_decrypted_chunks<F, Fut>(
3041        &self,
3042        data_map: &DataMap,
3043        progress: Option<mpsc::Sender<DownloadEvent>>,
3044        peer_count: usize,
3045        peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3046        mut on_chunk: F,
3047    ) -> Result<u64>
3048    where
3049        F: FnMut(Bytes) -> Fut,
3050        Fut: std::future::Future<Output = Result<()>>,
3051    {
3052        let handle = Handle::current();
3053
3054        // Phase 1: Resolve hierarchical DataMap to root level.
3055        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
3056        let root_map = if data_map.is_child() {
3057            let dm_chunks = data_map.len();
3058            if let Some(ref tx) = progress {
3059                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
3060                    total_map_chunks: dm_chunks,
3061                });
3062            }
3063
3064            let resolve_progress = progress.clone();
3065            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3066
3067            let resolved = tokio::task::block_in_place(|| {
3068                let counter_ref = resolve_counter.clone();
3069                let progress_ref = resolve_progress.clone();
3070                let fetch_limiter = self.controller().fetch.clone();
3071                let fetch = |batch: &[(usize, XorName)]| {
3072                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
3073                    let counter = counter_ref.clone();
3074                    let prog = progress_ref.clone();
3075                    let limiter = fetch_limiter.clone();
3076                    handle.block_on(async {
3077                        // Use rebucketed_unordered so the in-flight cap
3078                        // is re-read from the limiter as each slot frees.
3079                        // `buffer_unordered` snapshots the cap once at
3080                        // pipeline build, which means observe_op
3081                        // signals from inside chunk_get cannot reduce
3082                        // concurrency on the current batch — exactly
3083                        // the case where load-shedding is needed.
3084                        let mut results = rebucketed_unordered(
3085                            &limiter,
3086                            batch_owned,
3087                            |(idx, hash): (usize, XorName)| {
3088                                let counter = counter.clone();
3089                                let prog = prog.clone();
3090                                async move {
3091                                    let addr = hash.0;
3092                                    // chunk_get_observed feeds the
3093                                    // adaptive fetch limiter once per
3094                                    // call via chunk_get_outcome
3095                                    // (Ok(None) -> Timeout is the
3096                                    // load-shedding signal for
3097                                    // sustained close-group exhaustion).
3098                                    let chunk = self
3099                                        .chunk_get_observed_from_closest_peers(&addr, peer_count)
3100                                        .await
3101                                        .map_err(|e| {
3102                                            self_encryption::Error::Generic(format!(
3103                                                "DataMap resolution failed: {e}"
3104                                            ))
3105                                        })?
3106                                        .ok_or_else(|| {
3107                                            self_encryption::Error::Generic(format!(
3108                                                "DataMap chunk not found: {}",
3109                                                hex::encode(addr)
3110                                            ))
3111                                        })?;
3112                                    let fetched = counter
3113                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
3114                                        + 1;
3115                                    if let Some(ref tx) = prog {
3116                                        let _ =
3117                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
3118                                    }
3119                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
3120                                }
3121                            },
3122                        )
3123                        .await?;
3124                        // CRITICAL: self_encryption::get_root_data_map_parallel
3125                        // pairs the returned Vec POSITIONALLY with the input
3126                        // hashes via .zip() and discards our idx field.
3127                        // rebucketed_unordered preserves first-completion
3128                        // order, so sort by idx to restore input order
3129                        // before returning.
3130                        results.sort_by_key(|(idx, _)| *idx);
3131                        Ok(results)
3132                    })
3133                };
3134                get_root_data_map_parallel(data_map.clone(), &fetch)
3135            })
3136            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
3137
3138            info!(
3139                "Resolved hierarchical DataMap: {} data chunks",
3140                resolved.len()
3141            );
3142            resolved
3143        } else {
3144            data_map.clone()
3145        };
3146
3147        // Phase 2: Now we know the real chunk count.
3148        let total_chunks = root_map.len();
3149        if let Some(ref tx) = progress {
3150            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
3151        }
3152
3153        // Phase 3: Fetch and decrypt data chunks with accurate progress.
3154        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3155        let fetched_for_closure = fetched_counter.clone();
3156        let progress_for_closure = progress.clone();
3157        let peer_reports_for_closure = peer_reports.clone();
3158
3159        let fetch_limiter_outer = self.controller().fetch.clone();
3160        let usable_memory = usable_memory_bytes();
3161        let configured_batch_floor = stream_decrypt_batch_size();
3162        let fetch_cap = fetch_limiter_outer.current();
3163        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
3164            total_chunks,
3165            fetch_cap,
3166            configured_batch_floor,
3167            usable_memory,
3168        );
3169        info!(
3170            total_chunks,
3171            fetch_cap,
3172            configured_batch_floor,
3173            ?usable_memory,
3174            decrypt_batch_size,
3175            "Selected adaptive stream decrypt batch size"
3176        );
3177
3178        let stream = streaming_decrypt_with_batch_size(
3179            &root_map,
3180            |batch: &[(usize, XorName)]| {
3181                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
3182                let fetch_context = FileDownloadFetchContext {
3183                    total_chunks,
3184                    peer_count,
3185                    fetched_ref: fetched_for_closure.clone(),
3186                    progress_ref: progress_for_closure.clone(),
3187                    peer_reports: peer_reports_for_closure.clone(),
3188                };
3189                let fetch_limiter = fetch_limiter_outer.clone();
3190
3191                tokio::task::block_in_place(|| {
3192                    handle.block_on(async {
3193                        // First pass: try every chunk in the batch. Normal mode
3194                        // uses chunk_get_observed (early-return after a found
3195                        // peer); diagnostic mode asks every selected closest
3196                        // peer and records that sweep before returning bytes.
3197                        // Any missing chunk or transient fetch error is encoded
3198                        // as Err(hash), so one noisy chunk does not abort the
3199                        // whole batch before the deferred retry rounds run.
3200                        let first_fetch_context = fetch_context.clone();
3201                        let raw: Vec<DownloadBatchEntry> = rebucketed_unordered(
3202                            &fetch_limiter,
3203                            batch_owned,
3204                            |(idx, hash): (usize, XorName)| {
3205                                let fetch_context = first_fetch_context.clone();
3206                                async move {
3207                                    self.download_fetch_file_chunk(
3208                                        idx,
3209                                        hash,
3210                                        fetch_context,
3211                                        false,
3212                                        FIRST_DIAGNOSTIC_FETCH_ATTEMPT,
3213                                    )
3214                                    .await
3215                                }
3216                            },
3217                        )
3218                        .await?;
3219
3220                        // Partition: things we already have vs the
3221                        // deferred set we need to retry.
3222                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
3223                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
3224                        for (idx, inner) in raw {
3225                            match inner {
3226                                Ok(bytes) => results.push((idx, bytes)),
3227                                Err(hash) => deferred.push((idx, hash)),
3228                            }
3229                        }
3230
3231                        // Deferred retry pass: retry the deferred chunks
3232                        // in CONCURRENT rounds (reusing the fetch
3233                        // limiter's cap), not serially. The first round
3234                        // fires immediately — most deferrals on a
3235                        // healthy-but-lossy link are peer-side noise
3236                        // that clears in well under a second, and
3237                        // serializing them behind mandatory multi-second
3238                        // sleeps was the single biggest throughput sink
3239                        // on such links (a batch deferring ~20 chunks
3240                        // burned minutes of near-zero throughput even
3241                        // though every chunk succeeded on its first
3242                        // retry). Only chunks that survive a round get a
3243                        // longer back-off before the next, so genuine
3244                        // saturation still gets time to settle.
3245                        if !deferred.is_empty() {
3246                            // Round delays in seconds. Round 0 is
3247                            // immediate; later rounds back off to ride
3248                            // out sustained saturation.
3249                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
3250                            info!(
3251                                "Deferring {} chunk(s) for concurrent retry after batch settles",
3252                                deferred.len()
3253                            );
3254                            let mut remaining = deferred;
3255                            for (round, &delay_secs) in
3256                                DEFERRED_ROUND_DELAYS_SECS.iter().enumerate()
3257                            {
3258                                if remaining.is_empty() {
3259                                    break;
3260                                }
3261                                if delay_secs > 0 {
3262                                    tokio::time::sleep(std::time::Duration::from_secs(delay_secs))
3263                                        .await;
3264                                }
3265                                info!(
3266                                    "Deferred retry round {}/{}: {} chunk(s)",
3267                                    round + 1,
3268                                    DEFERRED_ROUND_DELAYS_SECS.len(),
3269                                    remaining.len(),
3270                                );
3271                                let round_input = std::mem::take(&mut remaining);
3272                                let retry_fetch_context = fetch_context.clone();
3273                                let round_results: Vec<DownloadBatchEntry> = rebucketed_unordered(
3274                                    &fetch_limiter,
3275                                    round_input,
3276                                    |(idx, hash): (usize, XorName)| {
3277                                        let fetch_context = retry_fetch_context.clone();
3278                                        async move {
3279                                            self.download_fetch_file_chunk(
3280                                                idx,
3281                                                hash,
3282                                                fetch_context,
3283                                                true,
3284                                                round + DEFERRED_RETRY_ATTEMPT_OFFSET,
3285                                            )
3286                                            .await
3287                                        }
3288                                    },
3289                                )
3290                                .await?;
3291                                for (idx, inner) in round_results {
3292                                    match inner {
3293                                        Ok(bytes) => results.push((idx, bytes)),
3294                                        Err(hash) => remaining.push((idx, hash)),
3295                                    }
3296                                }
3297                            }
3298                            if let Some((_, hash)) = remaining.first() {
3299                                return Err(self_encryption::Error::Generic(format!(
3300                                    "Chunk not found after {} deferred retry rounds: {}",
3301                                    DEFERRED_ROUND_DELAYS_SECS.len(),
3302                                    hex::encode(hash.0),
3303                                )));
3304                            }
3305                        }
3306
3307                        // streaming_decrypt itself sort_by_keys before
3308                        // zipping, but the same closure is also passed
3309                        // through get_root_data_map_parallel internally
3310                        // (see self_encryption::stream_decrypt.rs::new), and
3311                        // THAT path zips positionally without sorting. Sort
3312                        // here so both consumers see input order.
3313                        results.sort_by_key(|(idx, _)| *idx);
3314                        Ok(results)
3315                    })
3316                })
3317            },
3318            decrypt_batch_size,
3319        )
3320        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
3321
3322        // Drive the iterator (each `next()` runs the batched fetch via
3323        // block_in_place) and hand each decrypted segment to the sink in
3324        // order. Awaiting the sink between items yields back to the runtime so
3325        // a bounded sink can apply backpressure.
3326        let mut bytes_total = 0u64;
3327        for chunk_result in stream {
3328            let chunk: Bytes =
3329                chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
3330            bytes_total += chunk.len() as u64;
3331            on_chunk(chunk).await?;
3332        }
3333        Ok(bytes_total)
3334    }
3335
3336    /// Download and decrypt a file to disk, with optional progress events.
3337    ///
3338    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI
3339    /// feedback. Streams to a temp file (one decrypt batch resident at a time)
3340    /// and renames atomically on success. A `TempDownload` guard removes the
3341    /// staging file on any error path, including a panic.
3342    pub async fn file_download_with_progress(
3343        &self,
3344        data_map: &DataMap,
3345        output: &Path,
3346        progress: Option<mpsc::Sender<DownloadEvent>>,
3347    ) -> Result<u64> {
3348        self.file_download_with_progress_using_peer_count(
3349            data_map,
3350            output,
3351            progress,
3352            self.config().close_group_size,
3353        )
3354        .await
3355    }
3356
3357    /// Download and decrypt a file to disk with progress events, trying
3358    /// `peer_count` closest peers for every chunk fetch.
3359    ///
3360    /// Streams to a temp file (one decrypt batch resident at a time) and
3361    /// renames atomically on success.
3362    async fn file_download_with_progress_using_peer_count(
3363        &self,
3364        data_map: &DataMap,
3365        output: &Path,
3366        progress: Option<mpsc::Sender<DownloadEvent>>,
3367        peer_count: usize,
3368    ) -> Result<u64> {
3369        self.file_download_with_progress_using_peer_count_and_reports(
3370            data_map, output, progress, peer_count, None,
3371        )
3372        .await
3373    }
3374
3375    async fn file_download_with_progress_using_peer_count_and_reports(
3376        &self,
3377        data_map: &DataMap,
3378        output: &Path,
3379        progress: Option<mpsc::Sender<DownloadEvent>>,
3380        peer_count: usize,
3381        peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3382    ) -> Result<u64> {
3383        debug!("Downloading file to {}", output.display());
3384
3385        let parent = output.parent().unwrap_or_else(|| Path::new("."));
3386        let unique: u64 = rand::random();
3387        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
3388
3389        // Guard removes the staging file on any early return OR a panic unwind
3390        // out of the `block_in_place` decrypt loop; defused only by a
3391        // successful commit(). Centralizes what used to be three duplicated
3392        // cleanup arms.
3393        let tmp = TempDownload::new(tmp_path);
3394        let mut file = std::fs::File::create(tmp.path())?;
3395
3396        let bytes_written = self
3397            .download_decrypted_chunks(data_map, progress, peer_count, peer_reports, |bytes| {
3398                let r = file.write_all(&bytes).map_err(Error::from);
3399                std::future::ready(r)
3400            })
3401            .await?;
3402        file.flush()?;
3403        drop(file); // close the handle before rename (Windows won't rename an open file)
3404
3405        tmp.commit(output)?;
3406        info!(
3407            "File downloaded: {bytes_written} bytes written to {}",
3408            output.display()
3409        );
3410        Ok(bytes_written)
3411    }
3412
3413    /// Download and decrypt a file, streaming the plaintext to `sink` instead
3414    /// of writing to disk.
3415    ///
3416    /// Constant memory (one decrypt batch resident at a time); the caller
3417    /// receives bytes progressively as each batch decrypts, suitable for
3418    /// forwarding to an HTTP chunked body or a gRPC response stream. The
3419    /// bounded `sink` applies backpressure. If the receiver is dropped (e.g.
3420    /// the client disconnected) the download stops early and returns
3421    /// [`Error::Cancelled`].
3422    ///
3423    /// The channel item type is `Result<Bytes, Error>`, so the caller sets up:
3424    ///
3425    /// ```ignore
3426    /// let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
3427    /// ```
3428    ///
3429    /// Typically the caller `tokio::spawn`s this and converts the matching
3430    /// `Receiver` into its response stream. Requires a multi-threaded Tokio
3431    /// runtime (the decrypt iterator uses `block_in_place`).
3432    pub async fn file_download_to_sender(
3433        &self,
3434        data_map: &DataMap,
3435        sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
3436        progress: Option<mpsc::Sender<DownloadEvent>>,
3437    ) -> Result<u64> {
3438        let peer_count = self.config().close_group_size;
3439        self.download_decrypted_chunks(data_map, progress, peer_count, None, |bytes| {
3440            let sink = sink.clone();
3441            async move {
3442                sink.send(Ok(bytes))
3443                    .await
3444                    .map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
3445            }
3446        })
3447        .await
3448    }
3449}
3450
#[cfg(test)]
// Unit tests treat an unexpected `Err`/`None` as a test failure, so a panic
// from `unwrap` is exactly the failure mode we want here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn distributed_sample_indices_spreads_across_large_file() {
        // cap 5 over 100 chunks: first and last included, evenly spread.
        assert_eq!(distributed_sample_indices(100, 5), vec![0, 24, 49, 74, 99]);
    }

    #[test]
    fn distributed_sample_indices_covers_whole_small_file() {
        // total <= cap returns every index, preserving the exact
        // "whole file sampled" detection in estimate_upload_cost.
        assert_eq!(distributed_sample_indices(3, 5), vec![0, 1, 2]);
        assert_eq!(distributed_sample_indices(5, 5), vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn distributed_sample_indices_is_in_range_and_increasing() {
        assert!(distributed_sample_indices(0, 5).is_empty());
        assert_eq!(distributed_sample_indices(1, 5), vec![0]);
        // For every non-empty size: starts at 0, ends at the last chunk,
        // stays in bounds, and never repeats or goes backwards.
        for total in 1..200usize {
            let idx = distributed_sample_indices(total, 5);
            assert_eq!(*idx.first().unwrap(), 0);
            assert_eq!(*idx.last().unwrap(), total - 1);
            assert!(idx.iter().all(|&i| i < total));
            assert!(idx.windows(2).all(|w| w[0] < w[1]));
        }
    }

    #[test]
    fn disk_space_check_passes_for_small_file() {
        // A 1 KB file should always pass the disk space check
        check_disk_space_for_spill(1024).unwrap();
    }

    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        // `u64::MAX / 2` bytes is roughly 9.2 EB (8 EiB) — far more free
        // space than any real filesystem reports, so the check must refuse.
        let err = check_disk_space_for_spill(u64::MAX / 2).unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// With ample memory and many chunks, the batch grows to the fetch width
    /// (second argument) times `DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER`.
    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));

        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
    }

    /// The batch never exceeds the number of chunks in the file (first argument).
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));

        assert_eq!(batch_size, 12);
    }

    /// A small fetch width does not pull the batch below the configured
    /// value (third argument).
    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);

        assert_eq!(batch_size, 32);
    }

    /// Without a memory reading, the batch stays at the configured value
    /// instead of expanding towards the fetch width.
    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);

        assert_eq!(batch_size, 10);
    }

    /// Usable memory sized for exactly 16 chunks' budget caps the batch at 16,
    /// even though the fetch width (256) would otherwise allow more.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let usable_memory = estimated_bytes_per_chunk
            .saturating_mul(16)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));

        assert_eq!(batch_size, 16);
    }

    /// Even with essentially no usable memory, the batch never drops below one chunk.
    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));

        assert_eq!(batch_size, 1);
    }

    /// A cached merkle payment covers a request only if it holds a proof for
    /// every requested address; proofs for extra addresses are harmless.
    #[test]
    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
        let covered = compute_address(&Bytes::from_static(b"covered"));
        let extra = compute_address(&Bytes::from_static(b"extra"));
        let missing = compute_address(&Bytes::from_static(b"missing"));
        let cached = MerkleBatchPaymentResult {
            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
            chunk_count: 2,
            storage_cost_atto: "0".to_string(),
            gas_cost_wei: 0,
            merkle_payment_timestamp: 0,
        };

        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
        assert!(!cached_merkle_covers_addresses(
            &cached,
            &[covered, missing]
        ));
    }

    /// A partial merkle payment leaves some addresses without a proof. Those
    /// must be split out so `upload_waves_merkle` reports them as failed
    /// (`PartialUpload`) instead of aborting the whole file — preserving the
    /// addresses' original order in each group.
    #[test]
    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
        let paid_a = [1u8; 32];
        let unpaid_b = [2u8; 32];
        let paid_c = [3u8; 32];
        let unpaid_d = [4u8; 32];
        let proofs: HashMap<[u8; 32], Vec<u8>> =
            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);

        let (to_store, missing) =
            partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);

        assert_eq!(to_store, vec![paid_a, paid_c]);
        assert_eq!(missing, vec![unpaid_b, unpaid_d]);
    }

    /// Boundary cases for the proof partition: no proofs at all, and a proof
    /// for every address.
    #[test]
    fn partition_addresses_by_proof_handles_all_or_nothing() {
        let a = [5u8; 32];
        let b = [6u8; 32];

        // No proofs at all → every address is missing.
        let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
        assert!(to_store.is_empty());
        assert_eq!(missing, vec![a, b]);

        // All proofs present → nothing missing.
        let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
        assert_eq!(to_store, vec![a, b]);
        assert!(missing.is_empty());
    }

    /// A wave that returns `Ok` contributes its stored chunks, parsed cost, and
    /// stats; nothing is recorded as failed.
    #[test]
    fn fold_single_wave_keeps_ok_wave() {
        let stored = vec![[1u8; 32], [2u8; 32]];
        let stats = WaveAggregateStats {
            chunk_attempts_total: 7,
            ..Default::default()
        };

        let outcome = fold_single_wave(Ok((stored.clone(), "100".to_string(), 9, stats))).unwrap();

        assert_eq!(outcome.stored, stored);
        assert!(outcome.failed.is_empty());
        assert_eq!(outcome.storage_atto.to_string(), "100");
        assert_eq!(outcome.gas_wei, 9);
        assert_eq!(outcome.stats.chunk_attempts_total, 7);
    }

    /// The core V2-461 semantic: a wave short of quorum (`PartialUpload`) is
    /// recoverable — its stored chunks, failed chunks, and on-chain spend are
    /// folded so the caller can continue to the next wave rather than aborting
    /// the whole file.
    #[test]
    fn fold_single_wave_folds_partial_upload() {
        let stored = vec![[3u8; 32]];
        let failed = vec![([4u8; 32], "short of quorum".to_string())];
        let err = Error::PartialUpload {
            stored: stored.clone(),
            stored_count: 1,
            failed: failed.clone(),
            failed_count: 1,
            total_chunks: 2,
            spend: Box::new(PartialUploadSpend {
                storage_cost_atto: "250".to_string(),
                gas_cost_wei: 11,
            }),
            reason: "wave store failed after retries".to_string(),
        };

        let outcome = fold_single_wave(Err(err)).unwrap();

        assert_eq!(outcome.stored, stored);
        assert_eq!(outcome.failed, failed);
        assert_eq!(outcome.storage_atto.to_string(), "250");
        assert_eq!(outcome.gas_wei, 11);
        // `PartialUpload` carries no stats, so the failed wave contributes none.
        assert_eq!(outcome.stats.chunk_attempts_total, 0);
    }

    /// A non-`PartialUpload` error (wallet/payment-infrastructure failure) is
    /// fatal and must abort the file, not be folded into the failed set.
    #[test]
    fn fold_single_wave_propagates_fatal_error() {
        let result = fold_single_wave(Err(Error::Payment("wallet unavailable".to_string())));

        assert!(
            matches!(result, Err(Error::Payment(_))),
            "fatal payment error must propagate, got: {result:?}"
        );
    }

    /// Chunks pushed into the spill can be read back byte-for-byte, and the
    /// size bookkeeping (`total_bytes`, `chunk_entries`) agrees with the input.
    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let data1 = vec![0xAA; 1024];
        let data2 = vec![0xBB; 2048];

        spill.push(&data1).unwrap();
        spill.push(&data2).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        let chunk_entries = spill.chunk_entries().unwrap();
        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
        assert_eq!(entry_total, 1024 + 2048);

        // Read back and verify
        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
        assert_eq!(&chunk1[..], &data1[..]);

        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
        assert_eq!(&chunk2[..], &data2[..]);

        // Verify waves with 1-chunk wave size
        let waves: Vec<_> = spill.addresses.chunks(1).collect();
        assert_eq!(waves.len(), 2);
    }

    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let dir;
        {
            let spill = ChunkSpill::new().unwrap();
            dir = spill.dir.clone();
            assert!(dir.exists());
        }
        // After drop, the directory should be cleaned up
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let data = vec![0xCC; 512];

        spill.push(&data).unwrap();
        spill.push(&data).unwrap(); // same content, should be skipped
        spill.push(&data).unwrap(); // again

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Different content should still be added
        let data2 = vec![0xDD; 256];
        spill.push(&data2).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
3732
/// Compile-time assertions that Client file method futures are Send.
///
/// None of these functions is ever called; each one only has to type-check.
/// Passing a future to `_assert_send` makes the compiler prove that the future
/// is `Send`, so a regression in one of these methods fails the test build
/// instead of surfacing later in callers that spawn the future.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; the trait bound is the whole check.
    fn _assert_send<T: Send>(_: &T) {}

    // Never called: exists only so the compiler type-checks the future below.
    #[allow(dead_code)]
    async fn _file_upload_is_send(client: &Client) {
        let fut = client.file_upload(Path::new("/dev/null"));
        _assert_send(&fut);
    }

    // Never called: exists only so the compiler type-checks the future below.
    #[allow(dead_code)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let fut = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
        _assert_send(&fut);
    }

    // Never called. The `DataMap` is taken as a parameter rather than
    // produced with `todo!()`, so the body stays reachable and needs no
    // `unreachable_code` / `diverging_sub_expression` suppressions.
    #[allow(dead_code)]
    async fn _file_download_is_send(client: &Client, data_map: &DataMap) {
        let fut = client.file_download(data_map, Path::new("/dev/null"));
        _assert_send(&fut);
    }
}